欧美三区_成人在线免费观看视频_欧美极品少妇xxxxⅹ免费视频_a级毛片免费播放_鲁一鲁中文字幕久久_亚洲一级特黄

Python 操作 Rabbit MQ 發(fā)布/訂閱 (五)

系統(tǒng) 2100 0

Python 操作 Rabbit MQ 發(fā)布/訂閱 (五)

一、發(fā)布、訂閱:

我們將一個消息分發(fā)給 多個消費者 ,這種模式被稱為 發(fā)布/訂閱

為了更好的理解這個模式,我們將構建一個日志系統(tǒng),它包括兩個程序:

  • 第一個程序,負責發(fā)送日志消息;
  • 第二個程序,負責獲取消息并輸出內容;

在日志系統(tǒng)中,所有正在運行的接收方程序都會接收消息;

  • 一個接受者,把日志寫入硬盤中;
  • 另一個接受者,把日志輸出到屏幕上;

最終,日志消息被廣播給所有的接受者。

二、交換機(Exchanges):

概念 :應用程序發(fā)送消息時,先把消息給交換機,由交換機投遞給隊列,而不是直接給隊列。交換機可以由多個 消息通道(Channel) ,用于投遞消息。

簡單概括下之前的知識

  • 發(fā)布者(Producer):是發(fā)布消息的應用程序。
  • 隊列(Queue):用于消息存儲的緩沖。
  • 消費者(Consumer):是接收消息的應用程序。

圖解大體流程
Python 操作 Rabbit MQ 發(fā)布/訂閱 (五)_第1張圖片

  • P:代表是發(fā)布者;
  • X:是交換機;

詳解圖意 :發(fā)布者(P )→交換機(X)→隊列(Q)→消費者(C );

  • 交換機一邊從發(fā)布者方接收消息,一邊把消息推送到隊列(Q)。 交換機必須知道如何處理它接收到的消息,是推送到指定的隊列、還是多個隊列,或者是忽略消息 。這些都是通過 交換機類型(Exchange Type) 來定義的。

交換機類型

1.直連交換機(Direct);

2.主題交換機(Topic);

3.頭交換機(Headers);

4.扇形交換機(Fanout);

  • 主要說明—扇形交換,它把消息發(fā)送給它所知道的所有隊列。

                    
                      channel
                      
                        .
                      
                      
                        exchange_declare
                      
                      
                        (
                      
                      exchange
                      
                        =
                      
                      
                        'fanout_logs'
                      
                      
                        ,
                      
                      
                             exchange_type
                      
                        =
                      
                      
                        'fanout'
                      
                      
                        )
                      
                    
                  

參數講解

  • exchange:就是交換機的名稱, 空字符串代表默認或者匿名交換機;

                    
                      channel
                      
                        .
                      
                      
                        basic_publish
                      
                      
                        (
                      
                      exchange
                      
                        =
                      
                      
                        ''
                      
                      
                        )
                      
                    
                  
  • exchange_type:就是交換機的類型;

  • routing_key:分發(fā)到指定的隊列;

  • body:發(fā)送的內容;

  • properties:使消息持久化;

查看交換器列表

命令: rabbitmqctl list_exchanges

            
              Listing exchanges 
              
                ...
              
              
amq
              
                .
              
              rabbitmq
              
                .
              
              log	topic
amq
              
                .
              
              direct	direct
amq
              
                .
              
              topic	topic
amq
              
                .
              
              headers	headers
	direct
amq
              
                .
              
              fanout	fanout
amq
              
                .
              
              rabbitmq
              
                .
              
              trace	topic
amq
              
                .
              
              match	headers

            
          

列表中以amq.*的開頭的交換器,都是默認創(chuàng)建的,目前不需要管它們。

三、臨時隊列:

我們連接上Rabbit MQ的時候,需要一個 全新的、空的隊列 (也就是說不使用之前提到的,routing_key參數指定的隊列名),我們可以 手動創(chuàng)建一個隨機的隊列名 ,或者讓 服務器為我們選擇一個隨機的隊列名(推薦) 。我們僅需要在 調用queue_declare方法時,不提供queue參數 即可:

            
              # 在管道里
              
                ,
              
               不聲明隊列名稱
result 
              
                =
              
               channel
              
                .
              
              
                queue_declare
              
              
                (
              
              
                )
              
            
          

可通過 result.method.queue 獲取已經生成的隨機隊列名,大概的樣子如下所示:

            
              amq
              
                .
              
              gen
              
                -
              
              DIAODS2sDSAKJKS
              
                ==
              
            
          

與消費者斷開連接時,這個隊列應被立即刪除:

            
              # 需要一個空的隊列  exclusive
              
                =
              
              True 表示與消費者斷開時
              
                ,
              
               隊列立即刪除
result 
              
                =
              
               channel
              
                .
              
              
                queue_declare
              
              
                (
              
              exclusive
              
                =
              
              True
              
                )
              
            
          

四、綁定:

img

目前已經創(chuàng)建一個扇形交換機和一個隊列。現(xiàn)在需要告訴交換機如果發(fā)送消息給隊列。

交換機和隊列之間的聯(lián)系我們稱為綁定(binding)

            
              # 將fanount_logs交換機將會把消息添加到我們的隊列中
              
                ,
              
               隊列名服務器隨機生成
channel
              
                .
              
              
                queue_bind
              
              
                (
              
              exchange
              
                =
              
              
                'fanout_logs'
              
              
                ,
              
               queue
              
                =
              
              result
              
                .
              
              method
              
                .
              
              queue
              
                )
              
            
          

查看綁定列表

列出所有現(xiàn)存的綁定命令: rabbitmqctl list_bindings

五、整理本節(jié)最終代碼:

圖解最終流程

Python 操作 Rabbit MQ 發(fā)布/訂閱 (五)_第2張圖片

發(fā)布日志與之前的區(qū)別

1.我們把消息發(fā)送給fanout_logs交換機而不是匿名的交換機;

2.發(fā)送的時候需要提供routing_key參數,但它的值會被扇形交換機忽略;

以下是 send.py

            
              #
              
                !
              
              
                /
              
              usr
              
                /
              
              bin
              
                /
              
              python
# 
              
                -
              
              
                *
              
              
                -
              
               coding
              
                :
              
               utf
              
                -
              
              
                8
              
              
                -
              
              
                *
              
              
                -
              
              
                import
              
               pika

              
                import
              
               sys

message 
              
                =
              
              
                ' '
              
              
                .
              
              
                join
              
              
                (
              
              sys
              
                .
              
              argv
              
                [
              
              
                1
              
              
                :
              
              
                ]
              
              
                )
              
               or 
              
                "Hello World!"
              
              

# 創(chuàng)建一個實例  本地訪問
              
                IP
              
              地址可以為 localhost 
              
                后面5672是端口地址
              
              
                (
              
              可以不用指
# 定
              
                ,
              
               因為默認就是
              
                5672
              
              
                )
              
              
connection 
              
                =
              
               pika
              
                .
              
              
                BlockingConnection
              
              
                (
              
              pika
              
                .
              
              
                ConnectionParameters
              
              
                (
              
              
                'localhost'
              
              
                ,
              
              
                5672
              
              
                )
              
              
                )
              
              

# 聲明一個管道
              
                ,
              
               在管道里發(fā)送消息
channel 
              
                =
              
               connection
              
                .
              
              
                channel
              
              
                (
              
              
                )
              
              

# 指定交換機的類型為fanout
              
                ,
              
               執(zhí)行交換機名
              
                :
              
              fanout_logs
channel
              
                .
              
              
                exchange_declare
              
              
                (
              
              exchange
              
                =
              
              
                'fanout_logs'
              
              
                ,
              
               exchange_type
              
                =
              
              
                'fanout'
              
              
                )
              
              

# 投遞消息 exchange
              
                =
              
              
                'fancout_logs'
              
              交換機的名命
              
                ;
              
               type
              
                =
              
              
                'fanout'
              
              
                :
              
              扇形交換機
channel
              
                .
              
              
                basic_publish
              
              
                (
              
              exchange
              
                =
              
              
                'fanout_logs'
              
              
                ,
              
              
                      routing_key
              
                =
              
              
                ''
              
              
                ,
              
              
                      body
              
                =
              
              message
                      
              
                )
              
              

print 
              
                "[x] sent {}"
              
              
                .
              
              
                format
              
              
                (
              
              message
              
                ,
              
              
                )
              
              
# 隊列關閉
connection
              
                .
              
              
                close
              
              
                (
              
              
                )
              
            
          

若沒有綁定隊列的交換器,消息將會丟失。以下是 receive.py

            
              #
              
                !
              
              
                /
              
              usr
              
                /
              
              bin
              
                /
              
              python
# 
              
                -
              
              
                *
              
              
                -
              
               coding
              
                :
              
               utf
              
                -
              
              
                8
              
              
                -
              
              
                *
              
              
                -
              
              
                import
              
               pika

# 創(chuàng)建實例
connection 
              
                =
              
               pika
              
                .
              
              
                BlockingConnection
              
              
                (
              
              pika
              
                .
              
              
                ConnectionParameters
              
              
                (
              
              
                'localhost'
              
              
                )
              
              
                )
              
              

# 聲明管道
channel 
              
                =
              
               connection
              
                .
              
              
                channel
              
              
                (
              
              
                )
              
              

# 指定交換機名為 fanout_logs 類型為扇形
channel
              
                .
              
              
                exchange_declare
              
              
                (
              
              exchange
              
                =
              
              
                'fanout_logs'
              
              
                ,
              
               exchange_type
              
                =
              
              
                'fanout'
              
              
                )
              
              

# 表示與消費者斷開連接
              
                ,
              
               隊列立即刪除
result 
              
                =
              
               channel
              
                .
              
              
                queue_declare
              
              
                (
              
              queue
              
                =
              
              
                ''
              
              
                ,
              
               exclusive
              
                =
              
              True
              
                )
              
              

# 生成隊列的名字
queue_name 
              
                =
              
               result
              
                .
              
              method
              
                .
              
              queue

# 綁定交換機和隊列
channel
              
                .
              
              
                queue_bind
              
              
                (
              
              exchange
              
                =
              
              
                'fanout_logs'
              
              
                ,
              
               queue
              
                =
              
              queue_name
              
                )
              
              


def 
              
                callback
              
              
                (
              
              ch
              
                ,
              
               method
              
                ,
              
               properties
              
                ,
              
               body
              
                )
              
              
                :
              
              
    print 
              
                '[X] Received{}'
              
              
                .
              
              
                format
              
              
                (
              
              body
              
                ,
              
              
                )
              
              


# 消費消息
channel
              
                .
              
              
                basic_consume
              
              
                (
              
              queue
              
                =
              
              queue_name
              
                ,
              
                # 從指定的消息隊列中接收消息
                      on_message_callback
              
                =
              
              callback
              
                ,
              
                # 如果收到消息
              
                ,
              
               就調用callback函數來處理
                      
              
                )
              
              
                print
              
              
                (
              
              
                '=======正在等待消息========'
              
              
                )
              
              
channel
              
                .
              
              
                start_consuming
              
              
                (
              
              
                )
              
                # 開始消費消息

            
          

3.如果想把日志保存到文件中,打開控制臺輸入:

            
              python receive
              
                .
              
              py 
              
                >
              
               logs_from_rabbit
              
                .
              
              log 

            
          

4.在屏幕中查看日志,在打開一個新的終端運行:

            
              python receive
              
                .
              
              py 

              
                ===
              
              
                ===
              
              
                =
              
              正在等待消息
              
                ===
              
              
                ===
              
              
                ==
              
            
          

5.發(fā)送消息:

            
              python send
              
                .
              
              py 發(fā)送第一條消息

            
          

6.可以看到消費者接收到了消息,并且日志中也記錄了這條消息。

            
              cat logs_from_rabbit
              
                .
              
              log 

              
                ===
              
              
                ===
              
              
                =
              
              正在等待消息
              
                ===
              
              
                ===
              
              
                ==
              
              
                [
              
              
                X
              
              
                ]
              
               Received發(fā)送第一條消息

            
          

7.確認已經創(chuàng)建的隊列綁定:

            
              rabbitmqctl list_bindings
Listing bindings 
              
                ...
              
              
	exchange	amq
              
                .
              
              gen
              
                -
              
              Di2rIkS1kQWcMODPxF5KuA	queue	amq
              
                .
              
              gen
              
                -
              
              Di2rIkS1kQWcMODPxF5KuA	
              
                [
              
              
                ]
              
              
	exchange	hello	queue	hello	
              
                [
              
              
                ]
              
              
	exchange	task_queue	queue	task_queue	
              
                [
              
              
                ]
              
              
fanout_logs	exchange	amq
              
                .
              
              gen
              
                -
              
              Di2rIkS1kQWcMODPxF5KuA	queue	amq
              
                .
              
              gen
              
                -
              
              Di2rIkS1kQWcMODPxF5KuA	
              
                [
              
              
                ]
              
            
          

交換器fanout_logs把數據發(fā)送給兩個系統(tǒng)名命的隊列


更多文章、技術交流、商務合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯(lián)系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發(fā)表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 艹艹艹逼| 国产亚洲精品久久久久久久网站 | 91网页视频入口在线观看 | 97超级碰碰视频在线 | 亚洲精品乱码久久久久久9色 | 狠狠色婷婷丁香六月 | 波多野结衣一区二区 | 黄工厂精品视频在线观看 | 成人国产精品视频 | 国产婷婷精品av在线 | aaa在线 | 久久久午夜精品 | 久久久久在线观看 | 国产精品在线 | 国产性夜夜性夜夜爽91 | 欧美综合亚洲 | 久久久久久综合 | 成人毛片视频免费 | 色网在线播放 | 欧洲a老妇女黄大片 | 久久在线免费视频 | 国产精品欧美一区二区三区 | 亚洲精品97福利在线 | 波多野吉衣一区 | 蜜桃传媒一区二区亚洲AV | 久久精品无码一区二区日韩av | 亚洲一区二区三区91 | 中文精品视频 | 五月婷六月丁香狠狠躁狠狠爱 | 亚洲精品人人 | 无码免费人妻A片AAA毛片一区 | 国产玖玖 | 国产91一区二这在线播放 | 国产在线精品一区二区三区 | 成人午夜动漫在线观看 | 精品国产理论在线观看不卡 | 亚洲精品久久久蜜桃 | 午夜伦理影院 | 精品欧美乱码久久久久久 | 久久综合九色综合97欧美 | 欧美www视频 |