Python 操作 Rabbit MQ 工作隊列 (四)
一、工作隊列簡介:
主要介紹,我們將會創建一個工作隊列,用于在多個工作人員之間分配耗時的任務。
工作隊列
:又稱為
任務隊列
,為了避免等待一些占用大量資源、時間的操作。當我們把任務當作
消息發送到隊列中
,一個運行在后臺的工作者進程就會取出任務,然后進行處理。當運行多個工作者,任務就會在它們之間共享。
二、準備工作:
首先,我們將發送一些字符串,把這些字符串當作復雜的任務,我們使用time.sleep()函數來模擬這種情況。我們在字符串中加上點號(.)來表示任務的復雜程度,一個點(.)就會耗時1秒鐘。比如"Hello…",就會耗時6秒種。
將之前的send.py文件,做簡單調整,以便可發送隨意的消息,到工作隊列中。名命為
new_task.py
:
#
!
/
usr
/
bin
/
python
#
-
*
-
coding
:
utf
-
8
-
*
-
import
pika
import
sys
message
=
' '
.
join
(
sys
.
argv
[
1
:
]
)
or
"Hello World!"
# 創建一個實例 本地訪問
IP
地址可以為 localhost
后面5672是端口地址
(
可以不用指定
,
因為默認就是
5672
)
connection
=
pika
.
BlockingConnection
(
pika
.
ConnectionParameters
(
'localhost'
,
5672
)
)
# 聲明一個管道
,
在管道里發送消息
channel
=
connection
.
channel
(
)
# 在管道里聲明隊列名稱
channel
.
queue_declare
(
queue
=
'hello'
)
# 參數exchange
=
''
表示默認交換
,
目前記住rabbitmq消息永遠不是直接發送到隊列中的
,
它需要通過交換
channel
.
basic_publish
(
exchange
=
''
,
routing_key
=
'hello'
,
body
=
message
)
print
"[x] sent {}"
.
format
(
message
,
)
# 隊列關閉
connection
.
close
(
)
更改
receive.py
腳本,將消息體中每一個點號(.)模擬1秒種的操作,它會從任務隊列中獲取消息并執行。名命為
worker.py
:
#
!
/
usr
/
bin
/
python
#
-
*
-
coding
:
utf
-
8
-
*
-
import
pika
import
time
# 創建實例
connection
=
pika
.
BlockingConnection
(
pika
.
ConnectionParameters
(
'localhost'
)
)
# 聲明管道
channel
=
connection
.
channel
(
)
# 這里又聲明一次
'hello'
隊列
,
因為你不知道哪個程序
(
send
.
py
)
先運行
,
所以要聲明兩次
channel
.
queue_declare
(
queue
=
'hello'
)
def
callback
(
ch
,
method
,
properties
,
body
)
:
print
'[X] Received{}'
.
format
(
body
,
)
time
.
sleep
(
body
.
count
(
'.'
)
)
ch
.
basic_ack
(
delivery_tag
=
method
.
delivery_tag
)
# 告訴生產者
,
消息處理完成
# 消費消息
channel
.
basic_consume
(
queue
=
'hello'
,
# 從指定的消息隊列中接收消息
on_message_callback
=
callback
)
# 如果收到消息
,
就調用callback函數來處理
print
(
'=======正在等待消息========'
)
channel
.
start_consuming
(
)
# 開始消費消息
三、循環調度:
使用工作隊列的一個好處就是它能夠
并行的處理隊列
,如果累積了很多任務,我們僅需要添加更多的工作者(workers)就可以了,擴展很簡單。
1.首先,先同時運行兩個
worker.py
腳本,它們會從隊列中獲取消息;
2.需要打開三個終端,兩個用來運行
worker.py
腳本,這倆個終端就是我們的兩個消費者(Consumers),C1和C2;
# shell
-
1
(
lvhuiqi
)
[
root@iz2zeap40j01vg100ifsf4z lvhuiqi
]
# python worker
.
py
===
===
=
正在等待消息
===
===
==
# shell
-
2
(
lvhuiqi
)
[
root@iz2zeap40j01vg100ifsf4z lvhuiqi
]
# python worker
.
py
===
===
=
正在等待消息
===
===
==
3.打開第三個終端,使用
new_task.py
進行發送任務:
# shell
-
3
python new_task
.
py python new_task
.
py First message
.
[
x
]
sent python new_task
.
py First message
.
python new_task
.
py python new_task
.
py Second message
.
.
[
x
]
sent python new_task
.
py Second message
.
.
python new_task
.
py python new_task
.
py Third message
...
[
x
]
sent python new_task
.
py Third message
...
python new_task
.
py python new_task
.
py Fourth message
...
.
[
x
]
sent python new_task
.
py Fourth message
...
.
python new_task
.
py python new_task
.
py Fifth message
...
.
.
[
x
]
sent python new_task
.
py Fifth message
...
.
.
4.查看工作者(workers)如果處理,生產者發送的消息:
# shell
-
1
(
lvhuiqi
)
[
root@iz2zeap40j01vg100ifsf4z lvhuiqi
]
# python worker
.
py
===
===
=
正在等待消息
===
===
==
[
X
]
Receivedpython new_task
.
py First message
.
[
X
]
Receivedpython new_task
.
py Third message
...
[
X
]
Receivedpython new_task
.
py Fifth message
...
.
.
# shell
-
2
(
lvhuiqi
)
[
root@iz2zeap40j01vg100ifsf4z lvhuiqi
]
# python worker
.
py
===
===
=
正在等待消息
===
===
==
[
X
]
Receivedpython new_task
.
py Second message
.
.
[
X
]
Receivedpython new_task
.
py Fourth message
...
總結 :
-
RabbitMQ 會按順序把消息發送給每個消費者(Consumer),平均每個消費者都會收到同等數量的消息。
這種發送消息的方式叫做輪詢
。
四、消息確認:
當處理一個比較耗時的任務時,如果消費者運行到一半就掛掉了,怎么辦?
當消息被RabbitMQ發送給消費者之后,馬上就會在內存中移除。這種情況,僅需要把一個工作者(worker)停止,正在處理的消息就會丟失。同時,所有發送到這個工作者的還沒有處理的消息都會丟失。
當然,我們不向丟失任何任務消息。
如果一個工作者(worker)掛掉了,我們希望任務會重新發送給其他的工作者。
解決辦法 :
-
為了防止消息跌勢,RabbitMQ提供了消息的響應。消息者會通過一個ack(響應),告訴RabbitMQ已經收到并處理了某條消息,然后RabbitMQ就會釋放并刪除這條消息。
-
若消費者(Consunmer)掛掉了,沒有發送響應,RabbitMQ就會認為消息沒有被完全處理,然后重新發送給其他消費者。這樣,即使工作者偶爾掛掉,也不會丟失消息。
-
消息沒有超時這個概念的;當工作者與它斷開連接的時候,RabbitMQ會重新發送消息。這樣在處理一個耗時非常長的消息任務的時候就不會出問題了。
-
消息響應,默認是開啟的。之前栗子中,使用
no_ack=True
標識把它關閉。當工作者完成了任務,就發送一個響應。def callback ( ch , method , properties , body ) : print '[X] Received{}' . format ( body , ) time . sleep ( body . count ( '.' ) ) ch . basic_ack ( delivery_tag = method . delivery_tag ) # 告訴生產者 , 消息處理完成 # 消費消息 channel . basic_consume ( queue = 'hello' , # 從指定的消息隊列中接收消息 on_message_callback = callback , # 如果收到消息 , 就調用callback函數來處理 on_ack = False ) # 開啟消息的響應 , 默認是開啟的
運行上面代碼,將兩個工作者進行殺掉,然后啟動生產者進行消息的發送。當工作者掛掉以后,所有沒有響應的消息都會重新發送。
# shell - 3 python new_task . py 01 Message . python new_task . py 02 Message . python new_task . py 03 Message . python new_task . py 04 Message . python new_task . py 05 Message .
# 查看當前隊列中的消息: rabbitmqctl list_queues Listing queues ... hello 5 # 隊列名字 'hello' 中 有 5 條信息沒有得到響應處理
# 重新啟動工作者: python worker . py === === = 正在等待消息 === === == [ X ] Received01 Message . [ X ] Received02 Message . [ X ] Received03 Message . [ X ] Received04 Message . [ X ] Received05 Message .
# 查看當前隊列中的消息 rabbitmqctl list_queues Listing queues ... hello 0 # 隊列名字 'hello' 所有消息已經釋放
注意
:很容易犯的錯誤,就是忘了
basic_ack
,后果很嚴重。消息在程序退出之后就會重新發送,如果它不能夠釋放沒有得到響應的消息,
RabbitMQ就會占用越來越多的內存
。
五、消息持久化:
如果沒有特意告訴RabbitMQ,那么在它退出或者崩潰時,
將會丟失所有隊列和消息
。為了保證不丟失,有兩個事情需要注意:
我們必須把隊列和消息設置為持久化
。
1.將隊列聲明持久化:
將隊列聲明為持久化(durable):
channel
.
queue_declare
(
queue
=
'hello'
,
durable
=
True
)
-
這行代碼本身是正確的,但是仍然不會正確運行。因為我們已經定義過一個叫’hello’的非持久化隊列,RabbitMQ不允許你使用不同的參數重新定義一個隊列,它會報錯。但我們現在使用一個快捷的解決方法,
用不同的隊列名
。channel . queue_declare ( queue = 'task_queue' , durable = True )
這個queue_declare必須在生產者(producer)和消費者(consumer)對應的代碼中修改 。
2.將發送消息設為持久化:
這時,我們可以確保在RabbitMQ重啟之后,queue_declare隊列不會丟失。另外,我們需要把我們的消息也要設為持久化,將
delivery_mode的屬性設為2
。
channel
.
basic_publish
(
exchange
=
''
,
routing_key
=
'task_queue'
,
body
=
message
,
properties
=
pika
.
BasicProperties
(
delivery_mode
=
2
)
# 使消息持久化
)
注意
:將消息設為持久化并不能完全保證不會丟失。以上代碼只是告訴
RabbitMQ要把消息存儲到硬盤中
,但從RabbitMQ收到消息到保持之間還是有一個很小的時間間隔。因為RabbitMQ并不是所有的消息都使用fsync,它有可能只是保持到緩存中,并不一定會寫到硬盤中。并不能
保證真正的持久化
。
六、公平調度:
以上 循環調度 的栗子,可以看出,它并不是按照我們所期望的那樣進行分發。如果有兩個工作者,處理奇數的消息比較繁忙,處理偶數的消息比較輕松。這時RabbitMQ并不知道這些,它依然會一如既往的派發消息。
這時因為RabbitMQ,只管分發進入隊列的消息,不會關心有多少消費者沒有作出響應。
使用
basic.qos方法,并設置prefetch_count=1
,告訴RabbitMQ,在同一時刻,不要發送超過1條消息給一個工作者,直到它已經處理上一條消息并且作出了響應。
這樣,RabbitMQ就會把消息分發給下一個空閑的工作者
。
channel
.
basic_qos
(
prefetch_count
=
1
)
注意 :如果所有工作者都處理繁忙的狀態,你的隊列就會被填滿。解決此問題,需要添加更多的工作者(workers),或者使用其他的策略。
七、整理本節最終代碼:
new_task.py
完整代碼:
#
!
/
usr
/
bin
/
python
#
-
*
-
coding
:
utf
-
8
-
*
-
import
pika
import
sys
message
=
' '
.
join
(
sys
.
argv
[
1
:
]
)
or
"Hello World!"
# 創建一個實例 本地訪問
IP
地址可以為 localhost
后面5672是端口地址
(
可以不用指定
,
因為默認就是
5672
)
connection
=
pika
.
BlockingConnection
(
pika
.
ConnectionParameters
(
'localhost'
,
5672
)
)
# 聲明一個管道
,
在管道里發送消息
channel
=
connection
.
channel
(
)
# 在管道里聲明隊列名稱
channel
.
queue_declare
(
queue
=
'task_queue'
,
durable
=
True
)
# 參數exchange
=
''
表示默認交換
,
目前記住rabbitmq消息永遠不是直接發送到隊列中的
,
它需要通過交換
channel
.
basic_publish
(
exchange
=
''
,
routing_key
=
'task_queue'
,
body
=
message
,
properties
=
pika
.
BasicProperties
(
delivery_mode
=
2
)
# 使消息持久化
)
print
"[x] sent {}"
.
format
(
message
,
)
# 隊列關閉
connection
.
close
(
)
worker.py
完整代碼:
#
!
/
usr
/
bin
/
python
#
-
*
-
coding
:
utf
-
8
-
*
-
import
pika
import
time
# 創建實例
connection
=
pika
.
BlockingConnection
(
pika
.
ConnectionParameters
(
'localhost'
)
)
# 聲明管道
channel
=
connection
.
channel
(
)
# 這里又聲明一次
'hello'
隊列
,
因為你不知道哪個程序
(
send
.
py
)
先運行
,
所以要聲明兩次
channel
.
queue_declare
(
queue
=
'task_queue'
,
durable
=
True
)
def
callback
(
ch
,
method
,
properties
,
body
)
:
print
'[X] Received{}'
.
format
(
body
,
)
time
.
sleep
(
body
.
count
(
'.'
)
)
ch
.
basic_ack
(
delivery_tag
=
method
.
delivery_tag
)
# 告訴生產者
,
消息處理完成
# 公平調度
,
同一時刻
,
不要發送超過
1
條消息給下一個工作者
channel
.
basic_qos
(
prefetch_count
=
1
)
# 消費消息
channel
.
basic_consume
(
queue
=
'task_queue'
,
# 從指定的消息隊列中接收消息
on_message_callback
=
callback # 如果收到消息
,
就調用callback函數來處理
)
print
(
'=======正在等待消息========'
)
channel
.
start_consuming
(
)
# 開始消費消息
這些持久化的選項,可以使得在RabbitMQ重啟之后仍然能夠恢復。
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
