欧美三区_成人在线免费观看视频_欧美极品少妇xxxxⅹ免费视频_a级毛片免费播放_鲁一鲁中文字幕久久_亚洲一级特黄

Python 操作 Rabbit MQ 路由 (六)

系統 1771 0

Python 操作 Rabbit MQ 路由 (六)

一、路由(Routing):

本章打算新增加一個功能,使它可以達到僅訂閱消息的一個子集。

舉個栗子,我們需要把驗證的錯誤日志信息寫入日志文件(存儲到磁盤),但同時仍然把所有的日志信息輸出到控制臺中。

二、綁定(Bindings):

綁定(Binding)是指交換機(Exchange)和隊列(Queue)的關系 ;

綁定的時候可以帶上一個額外的routing_key參數。為了避免與basic_publish的參數混淆,我們把它叫做 綁定鍵(Binding Key)

            
              channel
              
                .
              
              
                queue_bind
              
              
                (
              
              exchange
              
                =
              
              exchange_name
              
                ,
              
              
                   queue
              
                =
              
              queue_name
              
                ,
              
              
                   routing_key
              
                =
              
              
                'fe_cow'
              
              
                )
              
            
          

注意

  • 綁定鍵的意義取決于交換機的類型,上一篇使用的 扇形交換機會忽略這個值

三、直連交換機(Direct Exchange):

打算擴展上一篇的功能,使其基于日志的嚴重程度進行消息過濾。比如把一些比較嚴重的錯誤日志寫入磁盤,以免將警告或信息日志上浪費磁盤空間,僅記錄比較嚴重的錯誤。

上一篇我們使用的扇形交換機,沒有足夠的靈活性,僅能做廣播的需求;

直連交換機的路由算法很簡單易懂, 交換機將會對綁定鍵和路由鍵進行精準匹配,從而確定消息該分發到哪個隊列

圖解

Python 操作 Rabbit MQ 路由 (六)_第1張圖片

詳解 :可以看到type=direct,指定交換機的類型為直連交換機,它和兩個隊列都進行了綁定。第一個隊列(Q1)使用orange作為綁定鍵,第二個隊列(Q2)有兩個綁定鍵,一個是black作為綁定鍵,另一個是green綁定鍵。

簡單理解 :當路由鍵為orange的消息發布到交換機,就會被路由到隊列Q1中,路由鍵為black或green的消息發布到交換機,就會被路由到隊列Q2中,其他的所有消息都將被丟棄。

四、多個綁定:

圖解

Python 操作 Rabbit MQ 路由 (六)_第2張圖片

詳解 :多個隊列使用相同的綁定鍵是合法的。這樣一來,直連交換機就和扇形交換機的行為一樣,會將消息廣播到所有匹配的隊列(Q1和Q2)。

五、發送日志:

將發送消息到一個直連的交換機,把日志級別作為路由鍵,這樣接收日志的腳本可根據嚴重的級別來選擇它想要處理的日志,我們假設 severity的值是info、warning、error中的一個

創建直連交換機:

            
              # 交換機名為
              
                :
              
              direct_logs  類型為
              
                :
              
              直連交換機
channel
              
                .
              
              
                exchange_declare
              
              
                (
              
              exchange
              
                =
              
              
                'direct_logs'
              
              
                ,
              
               type
              
                =
              
              
                'direct'
              
              
                )
              
            
          

發送消息:

            
              # 交換機名稱為
              
                :
              
              direct_logs
channel
              
                .
              
              
                basic_publish
              
              
                (
              
              exchange
              
                =
              
              
                'direct_logs'
              
              
                ,
              
               routing_key
              
                =
              
              severity
              
                ,
              
               body
              
                =
              
              message
              
                )
              
            
          

六、訂閱:

處理接收消息的方式跟之前不一樣,將會為每個嚴重級別分別創建一個新的綁定。

            
              # 表示與消費者斷開連接
              
                ,
              
               隊列立即刪除
result 
              
                =
              
               channel
              
                .
              
              
                queue_declare
              
              
                (
              
              queue
              
                =
              
              
                ''
              
              
                ,
              
               exclusive
              
                =
              
              True
              
                )
              
              

# 生成隊列的名字
queue_name 
              
                =
              
               result
              
                .
              
              method
              
                .
              
              queue


              
                for
              
               severity 
              
                in
              
               severities
              
                :
              
              
    channel
              
                .
              
              
                queue_bind
              
              
                (
              
              exchange
              
                =
              
              
                'direct_logs'
              
              
                ,
              
              
                       queue
              
                =
              
              queue_name
              
                ,
              
              
                       routing_key
              
                =
              
              severity
              
                )
              
            
          

七、整理本節代碼:

圖解
Python 操作 Rabbit MQ 路由 (六)_第3張圖片
1.以下是 send.py 代碼:

            
              #
              
                !
              
              
                /
              
              usr
              
                /
              
              bin
              
                /
              
              python
# 
              
                -
              
              
                *
              
              
                -
              
               coding
              
                :
              
               utf
              
                -
              
              
                8
              
              
                -
              
              
                *
              
              
                -
              
              
                import
              
               pika

              
                import
              
               sys

severity 
              
                =
              
               sys
              
                .
              
              argv
              
                [
              
              
                1
              
              
                ]
              
              
                if
              
              
                len
              
              
                (
              
              sys
              
                .
              
              argv
              
                )
              
              
                >
              
              
                1
              
              
                else
              
              
                'info'
              
              
message 
              
                =
              
              
                ' '
              
              
                .
              
              
                join
              
              
                (
              
              sys
              
                .
              
              argv
              
                [
              
              
                2
              
              
                :
              
              
                ]
              
              
                )
              
               or 
              
                "Hello World!"
              
              

# 創建一個實例  本地訪問
              
                IP
              
              地址可以為 localhost 
              
                后面5672是端口地址
              
              
                (
              
                >
              
              以不用指定
              
                ,
              
               因為默認就是
              
                5672
              
              
                )
              
              
connection 
              
                =
              
               pika
              
                .
              
              
                BlockingConnection
              
              
                (
              
              pika
              
                .
              
              
                ConnectionParameters
              
              
                (
              
              
                'localhost'
              
              
                ,
              
              
                5672
              
              
                )
              
              
                )
              
              

# 聲明一個管道
              
                ,
              
               在管道里發送消息
channel 
              
                =
              
               connection
              
                .
              
              
                channel
              
              
                (
              
              
                )
              
              

# 指定交換機的類型為direct
              
                :
              
               執行交換機    交換機名稱
              
                :
              
               direct_logs
channel
              
                .
              
              
                exchange_declare
              
              
                (
              
              exchange
              
                =
              
              
                'direct_logs'
              
              
                ,
              
               exchange_type
              
                =
              
              
                'direct'
              
              
                )
              
              

# 投遞消息
channel
              
                .
              
              
                basic_publish
              
              
                (
              
              exchange
              
                =
              
              
                'direct_logs'
              
              
                ,
              
              
                      routing_key
              
                =
              
              severity
              
                ,
              
              
                      body
              
                =
              
              message
                      
              
                )
              
              

print 
              
                "[x] sent {}"
              
              
                .
              
              
                format
              
              
                (
              
              severity
              
                ,
              
               message
              
                )
              
              
# 隊列關閉
connection
              
                .
              
              
                close
              
              
                (
              
              
                )
              
            
          

2.以下是 receive.py 代碼:

            
              #
              
                !
              
              
                /
              
              usr
              
                /
              
              bin
              
                /
              
              python
# 
              
                -
              
              
                *
              
              
                -
              
               coding
              
                :
              
               utf
              
                -
              
              
                8
              
              
                -
              
              
                *
              
              
                -
              
              
                import
              
               pika

              
                import
              
               sys

# 創建實例
connection 
              
                =
              
               pika
              
                .
              
              
                BlockingConnection
              
              
                (
              
              pika
              
                .
              
              
                ConnectionParameters
              
              
                (
              
              
                'localhost'
              
              
                )
              
              
                )
              
              

# 聲明管道
channel 
              
                =
              
               connection
              
                .
              
              
                channel
              
              
                (
              
              
                )
              
              

# 指定交換機名為 direct_logs 交換機類型為
              
                :
              
              direct
channel
              
                .
              
              
                exchange_declare
              
              
                (
              
              exchange
              
                =
              
              
                'direct_logs'
              
              
                ,
              
               exchange_type
              
                =
              
              
                'direct'
              
              
                )
              
              

# 表示與消費者斷開連接
              
                ,
              
               隊列立即刪除
result 
              
                =
              
               channel
              
                .
              
              
                queue_declare
              
              
                (
              
              queue
              
                =
              
              
                ''
              
              
                ,
              
               exclusive
              
                =
              
              True
              
                )
              
              

# 生成隊列的名字
queue_name 
              
                =
              
               result
              
                .
              
              method
              
                .
              
              queue

severities 
              
                =
              
               sys
              
                .
              
              argv
              
                [
              
              
                1
              
              
                :
              
              
                ]
              
              
                if
              
               not severities
              
                :
              
              
    print 
              
                >>
              
               sys
              
                .
              
              stderr
              
                ,
              
              
                "Usage: %s [info] [warning] [error]"
              
              
                %
              
               \
                         
              
                (
              
              sys
              
                .
              
              argv
              
                [
              
              
                0
              
              
                ]
              
              
                ,
              
              
                )
              
              
    sys
              
                .
              
              
                exit
              
              
                (
              
              
                1
              
              
                )
              
              
                for
              
               severitie 
              
                in
              
               severities
              
                :
              
              
    # 綁定交換機和隊列  這里注意的是綁定鍵
              
                ,
              
               就是根據按照指定嚴重級別進行記錄日志
    channel
              
                .
              
              
                queue_bind
              
              
                (
              
              exchange
              
                =
              
              
                'direct_logs'
              
              
                ,
              
               queue
              
                =
              
              queue_name
              
                ,
              
               routing_key
              
                =
              
              severitie
              
                )
              
              


def 
              
                callback
              
              
                (
              
              ch
              
                ,
              
               method
              
                ,
              
               properties
              
                ,
              
               body
              
                )
              
              
                :
              
              
    print 
              
                '[X] Received{}'
              
              
                .
              
              
                format
              
              
                (
              
              body
              
                ,
              
              
                )
              
              


# 消費消息
channel
              
                .
              
              
                basic_consume
              
              
                (
              
              queue
              
                =
              
              queue_name
              
                ,
              
                # 從指定的消息隊列中接收消息
                      on_message_callback
              
                =
              
              callback
              
                ,
              
                # 如果收到消息
              
                ,
              
               就調用callback函數來處理
                      
              
                )
              
              
                print
              
              
                (
              
              
                '=======正在等待消息========'
              
              
                )
              
              
channel
              
                .
              
              
                start_consuming
              
              
                (
              
              
                )
              
                # 開始消費消息

            
          

3.僅希望保存 warning和error級別 日志到磁盤中,需要打開控制臺并輸入:

            
              python receive
              
                .
              
              py warning error 
              
                >
              
               logs_from_rabbit
              
                .
              
              log

            
          

4.希望所有日志都輸出到屏幕中,打開一個新的終端,輸入:

            
              python receive
              
                .
              
              py info warning error

            
          

5.要觸發一個 error 級別的日志,需要輸入:

            
              python send
              
                .
              
              py error 
              
                '發送一個error級別的錯誤'
              
            
          
            
              # 可以看到步驟
              
                3
              
              的控制臺
              
                ,
              
               會出現
              
                :
              
              
                ===
              
              
                ===
              
              
                =
              
              正在等待消息
              
                ===
              
              
                ===
              
              
                ==
              
              
                [
              
              
                X
              
              
                ]
              
               Received發送一個error級別的錯誤

            
          

簡單理解:通過綁定鍵的名稱,來進行由哪個隊列進行處理


更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 91观看| 九九九九九九精品任你躁 | 91精品观看 | 婷婷草| 国产综合在线播放 | 上将的炮灰前妻重生了 | 成人网在线免费观看 | 青青青青久久久久国产的 | 五月婷婷综合网 | 114美女做爰视频在线 | 日韩av片免费播放 | 久久www免费人成看片高清 | 麻豆精品在线观看 | 草草线在成人免费视频 | 日本成片| 亚洲天天干 | 一级做a | 国产精彩视频在线 | 91亚洲在线 | 国内精品玖玖玖玖电影院 | 国产精品一区二555 欧美在线免费 | 国产成人一区二区三区 | 国产综合视频在线 | 91青青操| 天天夜夜人人 | 免费精品久久久久久中文字幕 | 欧美日本一道高清二区三区 | 色男人的天堂 | 亚洲精品色 | 天天艹天天干天天 | 狠狠综合久久av一区二区小说 | 五月婷婷丁香综合网 | 亚洲网站在线观看 | 欧美国产日本高清不卡 | 中国美女一级黄色片 | 99国产精品 | 日韩av电影在线免费观看 | 日本捏胸摸下面免费视频 | 日韩精品一区二区三区 | 国产精品美女久久久 | 亚洲国产精品久久久久666 |