pika生產者程序大致步驟:
1. 建立連接connection , 需要認證的調用認證參數
2. 創建通道channel 當然 channel可以池化,這樣可以重復使用
3. 聲明隊列 指定隊列屬性, 一旦指定屬性不能修改, 例如是否持久化,名稱
4. 聲明交換機 交換機類型,名稱等, 也可以不用聲明,直接使用 “” 空字符串,默認交換機也可以
5. 將隊列與交換機綁定 queue_bind
6. basic_publish 發送到交換機 指定路由鍵
pika消費者程序大致步驟:
1. 建立連接connection , 需要認證的調用認證參數
2. 創建通道channel 當然 channel可以池化,這樣可以重復使用
3. 聲明隊列 指定隊列屬性, 一旦指定屬性不能修改, 例如是否持久化,名稱
4. 聲明交換機 交換機類型,名稱等, 也可以不用聲明,直接使用 “” 空字符串,默認交換機也可以
5. 將隊列與交換機綁定 queue_bind
6. basic_consume 消費消息
1. 輪詢接收消息
使用消息隊列的一個好處就是, 可以將任務消息發送到隊列中,由消費者異步進行處理, 同時對于后端消費者可以很容易地增加減少,只需要運行多個進程即可, 方便擴展, 之前的示例中消費端程序就可以開啟多個,然后可以看到消費被輪詢得分配給每個消費者
將之前的消費者略作更改, 加入客戶端編號,啟動三個消費者, 通過生產者發送4個消息, 依次收到消息, 即是 輪詢(round-robin):
[*] Waiting for messages. To exit press CTRL+C
1. [x] Received 'Hello World!'
1. [x] Received 'Hello World!'
[*] Waiting for messages. To exit press CTRL+C
2. [x] Received 'Hello World!'
[*] Waiting for messages. To exit press CTRL+C
3. [x] Received 'Hello World!'
2. 消息確認:
為了防止消息丟失,RabbitMQ提供了消息響應(acknowledgments)。消費者會通過一個ack(響應),告訴RabbitMQ已經收到并處理了某條消息,然后RabbitMQ就會釋放并刪除這條消息。 如果消費者(consumer)掛掉了,沒有發送響應,RabbitMQ就會認為消息沒有被完全處理,然后重新發送給其他消費者(consumer)。這樣,及時工作者(workers)偶爾的掛掉,也不會丟失消息。
消息響應默認auto_ack=False, 不自動確認消息, 即是需要我們處理并確認消息的
確認需要發送確認消息:
在回調callback中加入basic_ack
channel
.
basic_ack
(
delivery_tag
=
method
.
delivery_tag
)
如果auto_ack設置為True,而忘記basic_ack消息確認,消息在程序退出之后就會重新發送,如果不及時釋放沒響應的消息,RabbitMQ就會占用越來越多的內存。
為了排除這種錯誤,可以使用rabbitmqctl命令,輸出messages_unacknowledged字段:
# rabbitmqctl list_queues name messages_ready messages_unacknowledged
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages_ready messages_unacknowledged
hello 0 0
TEST01 0 0
3. 消息持久化
如果沒有向rabbitmq指定消息持久化, 則退出或者崩潰的時候,將會丟失所有隊列和消息, 消息持久化必須把“隊列”和“消息”設為持久化
- 隊列聲明持久化:
channel.queue_declare(queue='hello', durable=True)
注意一個消息隊列被聲明過一次后,rabbitmq不允許使用不同的參數重新定義隊列, 因此如果存在hello隊列,上面會提示錯誤
- 消息聲明持久化
將publish生產者發送消息時候消息屬性, delivery_mode的屬性設為2
properties=pika.BasicProperties(delivery_mode=2)
生產者代碼:
channel.queue_declare(queue='TEST02', durable=True)
channel.basic_publish(exchange='',
routing_key='TEST02',
body='Hello World!',
properties=pika.BasicProperties(delivery_mode=2)
)
客戶端代碼:
channel.queue_declare(queue='TEST02', durable=True)
def callback(ch, method, properties, body):
print(". [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(on_message_callback=callback,
queue='TEST02',
auto_ack=False,
)
4. 設置客戶端QOS
開啟客戶端最大的未處理消息隊列大小:
channel.basic_qos(prefetch_count=1)
代碼示例:
channel.queue_declare(queue='TEST02', durable=True)
channel.basic_qos(prefetch_count=1)
def callback(ch, method, properties, body):
print(". [x] Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(on_message_callback=callback,
queue='TEST02',
auto_ack=False,
)
5. 發布訂閱模式:
rabbitmq在之前介紹的時候可以看到,消息是被依次發送給消費者,即是消息只會被發送給一個消費者,除非開啟確認機制時處理失敗了, 一個消息發送給多個消費者, 這個是rabbitmq提供的發布訂閱模式
發布者(producer)只需要把消息發送給一個交換機(exchange)。交換機非常簡單,它一邊從發布者方接收消息,一邊把消息推送到隊列。交換機必須知道如何處理它接收到的消息,是應該推送到指定的隊列還是是多個隊列,或者是直接忽略消息。這些規則是通過交換機類型(exchange type)來定義的
交換機類型:直連交換機(direct)-- 一對一, 之前使用的就是這個;主題交換機(topic)-- 模糊匹配,需要符合匹配規則; headers(頭交換機)和 扇型交換機(fanout)-- 進行消息廣播
fanout會發送消息到交換機所有的消息隊列
消息將會根據指定的routing_key分發到指定的隊列
rabbitmq擁有一個默認交換機 即是 空字符串(""),
channel.basic_publish(exchange='',
routing_key='hello',
body=message)
在pika編程中,可以不用指定隊列名稱,系統會隨機生成一個名稱, 在重啟都該隊列丟失
只需在聲明時不提供參數就可以了
result = channel.queue_declare()
隊列需要綁定到交換, 才能通過交換機發送消息到該隊列
channel.queue_bind(exchange='logs',
queue=result.method.queue)
代碼:
生產者:
channel = connection.channel()
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
connection.close()
消費者:
channel = connection.channel()
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs',
queue=queue_name)
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] %r" % (body,)
channel.basic_consume(callback,
queue=queue_name,
auto_ack=True)
channel.start_consuming()
6. 幾個重要概念的程序實現
- 路由 routing
路由鍵在發送消息的時候進行指定
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
- 隊列綁定 binding
channel.queue_bind(exchange=exchange_name,
queue=queue_name)
- 交換機類型
交換機聲明的時候進行指定, 一般常用direct, fanout, topic三種類型
channel.exchange_declare(exchange='logs',
exchange_type='fanout')
- 路由鍵
一般路由鍵不等于綁定鍵, 但是我們通常在direct的時候可以近似的認為這兩個等價的
在隊列綁定的時候,通過指定routing_key 指定
channel.queue_bind(exchange=exchange_name,
queue=queue_name,
routing_key='tologs')
- 一個路由鍵多個隊列
也就是多個隊列使用相同的綁定鍵, 這個是合法 的, 這樣就可以將消息發生到不同的隊列中
例如:
channel.queue_bind(exchange="logs",queue="info",routing_key='tologs')
channel.queue_bind(exchange="logs",queue="warn",routing_key='tologs')
channel.queue_bind(exchange="logs",queue="debug",routing_key='tologs')
- 排他隊列:
一個只有自己可見的隊列,即不允許其它用戶訪問,RabbitMQ允許你將一個Queue聲明成為排他性的
channel.queue_declare(exclusive=True)
示例代碼:
生產者:
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print " [x] Sent %r:%r" % (severity, message)
connection.close()
消費者:
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \
(sys.argv[0],)
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] %r:%r" % (method.routing_key, body,)
channel.basic_consume(callback,
queue=queue_name,
auto_ack=True)
channel.start_consuming()
7. 主題交換機 (topic)
發送到主題交換機(topic exchange)的消息不可以攜帶隨意什么樣子的路由鍵(routing_key),它的路由鍵必須是一個由
.
分隔開的詞語列表, 例如: “stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”
綁定鍵也必須擁有同樣的格式。主題交換機背后的邏輯跟直連交換機很相似, 攜帶著特定路由鍵的消息會被主題交換機投遞給綁定鍵與之想匹配的隊列
它的綁定鍵和路由鍵有兩個特殊應用方式, 即是支持模糊匹配:
-
*
(星號) 用來表示一個單詞. -
#
(井號) 用來表示任意數量(零個或多個)單詞。
例如路由鍵:
*.*.rabbit
,
lazy.#
lazy.pink.rabbit 會匹配
*.*.rabbit
和
lazy.#
lazy.x 匹配
lazy.#
注特殊情況:
當一個隊列的綁定鍵為 “#”(井號) 的時候,這個隊列將會無視消息的路由鍵,接收所有的消息。
當
*
(星號) 和
#
(井號) 這兩個特殊字符都未在綁定鍵中出現的時候,此時主題交換機就擁有的直連交換機的行為。
示例代碼:
生產者:
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print " [x] Sent %r:%r" % (routing_key, message)
connection.close()
消費者:
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],)
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] %r:%r" % (method.routing_key, body,)
channel.basic_consume(callback,
queue=queue_name,
auto_ack=True)
channel.start_consuming()
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
