python Rabbitmq編程(一)
?
?
實(shí)現(xiàn)最簡單的隊(duì)列通信
?
send端
#
!/usr/bin/env python
import
pika
credentials
= pika.PlainCredentials(
"用戶名
"
,
"密碼
"
)
connection
=
pika.BlockingConnection(pika.ConnectionParameters(
'
localhost
'
,credentials=
credentials))
channel
= connection.channel()
#
建立了rabbit協(xié)議的通道
#
聲明queue
channel.queue_declare(queue=
'
hello
'
)
#
n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange=
''
,
routing_key
=
'
hello
'
,
body
=
'
Hello World!
'
)
print
(
"
[x] Sent 'Hello World!'
"
)
connection.close()
?
receive端
#
_*_coding:utf-8_*_
__author__
=
'
Alex Li
'
import
pika
credentials
= pika.PlainCredentials(
"用戶名
"
,
"密碼
"
)
connection
=
pika.BlockingConnection(pika.ConnectionParameters(
'
localhost
'
,credentials=
credentials))
channel
= connection.channel()
#
建立了rabbit協(xié)議的通道
#
You may ask why we declare the queue again ? we have already declared it in our previous code.
#
We could avoid that if we were sure that the queue already exists. For example if send.py program
#
was run before. But we're not yet sure which program to run first. In such cases it's a good
#
practice to repeat declaring the queue in both programs.
channel.queue_declare(queue=
'
hello
'
)
def
callback(ch, method, properties, body):
print
(
"
[x] Received %r
"
%
body)
#
callback函數(shù)當(dāng)拿到隊(duì)列里的值,則調(diào)用
channel.basic_consume(callback,
queue
=
'
hello
'
,
no_ack
=
True)
print
(
'
[*] Waiting for messages. To exit press CTRL+C
'
)
channel.start_consuming()
?
#注意:
遠(yuǎn)程連接rabbitmq server的話,需要配置權(quán)限。
#1.設(shè)置用戶與密碼
#
> rabbitmqctl add_user name pass
#
> rabbitmqctl set_user_tags name administrator
#2.設(shè)置權(quán)限,允許從外面訪問
#
rabbitmqctl set_permissions -p /name ".*" ".*" ".*"
set_permissions [-
p vhost] {user} {conf} {write} {read}
vhost
The name of the virtual host to which to grant the user access, defaulting to
/
.
user
The name of the user to grant access to the specified virtual host.
conf
A regular expression matching resource names
for
which the user
is
granted configure permissions.
write
A regular expression matching resource names
for
which the user
is
granted write permissions.
read
A regular expression matching resource names
for
which the user
is
granted read permissions.
#3.生產(chǎn)者與消費(fèi)者添加認(rèn)證信息
credentials = pika.PlainCredentials(
"用戶名
"
,
"密碼
"
)
?
#為什么要聲明兩次queue,這里hello為隊(duì)列名
# channel.queue_declare(queue='hello')
# 解決發(fā)起者先啟動(dòng),而接收者還沒有啟動(dòng),發(fā)送者先創(chuàng)建queue,
# 如果發(fā)起者已經(jīng)聲明了,接收者會(huì)檢測有沒有queue,如果有了,實(shí)際接收者是不會(huì)執(zhí)行聲明的,沒有就會(huì)聲明這個(gè)queue。
?
消息公平分發(fā)(循環(huán)調(diào)度)
在這種模式下,RabbitMQ會(huì)默認(rèn)把p發(fā)的消息依次分發(fā)給各個(gè)消費(fèi)者(c)。
輪巡公平的發(fā)送給接收者,比如第一次發(fā)送給第一個(gè)接收者,第二次發(fā)送給第二格接受者,如此。
send端
import
pika
import
time
credentials
= pika.PlainCredentials(
"
用戶名
"
,
"
密碼
"
)
connection
=
pika.BlockingConnection(pika.ConnectionParameters(
'
localhost
'
,credentials=
credentials))
channel
=
connection.channel()
#
聲明queue
channel.queue_declare(queue=
'
task_queue
'
)
#
n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
import
sys
message
=
'
'
.join(sys.argv[1:])
or
"
Hello World! %s
"
%
time.time()
channel.basic_publish(exchange
=
''
,
routing_key
=
'
task_queue
'
,
body
=
message,
properties
=
pika.BasicProperties(
delivery_mode
=2,
#
make message persistent
)
)
print
(
"
[x] Sent %r
"
%
message)
connection.close()
?
receive端
#
_*_coding:utf-8_*_
import
pika, time
credentials
= pika.PlainCredentials(
"
用戶名
"
,
"
密碼
"
)
connection
=
pika.BlockingConnection(pika.ConnectionParameters(
'
localhost
'
,credentials=
credentials))
channel
=
connection.channel()
def
callback(ch, method, properties, body):
print
(
"
[x] Received %r
"
%
body)
time.sleep(
20
)
print
(
"
[x] Done
"
)
print
(
"
method.delivery_tag
"
, method.delivery_tag)
ch.basic_ack(delivery_tag
=
method.delivery_tag)
channel.basic_consume(callback,
queue
=
'
task_queue
'
,
no_ack
=
True
)
print
(
'
[*] Waiting for messages. To exit press CTRL+C
'
)
channel.start_consuming()
?
消息確認(rèn)
執(zhí)行任務(wù)可能需要幾秒鐘。 你可能想知道如果其中一個(gè)消費(fèi)者開始一項(xiàng)長期任務(wù)并且只是部分完成而死亡會(huì)發(fā)生什么。 使用我們當(dāng)前的代碼,一旦RabbitMQ向消費(fèi)者傳遞消息,它立即將其標(biāo)記為刪除。 在這種情況下,如果你殺死一個(gè)工人,我們將丟失它剛剛處理的消息。 我們還將丟失分發(fā)給這個(gè)特定工作者但尚未處理的所有消息。
但我們不想失去任何任務(wù)。 如果工人死亡,我們希望將任務(wù)交付給另一名工人。
為了確保消息永不丟失,RabbitMQ支持? 消息 確認(rèn) 。 消費(fèi)者發(fā)回ack(nowledgement)告訴RabbitMQ已收到,處理了特定消息,RabbitMQ可以自由刪除它。
如果消費(fèi)者死亡(其通道關(guān)閉,連接關(guān)閉或TCP連接丟失)而不發(fā)送確認(rèn),RabbitMQ將理解消息未完全處理并將重新排隊(duì)。 如果同時(shí)有其他在線消費(fèi)者,則會(huì)迅速將其重新發(fā)送給其他消費(fèi)者。 這樣你就可以確保沒有消息丟失,即使工人偶爾會(huì)死亡。
沒有任何消息超時(shí);? 當(dāng)消費(fèi)者死亡時(shí),RabbitMQ將重新發(fā)送消息。 即使處理消息需要非常長的時(shí)間,也沒關(guān)系。
默認(rèn)情況下, 手動(dòng)消息確認(rèn) 已打開。 在前面的示例中,我們通過 auto_ack = True ?標(biāo)志 明確地將它們關(guān)閉 。 在我們完成任務(wù)后,是時(shí)候刪除此標(biāo)志并從工作人員發(fā)送適當(dāng)?shù)拇_認(rèn)。
def
callback(ch, method, properties, body):
print
"
[x] Received %r
"
%
(body,)
time.sleep( body.count(
'
.
'
) )
print
"
[x] Done
"
ch.basic_ack(delivery_tag
=
method.delivery_tag)
channel.basic_consume(callback,
queue
=
'
hello
'
)
Using this code we can be sure that even if you kill a worker using CTRL+C while it was processing a message, nothing will be lost. Soon after the worker dies all unacknowledged messages will be redelivered
?
?
消息持久化
我們已經(jīng)學(xué)會(huì)了如何確保即使消費(fèi)者死亡,任務(wù)也不會(huì)丟失。 但是如果RabbitMQ服務(wù)器停止,我們的任務(wù)仍然會(huì)丟失。
當(dāng)RabbitMQ退出或崩潰時(shí),它將忘記隊(duì)列和消息,除非你告訴它不要。 確保消息不會(huì)丟失需要做兩件事:我們需要將隊(duì)列和消息都標(biāo)記為持久。
首先,我們需要確保RabbitMQ永遠(yuǎn)不會(huì)丟失我們的隊(duì)列。 為此,我們需要聲明它是 持久的 :
channel.queue_declare(queue=
'
hello
'
, durable=True)
雖然此命令本身是正確的,但它在我們的設(shè)置中不起作用。 那是因?yàn)槲覀円呀?jīng)定義了一個(gè)名為 hello 的隊(duì)列 ?, 這個(gè)隊(duì)列 不耐用。 RabbitMQ不允許您使用不同的參數(shù)重新定義現(xiàn)有隊(duì)列,并將向嘗試執(zhí)行此操作的任何程序返回錯(cuò)誤。 但是有一個(gè)快速的解決方法 - 讓我們聲明一個(gè)具有不同名稱的隊(duì)列,例如 task_queue :
channel.queue_declare(queue=
'
task_queue
'
, durable=True)
此 queue_declare 更改需要應(yīng)用于生產(chǎn)者和消費(fèi)者代碼。
此時(shí)我們確信 即使RabbitMQ重新啟動(dòng) , task_queue 隊(duì)列也不會(huì)丟失。 現(xiàn)在我們需要將消息標(biāo)記為持久性 - 通過提供 值為 2 的 delivery_mode 屬性 。
channel.basic_publish(exchange=
''
,
routing_key
=
"
task_queue
"
,
body
=
message,
properties
=
pika.BasicProperties(
delivery_mode
= 2,
#
make message persistent
))
?
?
負(fù)載均衡
如果Rabbit只管按順序把消息發(fā)到各個(gè)消費(fèi)者身上,不考慮消費(fèi)者負(fù)載的話,很可能出現(xiàn),一個(gè)機(jī)器配置不高的消費(fèi)者那里堆積了很多消息處理不完,同時(shí)配置高的消費(fèi)者卻一直很輕松。為解決此問題,可以在各個(gè)消費(fèi)者端,
配置perfetch_count=1,意思就是告訴RabbitMQ在我這個(gè)消費(fèi)者當(dāng)前消息還沒處理完的時(shí)候就不要再給我發(fā)新消息了。
send端
#
!/usr/bin/env python
import
pika
import
sys
connection
=
pika.BlockingConnection(
pika.ConnectionParameters(host
=
'
localhost
'
))
channel
=
connection.channel()
channel.queue_declare(queue
=
'
task_queue
'
, durable=
True)
message
=
'
'
.join(sys.argv[1:])
or
"
Hello World!
"
channel.basic_publish(
exchange
=
''
,
routing_key
=
'
task_queue
'
,
body
=
message,
properties
=
pika.BasicProperties(
delivery_mode
=2,
#
make message persistent
))
print
(
"
[x] Sent %r
"
%
message)
connection.close()
?
receive端
#
!/usr/bin/env python
import
pika
import
time
connection
=
pika.BlockingConnection(
pika.ConnectionParameters(host
=
'
localhost
'
))
channel
=
connection.channel()
channel.queue_declare(queue
=
'
task_queue
'
, durable=
True)
print
(
'
[*] Waiting for messages. To exit press CTRL+C
'
)
def
callback(ch, method, properties, body):
print
(
"
[x] Received %r
"
%
body)
time.sleep(body.count(b
'
.
'
))
print
(
"
[x] Done
"
)
ch.basic_ack(delivery_tag
=
method.delivery_tag)
channel.basic_qos(prefetch_count
=1
)
channel.basic_consume(queue
=
'
task_queue
'
, on_message_callback=
callback)
channel.start_consuming()
?
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061
微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元

