拿到新書《.net框架設(shè)計》,到手之后迅速讀了好多,雖然這本書不像很多教程一樣從頭到尾系統(tǒng)的講明一些知識,但是從項目實戰(zhàn)角度告訴我們?nèi)绾问褂梦覀兊闹R,從這本書中提煉了一篇,正好符合我前幾篇的“數(shù)據(jù)驅(qū)動框架”設(shè)計的問題;
消息隊列
消息隊列 ( 英語 : Message queue)是一種 進程間通信 或同一進程的不同 線程 間的通信方式, 軟件 的 貯列 用來處理一系列的 輸入 ,通常是來自使用者。消息隊列提供了 異步 的 通信協(xié)議 ,每一個貯列中的紀錄包含詳細說明的資料,包含發(fā)生的時間,輸入裝置的種類,以及特定的輸入?yún)?shù),也就是說:消息的發(fā)送者和接收者不需要同時與消息隊列互交。消息會保存在 隊列 中,直到接收者取回它。
簡單的說隊列就是貯存了我們需要處理的Command但是并不是及時的拿到其處理結(jié)果;
實現(xiàn)
實際上,消息隊列常常保存在 鏈表 結(jié)構(gòu)中。擁有權(quán)限的進程可以向消息隊列中寫入或讀取消息。
目前,有很多消息隊列有很多開源的實現(xiàn),包括 JBoss Messaging 、 JORAM 、 Apache ActiveMQ 、 Sun Open Message Queue 、 Apache Qpid 和HTTPSQS。
優(yōu)點,缺點
消息隊列本身是 異步 的,它允許接收者在消息發(fā)送很長時間后再取回消息,這和大多數(shù)通信協(xié)議是不同的。例如 WWW 中使用的 HTTP 協(xié)議是 同步 的,因為客戶端在發(fā)出請求后必須等待服務(wù)器回應(yīng)。然而,很多情況下我們需要異步的通信協(xié)議。比如,一個進程通知另一個進程發(fā)生了一個事件,但不需要等待回應(yīng)。但消息隊列的異步特點,也造成了一個缺點,就是接收者必須 輪詢 消息隊列,才能收到最近的消息。
和 信號 相比,消息隊列能夠傳遞更多的信息。與 管道 相比,消息隊列提供了有格式的數(shù)據(jù),這可以減少開發(fā)人員的工作量。但消息隊列仍然有大小限制。
讀取隊列消息
主要有兩種(1)服務(wù)端的推;(2)客戶端的拉;
拉:主要是客戶端定時輪詢拿走消息處理;
推:通過事件訂閱方式主動通知訂閱者進行處理;
消息的貯存
簡單的是通過內(nèi)存鏈表實現(xiàn)貯存;也可以借助DB,比如Redis;還可以持久到本地文件中;
如何保證異步處理的一致性
盡管隊列主要目的是實現(xiàn)消息貯存,同時將調(diào)用與實現(xiàn)異步化。但是如果想達到處理消息一致性,好的方式是區(qū)別業(yè)務(wù)處理順序,比如操作主從DB,主負責寫,從負責讀,我們沒有機會在寫之后立馬從讀數(shù)據(jù)庫拿到你想要的結(jié)果;同時我們需要借助中間狀態(tài),當多個中間狀態(tài)同時符合調(diào)用結(jié)果才到到業(yè)務(wù)時間被處理,否則將“異常消息”持久化,待下次操作;
上代碼
建立消息對立核心隊列
{ public delegate void MessageQueueEventNotifyHandler(Message.BaseMessage message); public class MessageQueue:Queue<BaseMessage> { public static MessageQueue GlobalQueue = new MessageQueue(); private Timer timer = new Timer(); public MessageQueue() { this.timer.Interval = 5000; this.timer.Elapsed += Notify; this.timer.Enabled = true; } private void Notify(object sender, ElapsedEventArgs e) { lock (this) { if (this.Count > 0) { //this.messageNotifyEvent.GetInvocationList()[0].DynamicInvoke(this.Dequeue()); var message = this.Dequeue(); this.messageNotifyEvent(message); } } } private MessageQueueEventNotifyHandler messageNotifyEvent; public event MessageQueueEventNotifyHandler MessageNotifyEvent { add { this.messageNotifyEvent += value; } remove { if (this.messageNotifyEvent != null) { this.messageNotifyEvent -= value; } } } } }
事件處理
public const string OrderCodePrefix = "P"; public void Submit(Message.BaseMessage message) { Order order = message.Body as Order; if (order.OrderCode.StartsWith(OrderCodePrefix)) { System.Console.WriteLine("這個是個正確的以({0})開頭的訂單:{1}", OrderCodePrefix,order.OrderCode); } else { System.Console.WriteLine("這個是個錯誤的訂單,沒有以({0})開頭:{1}",OrderCodePrefix,order.OrderCode); } }
可依據(jù)具體業(yè)務(wù)進行個性化處理;
通過Proxy向隊列追加消息
public class OrderServiceProxy:IOrderService { public void Submit(Message.BaseMessage message) { MessageQueue.MessageQueue.GlobalQueue.Enqueue(message); } }
客戶端調(diào)用
OrderService orderService = new OrderService(); MessageQueue.MessageQueue.GlobalQueue.MessageNotifyEvent += orderService.Submit; var orders = new List<Order>() { new Order(){OrderCode="P001"}, new Order(){OrderCode="P002"}, new Order(){OrderCode="B003"} }; OrderServiceProxy proxy = new OrderServiceProxy(); orders.ForEach(order => proxy.Submit(new Message.BaseMessage() { Body=order})); Console.ReadLine();
這樣就滿足了事件的綁定與觸發(fā)個性化處理,同時達到了消息異步化的目的,希望更細致的拓展用到后期的項目中。
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
