欧美三区_成人在线免费观看视频_欧美极品少妇xxxxⅹ免费视频_a级毛片免费播放_鲁一鲁中文字幕久久_亚洲一级特黄

python Rabbitmq編程(一)

系統(tǒng) 2193 0

python Rabbitmq編程(一)

?

?

實(shí)現(xiàn)最簡單的隊(duì)列通信

python Rabbitmq編程(一)_第1張圖片

?

send端

            
              #
            
            
              !/usr/bin/env python
            
            
              import
            
            
               pika
credentials 
            
            = pika.PlainCredentials(
            
              "用戶名
            
            
              "
            
            ,
            
              "密碼
            
            
              "
            
            
              )
connection 
            
            =
            
               pika.BlockingConnection(pika.ConnectionParameters(
    
            
            
              '
            
            
              localhost
            
            
              '
            
            ,credentials=
            
              credentials))
channel 
            
            = connection.channel() 
            
              #
            
            
              建立了rabbit協(xié)議的通道
            
            
              #
            
            
               聲明queue
            
            
channel.queue_declare(queue=
            
              '
            
            
              hello
            
            
              '
            
            
              )


            
            
              #
            
            
               n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
            
            
channel.basic_publish(exchange=
            
              ''
            
            
              ,
                      routing_key
            
            =
            
              '
            
            
              hello
            
            
              '
            
            
              ,
                      body
            
            =
            
              '
            
            
              Hello World!
            
            
              '
            
            
              )

            
            
              print
            
            (
            
              "
            
            
               [x] Sent 'Hello World!'
            
            
              "
            
            
              )
connection.close()
            
          

?

receive端

            
              #
            
            
               _*_coding:utf-8_*_
            
            
              __author__
            
             = 
            
              '
            
            
              Alex Li
            
            
              '
            
            
              import
            
            
               pika
credentials 
            
            = pika.PlainCredentials(
            
              "用戶名
            
            
              "
            
            ,
            
              "密碼
            
            
              "
            
            
              )
connection 
            
            =
            
               pika.BlockingConnection(pika.ConnectionParameters(
    
            
            
              '
            
            
              localhost
            
            
              '
            
            ,credentials=
            
              credentials))
channel 
            
            = connection.channel() 
            
              #
            
            
              建立了rabbit協(xié)議的通道
            
            
              #
            
            
               You may ask why we declare the queue again ? we have already declared it in our previous code.
            
            
              
#
            
            
               We could avoid that if we were sure that the queue already exists. For example if send.py program
            
            
              
#
            
            
               was run before. But we're not yet sure which program to run first. In such cases it's a good
            
            
              
#
            
            
               practice to repeat declaring the queue in both programs.
            
            
channel.queue_declare(queue=
            
              '
            
            
              hello
            
            
              '
            
            
              )



            
            
              def
            
            
               callback(ch, method, properties, body):
    
            
            
              print
            
            (
            
              "
            
            
               [x] Received %r
            
            
              "
            
             %
            
               body)


            
            
              #
            
            
               callback函數(shù)當(dāng)拿到隊(duì)列里的值,則調(diào)用
            
            
              channel.basic_consume(callback,
                      queue
            
            =
            
              '
            
            
              hello
            
            
              '
            
            
              ,
                      no_ack
            
            =
            
              True)


            
            
              print
            
            (
            
              '
            
            
               [*] Waiting for messages. To exit press CTRL+C
            
            
              '
            
            
              )

channel.start_consuming()
            
          

?

          #注意:
          
            遠(yuǎn)程連接rabbitmq server的話,需要配置權(quán)限。
          
          
#1.設(shè)置用戶與密碼
            
              #
            
            
               > rabbitmqctl add_user name pass
            
            
              
#
            
            
               > rabbitmqctl set_user_tags name administrator
            
          
          #2.設(shè)置權(quán)限,允許從外面訪問
        
            
              #
            
            
               rabbitmqctl set_permissions -p /name ".*" ".*" ".*"
            
          
              set_permissions [-
              
                p vhost] {user} {conf} {write} {read}

vhost
The name of the virtual host to which to grant the user access, defaulting to 
              
              /
              
                .

user
The name of the user to grant access to the specified virtual host.

conf
A regular expression matching resource names 
              
              
                for
              
               which the user 
              
                is
              
              
                 granted configure permissions.

write
A regular expression matching resource names 
              
              
                for
              
               which the user 
              
                is
              
              
                 granted write permissions.

read
A regular expression matching resource names 
              
              
                for
              
               which the user 
              
                is
              
               granted read permissions.
            
set_permissions補(bǔ)充
          #3.生產(chǎn)者與消費(fèi)者添加認(rèn)證信息
        
            credentials = pika.PlainCredentials(
            
              "用戶名
            
            
              "
            
            ,
            
              "密碼
            
            
              "
            
            )
          

?

            #為什么要聲明兩次queue,這里hello為隊(duì)列名
# channel.queue_declare(queue='hello')
# 解決發(fā)起者先啟動(dòng),而接收者還沒有啟動(dòng),發(fā)送者先創(chuàng)建queue,
# 如果發(fā)起者已經(jīng)聲明了,接收者會(huì)檢測有沒有queue,如果有了,實(shí)際接收者是不會(huì)執(zhí)行聲明的,沒有就會(huì)聲明這個(gè)queue。

          

?

消息公平分發(fā)(循環(huán)調(diào)度)

            在這種模式下,RabbitMQ會(huì)默認(rèn)把p發(fā)的消息依次分發(fā)給各個(gè)消費(fèi)者(c)。
          
            輪巡公平的發(fā)送給接收者,比如第一次發(fā)送給第一個(gè)接收者,第二次發(fā)送給第二格接受者,如此。
          

python Rabbitmq編程(一)_第2張圖片

send端

            
              import
            
            
               pika

            
            
              import
            
            
               time

credentials 
            
            = pika.PlainCredentials(
            
              "
            
            
              用戶名
            
            
              "
            
            ,
            
              "
            
            
              密碼
            
            
              "
            
            
              )
connection 
            
            =
            
               pika.BlockingConnection(pika.ConnectionParameters(
    
            
            
              '
            
            
              localhost
            
            
              '
            
            ,credentials=
            
              credentials))
channel 
            
            =
            
               connection.channel()


            
            
              #
            
            
               聲明queue
            
            
channel.queue_declare(queue=
            
              '
            
            
              task_queue
            
            
              '
            
            
              )


            
            
              #
            
            
               n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
            
            
              import
            
            
               sys

message 
            
            = 
            
              '
            
            
              '
            
            .join(sys.argv[1:]) 
            
              or
            
            
              "
            
            
              Hello World! %s
            
            
              "
            
             %
            
               time.time()
channel.basic_publish(exchange
            
            =
            
              ''
            
            
              ,
                      routing_key
            
            =
            
              '
            
            
              task_queue
            
            
              '
            
            
              ,
                      body
            
            =
            
              message,
                      properties
            
            =
            
              pika.BasicProperties(
                          delivery_mode
            
            =2,  
            
              #
            
            
               make message persistent
            
            
                                    )
                      )

            
            
              print
            
            (
            
              "
            
            
               [x] Sent %r
            
            
              "
            
             %
            
               message)
connection.close()
            
          

?

receive端

            
              #
            
            
               _*_coding:utf-8_*_
            
            
              import
            
            
               pika, time
credentials 
            
            = pika.PlainCredentials(
            
              "
            
            
              用戶名
            
            
              "
            
            ,
            
              "
            
            
              密碼
            
            
              "
            
            
              )
connection 
            
            =
            
               pika.BlockingConnection(pika.ConnectionParameters(
    
            
            
              '
            
            
              localhost
            
            
              '
            
            ,credentials=
            
              credentials))
channel 
            
            =
            
               connection.channel()



            
            
              def
            
            
               callback(ch, method, properties, body):
    
            
            
              print
            
            (
            
              "
            
            
               [x] Received %r
            
            
              "
            
             %
            
               body)
    time.sleep(
            
            20
            
              )
    
            
            
              print
            
            (
            
              "
            
            
               [x] Done
            
            
              "
            
            
              )
    
            
            
              print
            
            (
            
              "
            
            
              method.delivery_tag
            
            
              "
            
            
              , method.delivery_tag)
    ch.basic_ack(delivery_tag
            
            =
            
              method.delivery_tag)


channel.basic_consume(callback,
                      queue
            
            =
            
              '
            
            
              task_queue
            
            
              '
            
            
              ,
                      no_ack
            
            =
            
              True
                      )


            
            
              print
            
            (
            
              '
            
            
               [*] Waiting for messages. To exit press CTRL+C
            
            
              '
            
            
              )
channel.start_consuming()
            
          

?

消息確認(rèn)

執(zhí)行任務(wù)可能需要幾秒鐘。 你可能想知道如果其中一個(gè)消費(fèi)者開始一項(xiàng)長期任務(wù)并且只是部分完成而死亡會(huì)發(fā)生什么。 使用我們當(dāng)前的代碼,一旦RabbitMQ向消費(fèi)者傳遞消息,它立即將其標(biāo)記為刪除。 在這種情況下,如果你殺死一個(gè)工人,我們將丟失它剛剛處理的消息。 我們還將丟失分發(fā)給這個(gè)特定工作者但尚未處理的所有消息。

但我們不想失去任何任務(wù)。 如果工人死亡,我們希望將任務(wù)交付給另一名工人。

為了確保消息永不丟失,RabbitMQ支持? 消息 確認(rèn) 消費(fèi)者發(fā)回ack(nowledgement)告訴RabbitMQ已收到,處理了特定消息,RabbitMQ可以自由刪除它。

如果消費(fèi)者死亡(其通道關(guān)閉,連接關(guān)閉或TCP連接丟失)而不發(fā)送確認(rèn),RabbitMQ將理解消息未完全處理并將重新排隊(duì)。 如果同時(shí)有其他在線消費(fèi)者,則會(huì)迅速將其重新發(fā)送給其他消費(fèi)者。 這樣你就可以確保沒有消息丟失,即使工人偶爾會(huì)死亡。

沒有任何消息超時(shí);? 當(dāng)消費(fèi)者死亡時(shí),RabbitMQ將重新發(fā)送消息。 即使處理消息需要非常長的時(shí)間,也沒關(guān)系。

默認(rèn)情況下, 手動(dòng)消息確認(rèn) 已打開。 在前面的示例中,我們通過 auto_ack = True ?標(biāo)志 明確地將它們關(guān)閉 在我們完成任務(wù)后,是時(shí)候刪除此標(biāo)志并從工作人員發(fā)送適當(dāng)?shù)拇_認(rèn)。

            
              def
            
            
               callback(ch, method, properties, body):
    
            
            
              print
            
            
              "
            
            
               [x] Received %r
            
            
              "
            
             %
            
               (body,)
    time.sleep( body.count(
            
            
              '
            
            
              .
            
            
              '
            
            
              ) )
    
            
            
              print
            
            
              "
            
            
               [x] Done
            
            
              "
            
            
              
    ch.basic_ack(delivery_tag 
            
            =
            
               method.delivery_tag)
 
channel.basic_consume(callback,
                      queue
            
            =
            
              '
            
            
              hello
            
            
              '
            
            )
          

  Using this code we can be sure that even if you kill a worker using CTRL+C while it was processing a message, nothing will be lost. Soon after the worker dies all unacknowledged messages will be redelivered

?

?

消息持久化  

我們已經(jīng)學(xué)會(huì)了如何確保即使消費(fèi)者死亡,任務(wù)也不會(huì)丟失。 但是如果RabbitMQ服務(wù)器停止,我們的任務(wù)仍然會(huì)丟失。

當(dāng)RabbitMQ退出或崩潰時(shí),它將忘記隊(duì)列和消息,除非你告訴它不要。 確保消息不會(huì)丟失需要做兩件事:我們需要將隊(duì)列和消息都標(biāo)記為持久。

首先,我們需要確保RabbitMQ永遠(yuǎn)不會(huì)丟失我們的隊(duì)列。 為此,我們需要聲明它是 持久的

            channel.queue_declare(queue=
            
              '
            
            
              hello
            
            
              '
            
            , durable=True)
          

雖然此命令本身是正確的,但它在我們的設(shè)置中不起作用。 那是因?yàn)槲覀円呀?jīng)定義了一個(gè)名為 hello 的隊(duì)列 ?, 這個(gè)隊(duì)列 不耐用。 RabbitMQ不允許您使用不同的參數(shù)重新定義現(xiàn)有隊(duì)列,并將向嘗試執(zhí)行此操作的任何程序返回錯(cuò)誤。 但是有一個(gè)快速的解決方法 - 讓我們聲明一個(gè)具有不同名稱的隊(duì)列,例如 task_queue

            channel.queue_declare(queue=
            
              '
            
            
              task_queue
            
            
              '
            
            , durable=True)
          

queue_declare 更改需要應(yīng)用于生產(chǎn)者和消費(fèi)者代碼。

此時(shí)我們確信 即使RabbitMQ重新啟動(dòng) task_queue 隊(duì)列也不會(huì)丟失。 現(xiàn)在我們需要將消息標(biāo)記為持久性 - 通過提供 值為 2 delivery_mode 屬性

            channel.basic_publish(exchange=
            
              ''
            
            
              ,
                      routing_key
            
            =
            
              "
            
            
              task_queue
            
            
              "
            
            
              ,
                      body
            
            =
            
              message,
                      properties
            
            =
            
              pika.BasicProperties(
                         delivery_mode 
            
            = 2, 
            
              #
            
            
               make message persistent
            
            
                      ))
          

?

?

負(fù)載均衡

            如果Rabbit只管按順序把消息發(fā)到各個(gè)消費(fèi)者身上,不考慮消費(fèi)者負(fù)載的話,很可能出現(xiàn),一個(gè)機(jī)器配置不高的消費(fèi)者那里堆積了很多消息處理不完,同時(shí)配置高的消費(fèi)者卻一直很輕松。為解決此問題,可以在各個(gè)消費(fèi)者端,
            
配置perfetch_count=1,意思就是告訴RabbitMQ在我這個(gè)消費(fèi)者當(dāng)前消息還沒處理完的時(shí)候就不要再給我發(fā)新消息了。

python Rabbitmq編程(一)_第3張圖片

send端

            
              #
            
            
              !/usr/bin/env python
            
            
              import
            
            
               pika

            
            
              import
            
            
               sys

connection 
            
            =
            
               pika.BlockingConnection(
    pika.ConnectionParameters(host
            
            =
            
              '
            
            
              localhost
            
            
              '
            
            
              ))
channel 
            
            =
            
               connection.channel()

channel.queue_declare(queue
            
            =
            
              '
            
            
              task_queue
            
            
              '
            
            , durable=
            
              True)

message 
            
            = 
            
              '
            
            
              '
            
            .join(sys.argv[1:]) 
            
              or
            
            
              "
            
            
              Hello World!
            
            
              "
            
            
              
channel.basic_publish(
    exchange
            
            =
            
              ''
            
            
              ,
    routing_key
            
            =
            
              '
            
            
              task_queue
            
            
              '
            
            
              ,
    body
            
            =
            
              message,
    properties
            
            =
            
              pika.BasicProperties(
        delivery_mode
            
            =2,  
            
              #
            
            
               make message persistent
            
            
                  ))

            
            
              print
            
            (
            
              "
            
            
               [x] Sent %r
            
            
              "
            
             %
            
               message)
connection.close()
            
          

?

receive端

            
              #
            
            
              !/usr/bin/env python
            
            
              import
            
            
               pika

            
            
              import
            
            
               time

connection 
            
            =
            
               pika.BlockingConnection(
    pika.ConnectionParameters(host
            
            =
            
              '
            
            
              localhost
            
            
              '
            
            
              ))
channel 
            
            =
            
               connection.channel()

channel.queue_declare(queue
            
            =
            
              '
            
            
              task_queue
            
            
              '
            
            , durable=
            
              True)

            
            
              print
            
            (
            
              '
            
            
               [*] Waiting for messages. To exit press CTRL+C
            
            
              '
            
            
              )



            
            
              def
            
            
               callback(ch, method, properties, body):
    
            
            
              print
            
            (
            
              "
            
            
               [x] Received %r
            
            
              "
            
             %
            
               body)
    time.sleep(body.count(b
            
            
              '
            
            
              .
            
            
              '
            
            
              ))
    
            
            
              print
            
            (
            
              "
            
            
               [x] Done
            
            
              "
            
            
              )
    ch.basic_ack(delivery_tag
            
            =
            
              method.delivery_tag)


channel.basic_qos(prefetch_count
            
            =1
            
              )
channel.basic_consume(queue
            
            =
            
              '
            
            
              task_queue
            
            
              '
            
            , on_message_callback=
            
              callback)

channel.start_consuming()
            
          

?


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯(lián)系: 360901061

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會(huì)非常 感謝您的哦!!!

發(fā)表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 奇米影视小说 | 国产成人综合久久 | 亚洲婷婷国产精品电影人久久 | 久在线观看 | 2017最新h无码动漫 | 亚洲欧美另类色妞网站 | 成人网在线观看 | 女人一级毛片免费视频观看 | 亚洲免费黄色 | 蜜桃精品导航 | 国产这里只有精品 | 中文字幕一区二区三区四区五区 | www97影院 | 91手机在线视频观看 | 亚洲美女一区二区三区 | 国产精品午夜电影 | 99热这里只有精品8 免费看搡女人的视频 | 老司机午夜免费精品视频 | 日韩精品久久久久影院 | 瑟瑟综合 | 欧美浮力影院 | 国产精品国产精品国产专区不卡 | 久操导航| 性做久久久 | 国产精品一区二区三区久久 | 妇女毛片 | 亚洲欧美日韩另类精品一区二区三区 | 国产日韩欧美一区 | 国产精品亚洲综合一区在线观看 | 精品视频在线观看视频免费视频 | 欧美精品欧美精品系列 | 国产精品成人亚洲一区二区 | 91免费精品国偷自产在线在线 | 天天爱夜夜爽 | 侮辱丰满美丽的人妻 | 成年人在线观看视频网站 | 91中文字幕在线观看 | 91茄子在线观看 | 国产三级在线观看a | 欧美精品在线视频观看 | 亚洲 欧美 日韩中文字幕一区二区 |