Python 操作 Rabbit MQ 基礎(chǔ):
一、簡介:
1.介紹:
RabbitMQ是一個(gè)消息代理:它接收和轉(zhuǎn)發(fā)消息。
可以把它比作為郵局,當(dāng)您要發(fā)布郵件放在郵箱中時(shí),可以確定這封郵件讓哪位快遞員來進(jìn)行發(fā)送到您的收件人手中。
2.術(shù)語:
1.發(fā)送消息的程序是
生產(chǎn)者
:
2.隊(duì)列可以理解為郵箱,用來存儲一些郵件。隊(duì)列的由主機(jī)的
存儲器
和
磁盤限制約束
,它本質(zhì)上是一個(gè)大的
消息緩沖器
。很多生產(chǎn)者可以發(fā)送到一個(gè)隊(duì)列的消息,并且許多消費(fèi)者可以嘗試從一個(gè)隊(duì)列接收數(shù)據(jù):
3.消費(fèi)者可以理解為接收人。一個(gè)消費(fèi)者是一個(gè)程序,
主要是等待接收信息
:
注意 :
-
生產(chǎn)者、消費(fèi)者、消息隊(duì)列
不必駐留在同一個(gè)主機(jī)上
。
二、發(fā)送"Hello world":
實(shí)現(xiàn)功能:
生產(chǎn)者 → 消息隊(duì)列 → 消費(fèi)者
RabbitMQ有許多不同語言的客戶端,我們使用的Python客戶端,那么使用
Pika 1.0.0
包:
安裝命令:
python -m pip install pika --upgrade
三、生產(chǎn)者:
新創(chuàng)建一個(gè)程序
send.py
,將向隊(duì)列發(fā)送一條消息:
#
!
/
usr
/
bin
/
python
#
-
*
-
coding
:
utf
-
8
-
*
-
import
pika
# 創(chuàng)建一個(gè)實(shí)例 本地訪問
IP
地址可以為 localhost
后面5672是端口地址
(
可以不用指定
,
因?yàn)槟J(rèn)就是
5672
)
connection
=
pika
.
BlockingConnection
(
pika
.
ConnectionParameters
(
'localhost'
,
5672
)
)
# 聲明一個(gè)管道
,
在管道里發(fā)送消息
channel
=
connection
.
channel
(
)
# 在管道里聲明隊(duì)列名稱
channel
.
queue_declare
(
queue
=
'hello'
)
# 參數(shù)exchange
=
''
表示默認(rèn)交換
,
目前記住rabbitmq消息永遠(yuǎn)不是直接發(fā)送到隊(duì)列中的
,
它需要通過交換
channel
.
basic_publish
(
exchange
=
''
,
routing_key
=
'hello'
,
body
=
'hello world'
)
# 隊(duì)列關(guān)閉
connection
.
close
(
)
注意 :
- 若沒有看到’已發(fā)送’消息,有可能是消息隊(duì)列沒有足夠的磁盤空間可用。默認(rèn)情況下它至少需要200MB空間,因此會拒絕接收消息。
四、消費(fèi)者:
新創(chuàng)建第二個(gè)程序
receive.py
,將從隊(duì)列中接收消息并打印出來:
#
!
/
usr
/
bin
/
python
#
-
*
-
coding
:
utf
-
8
-
*
-
import
pika
import
time
# 創(chuàng)建實(shí)例
connection
=
pika
.
BlockingConnection
(
pika
.
ConnectionParameters
(
'localhost'
)
)
# 聲明管道
channel
=
connection
.
channel
(
)
# 這里又聲明一次
'hello'
隊(duì)列
,
因?yàn)槟悴恢滥膫€(gè)程序
(
send
.
py
)
先運(yùn)行
,
所以要聲明兩次
channel
.
queue_declare
(
queue
=
'hello'
)
def
callback
(
ch
,
method
,
properties
,
body
)
:
print
(
'ch:'
,
ch
)
print
(
'method:'
,
method
)
print
(
'properties:'
,
properties
)
print
(
'--------收到消息:---------{}'
.
format
(
body
)
)
# 看消息是否已經(jīng)接收
time
.
sleep
(
20
)
# 睡
20
秒
ch
.
basic_ack
(
delivery_tag
=
method
.
delivery_tag
)
# 告訴生產(chǎn)者
,
消息處理完成
# 消費(fèi)消息
channel
.
basic_consume
(
queue
=
'hello'
,
# 從指定的消息隊(duì)列中接收消息
on_message_callback
=
callback
)
# 如果收到消息
,
就調(diào)用callback函數(shù)來處理
print
(
'=======正在等待消息========'
)
channel
.
start_consuming
(
)
# 開始消費(fèi)消息
列出目前有的隊(duì)列:
sudo rabbitmqctl list_queues
五、運(yùn)行程序:
1.首先在終端上使用我們的程序,先讓
消費(fèi)者的程序運(yùn)行起來
,等待生產(chǎn)者的發(fā)送:
python receive
.
py
#
===
===
=
正在等待消息
===
===
==
2.現(xiàn)在開始讓生產(chǎn)者,發(fā)送消息:
python send
.
py
#
--
--
--
--
--
--
--
--
--
--
--
發(fā)出消息
--
--
--
--
--
--
--
--
--
--
-
3.可以看到消費(fèi)者的打印信息:
(
'ch:'
,
<
BlockingChannel impl
=
<
Channel number
=
1
OPEN
conn
=
<
SelectConnection
OPEN
transport
=
<
pika
.
adapters
.
utils
.
io_services_utils
.
_AsyncPlaintextTransport object at
0x153b590
>
params
=
<
ConnectionParameters host
=
localhost port
=
5672
virtual_host
=
/
ssl
=
False
>>>
>
)
(
'method:'
,
<
Basic
.
Deliver
(
[
'consumer_tag=ctag1.cc4070930568408fb20c783f85f9336e'
,
'delivery_tag=1'
,
'exchange='
,
'redelivered=False'
,
'routing_key=hello'
]
)
>
)
(
'properties:'
,
<
BasicProperties
>
)
--
--
--
--
收到消息:
--
--
--
--
-
hello world
4.可以查看一下目前有的隊(duì)列:
rabbitmqctl list_queues
// 執(zhí)行命令
Listing queues
...
hello
0
// hello 隊(duì)列的名稱 0:消息數(shù)量
5.我們已經(jīng)成功的通過RabbitMQ發(fā)送第一條消息,hello world,但是
receive.py
程序不會退出,它將保持準(zhǔn)備接收更多消息,使用
Ctrl +c 中斷
。
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
