Python 操作 Rabbit MQ 發(fā)布/訂閱 (五)
一、發(fā)布、訂閱:
我們將一個消息分發(fā)給
多個消費者
,這種模式被稱為
發(fā)布/訂閱
。
為了更好的理解這個模式,我們將構建一個日志系統(tǒng),它包括兩個程序:
- 第一個程序,負責發(fā)送日志消息;
- 第二個程序,負責獲取消息并輸出內容;
在日志系統(tǒng)中,所有正在運行的接收方程序都會接收消息;
- 一個接受者,把日志寫入硬盤中;
- 另一個接受者,把日志輸出到屏幕上;
最終,日志消息被廣播給所有的接受者。
二、交換機(Exchanges):
概念
:應用程序發(fā)送消息時,先把消息給交換機,由交換機投遞給隊列,而不是直接給隊列。交換機可以由多個
消息通道(Channel)
,用于投遞消息。
簡單概括下之前的知識 :
- 發(fā)布者(Producer):是發(fā)布消息的應用程序。
- 隊列(Queue):用于消息存儲的緩沖。
- 消費者(Consumer):是接收消息的應用程序。
- P:代表是發(fā)布者;
- X:是交換機;
詳解圖意 :發(fā)布者(P )→交換機(X)→隊列(Q)→消費者(C );
-
交換機一邊從發(fā)布者方接收消息,一邊把消息推送到隊列(Q)。
交換機必須知道如何處理它接收到的消息,是推送到指定的隊列、還是多個隊列,或者是忽略消息
。這些都是通過交換機類型(Exchange Type)
來定義的。
交換機類型 :
1.直連交換機(Direct);
2.主題交換機(Topic);
3.頭交換機(Headers);
4.扇形交換機(Fanout);
-
主要說明—扇形交換,它把消息發(fā)送給它所知道的所有隊列。
channel . exchange_declare ( exchange = 'fanout_logs' , exchange_type = 'fanout' )
參數講解 :
-
exchange:就是交換機的名稱,
空字符串代表默認或者匿名交換機;
channel . basic_publish ( exchange = '' )
-
exchange_type:就是交換機的類型;
-
routing_key:分發(fā)到指定的隊列;
-
body:發(fā)送的內容;
-
properties:使消息持久化;
查看交換器列表 :
命令:
rabbitmqctl list_exchanges
Listing exchanges
...
amq
.
rabbitmq
.
log topic
amq
.
direct direct
amq
.
topic topic
amq
.
headers headers
direct
amq
.
fanout fanout
amq
.
rabbitmq
.
trace topic
amq
.
match headers
列表中以amq.*的開頭的交換器,都是默認創(chuàng)建的,目前不需要管它們。
三、臨時隊列:
我們連接上Rabbit MQ的時候,需要一個
全新的、空的隊列
(也就是說不使用之前提到的,routing_key參數指定的隊列名),我們可以
手動創(chuàng)建一個隨機的隊列名
,或者讓
服務器為我們選擇一個隨機的隊列名(推薦)
。我們僅需要在
調用queue_declare方法時,不提供queue參數
即可:
# 在管道里
,
不聲明隊列名稱
result
=
channel
.
queue_declare
(
)
可通過
result.method.queue
獲取已經生成的隨機隊列名,大概的樣子如下所示:
amq
.
gen
-
DIAODS2sDSAKJKS
==
與消費者斷開連接時,這個隊列應被立即刪除:
# 需要一個空的隊列 exclusive
=
True 表示與消費者斷開時
,
隊列立即刪除
result
=
channel
.
queue_declare
(
exclusive
=
True
)
四、綁定:
目前已經創(chuàng)建一個扇形交換機和一個隊列。現(xiàn)在需要告訴交換機如果發(fā)送消息給隊列。
交換機和隊列之間的聯(lián)系我們稱為綁定(binding)
# 將fanount_logs交換機將會把消息添加到我們的隊列中
,
隊列名服務器隨機生成
channel
.
queue_bind
(
exchange
=
'fanout_logs'
,
queue
=
result
.
method
.
queue
)
查看綁定列表 :
列出所有現(xiàn)存的綁定命令:
rabbitmqctl list_bindings
五、整理本節(jié)最終代碼:
圖解最終流程 :
發(fā)布日志與之前的區(qū)別 :
1.我們把消息發(fā)送給fanout_logs交換機而不是匿名的交換機;
2.發(fā)送的時候需要提供routing_key參數,但它的值會被扇形交換機忽略;
以下是
send.py
:
#
!
/
usr
/
bin
/
python
#
-
*
-
coding
:
utf
-
8
-
*
-
import
pika
import
sys
message
=
' '
.
join
(
sys
.
argv
[
1
:
]
)
or
"Hello World!"
# 創(chuàng)建一個實例 本地訪問
IP
地址可以為 localhost
后面5672是端口地址
(
可以不用指
# 定
,
因為默認就是
5672
)
connection
=
pika
.
BlockingConnection
(
pika
.
ConnectionParameters
(
'localhost'
,
5672
)
)
# 聲明一個管道
,
在管道里發(fā)送消息
channel
=
connection
.
channel
(
)
# 指定交換機的類型為fanout
,
執(zhí)行交換機名
:
fanout_logs
channel
.
exchange_declare
(
exchange
=
'fanout_logs'
,
exchange_type
=
'fanout'
)
# 投遞消息 exchange
=
'fancout_logs'
交換機的名命
;
type
=
'fanout'
:
扇形交換機
channel
.
basic_publish
(
exchange
=
'fanout_logs'
,
routing_key
=
''
,
body
=
message
)
print
"[x] sent {}"
.
format
(
message
,
)
# 隊列關閉
connection
.
close
(
)
若沒有綁定隊列的交換器,消息將會丟失。以下是
receive.py
:
#
!
/
usr
/
bin
/
python
#
-
*
-
coding
:
utf
-
8
-
*
-
import
pika
# 創(chuàng)建實例
connection
=
pika
.
BlockingConnection
(
pika
.
ConnectionParameters
(
'localhost'
)
)
# 聲明管道
channel
=
connection
.
channel
(
)
# 指定交換機名為 fanout_logs 類型為扇形
channel
.
exchange_declare
(
exchange
=
'fanout_logs'
,
exchange_type
=
'fanout'
)
# 表示與消費者斷開連接
,
隊列立即刪除
result
=
channel
.
queue_declare
(
queue
=
''
,
exclusive
=
True
)
# 生成隊列的名字
queue_name
=
result
.
method
.
queue
# 綁定交換機和隊列
channel
.
queue_bind
(
exchange
=
'fanout_logs'
,
queue
=
queue_name
)
def
callback
(
ch
,
method
,
properties
,
body
)
:
print
'[X] Received{}'
.
format
(
body
,
)
# 消費消息
channel
.
basic_consume
(
queue
=
queue_name
,
# 從指定的消息隊列中接收消息
on_message_callback
=
callback
,
# 如果收到消息
,
就調用callback函數來處理
)
print
(
'=======正在等待消息========'
)
channel
.
start_consuming
(
)
# 開始消費消息
3.如果想把日志保存到文件中,打開控制臺輸入:
python receive
.
py
>
logs_from_rabbit
.
log
4.在屏幕中查看日志,在打開一個新的終端運行:
python receive
.
py
===
===
=
正在等待消息
===
===
==
5.發(fā)送消息:
python send
.
py 發(fā)送第一條消息
6.可以看到消費者接收到了消息,并且日志中也記錄了這條消息。
cat logs_from_rabbit
.
log
===
===
=
正在等待消息
===
===
==
[
X
]
Received發(fā)送第一條消息
7.確認已經創(chuàng)建的隊列綁定:
rabbitmqctl list_bindings
Listing bindings
...
exchange amq
.
gen
-
Di2rIkS1kQWcMODPxF5KuA queue amq
.
gen
-
Di2rIkS1kQWcMODPxF5KuA
[
]
exchange hello queue hello
[
]
exchange task_queue queue task_queue
[
]
fanout_logs exchange amq
.
gen
-
Di2rIkS1kQWcMODPxF5KuA queue amq
.
gen
-
Di2rIkS1kQWcMODPxF5KuA
[
]
交換器fanout_logs把數據發(fā)送給兩個系統(tǒng)名命的隊列
更多文章、技術交流、商務合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
