欧美三区_成人在线免费观看视频_欧美极品少妇xxxxⅹ免费视频_a级毛片免费播放_鲁一鲁中文字幕久久_亚洲一级特黄

使用Python操作Redis5.0新特性Stream實現訂閱發布功能

系統 2191 0

  本文完整代碼下載:github鏈接

  目前在做的工作有一部門是搭建一個可供公司內部使用的推送平臺,用的中間件是redis,于是就自然的想用redis5.0版本的新特性來實現這個功能,網上的demo比較少,且大多是終端操作的命令行,寫了一個Python的類和大家分享。

在介紹具體實現之前,先大致介紹一下背景。

在Redis5.0版本發布之前,redis也有一個發布、訂閱功能,但功能非常簡單,只能單純的發布和訂閱,適合在即時通信里使用。缺點非常多:

  1. 消息沒有持久化的機制。在Pub/Sub模型中,消費者是和連接(Connection)綁定的,當消費者的連接斷掉(網絡原因或者消費者進程crash)后,再次重連,那么Channel中的消息將永久消失(對于該消費者而言),也就是說Pub/Sub模型缺少消息回溯的機制

  2. 消費消息的速度和消費者的數量成反比。在Redis的實現中,Redis會把Channel中的消息逐個(Linear)推送給每個消費者,因此當消費者的數量達到一定規模時,服務器的性能將線性下降,因此每個消費者獲取到消息的延遲也線性增長

  3. 當生產者產生消息的速度遠大于消費者的消費能力的時候(此時可以簡單地理解為消息積壓),消費者會被強制斷開連接,因此會造成消息的丟失,這個特性可以詳見redis的配置

  4. 對頻道的消費者信息沒有展現接口。 在我們的項目里需要管理每一個頻道的訂閱者,雖然redis本身有記錄,但是并沒有提供API可以訪問。

Redis5.0最大的新特性就是多出了一個數據結構Stream,它是一個新的強大的支持多播的可持久化的消息隊列,設計和Kafka非常相似。

  1. Redis Stream有一個消息鏈表,將所有加入的消息都串起來,每個消息都有一個唯一的ID和對應的內容。消息是持久化的,Redis重啟后,內容還在。
  2. 每個消費組都有一個Stream內唯一的名稱,消費組不會自動創建,它需要單獨的指令xgroup create進行創建,需要指定從Stream的某個消息ID開始消費,這個ID用來初始化last_delivered_id變量。
  3. 每個Stream都可以掛多個消費組,每個消費組會有個游標last_delivered_id在Stream數組之上往前移動,表示當前消費組已經消費到哪條消息了。
  4. 同一個消費組(Consumer Group)可以掛接多個消費者(Consumer),這些消費者之間是競爭關系,任意一個消費者讀取了消息都會使游標last_delivered_id往前移動。每個消費者者有一個組內唯一名稱。【但是每個消費者并沒有消費到哪條?消息的單獨記錄,所以后續我隊列的消費者就是一個只含有一個消費者的消費組,這樣可以方便記錄更多信息】
  5. 消費者(Consumer)內部會有個狀態變量pending_ids,它記錄了當前已經被客戶端讀取的消息,但是還沒有ack。如果客戶端沒有ack,這個變量里面的消息ID會越來越多,一旦某個消息被ack,它就開始減少。這個pending_ids變量用來確保客戶端至少消費了消息一次,而不會在網絡傳輸的中途丟失了沒處理。

 具體實現:


            
              class SubRedis(object):

? ? def __init__(self):
? ? ? ? if not hasattr(SubRedis, 'pool'):
? ? ? ? ? ? SubRedis.getRedisCoon() ?#創建redis連接池
? ? ? ? self._coon = redis.Redis(connection_pool=SubRedis.pool)

? ? @staticmethod
? ? def getRedisCoon():
? ? ? ? SubRedis.pool = redis.ConnectionPool(host=redisInfo['SubRedisAddress'],password=redisInfo['SubRedisPassword'],port=redisInfo['SubRedisPort'],db=redisInfo['db'])

    #返回一個channel的具體信息: 訂閱者數量,最后送達的msg的ID...
    def channel_info(self,channel):
        return self._coon.xinfo_stream(channel)

    #返回一個channel的具體訂閱群組的信息(這里是返回訂閱者,因為每一個群組里只有一個消費者)
    def channel_consumers_info(self,channel):
        InfoList = self._coon.xinfo_groups(channel)
        for GroupDict in InfoList:
            GroupDict.pop("consumers")
        return InfoList

    #創建消費者
    def create_consumer_group(self,name,channel):
        ret = self._coon.xgroup_create(channel,name,id="$")
        if ret == True:
            print self.channel_consumers_info(channel)
        else:
            logging.error("create consumer %s fill,ret %s" %(name,ret))
    
    #往某一個channel發送消息
    def publish(self,channel,msg):
        msgid = self._coon.xadd(channel,msg)
        return msgid
    def consumer_already_subscribed(self,channel,consumer):
        channel_consumers_infolist = self.channel_consumers_info(channel)
        for consumer_dict in channel_consumers_infolist:
            if consumer in consumer_dict.values():
                logging.warning("consumer %s has already subscribed %s" % (consumer, channel))
                return True
        return False
    
    #已經存在的訂閱者訂閱新頻道
    def subscribe(self,name,channel):
        if(self.consumer_already_subscribed(channel,consumer)):
            return False
        self.create_consumer_group(name,channel)
        print "%s subscribe %s success,channel %s info:"%(name,channel,channel),self.channel_consumers_info(channel)
        return
    
    #監聽并寫入新消息到文件
    def listen_channel(self,channel,consumer,file):
        if not (self.consumer_already_subscribed(channel,consumer)):
            return False
        mess = self._coon.xreadgroup(consumer,consumer,{channel:">"})
        if mess != []:
            msg_list = mess[0][1]
            for msg in msg_list:
                id, content = msg[0], msg[1]
                content["msgid"] = id
                json_content = json.dumps(content)
                json_content += ","
                print ("new message: ",content)
                with open(file, "a") as f:
                    f.write(json_content)
                self._coon.xack(channel, consumer, id)
            
          

?


更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 一区二区三区四区国产 | 国产精品亚洲成在人线 | 精品一区二区三区视频 | 国产高清免费 | 欧美日韩在线第一页 | 亚洲一区二区三区高清网 | 日韩一区二区不卡 | 伊人狠狠干 | 精品国产不卡一区二区三区 | 国产片侵犯亲女视频播放 | 一级激情片 | 国内精品久久久久激情影院 | 欧美日韩视频在线第一区 | 黄色一级网站 | 香港三级台湾三级在线播放徐 | 国内精品伊人久久 | 99re热精品视频 | 天天看逼 | 成人性生交大片 | 日本又黄又粗暴的gif动态图含羞 | 午夜影院小视频 | 国产视频精品免费 | 亚洲免费在线视频 | 色噜噜狠狠色综合欧洲selulu | 国产高清在线精品一区二区三区 | 久久国产精品视频 | 亚洲欧美日韩精品久久 | 亚洲精品天堂 | 四虎影视永久免费观看网址 | 在线播放一区二区三区 | 综合精品在线 | 久久久午夜精品 | 亚洲成人av一区二区 | 午夜精品久久久久久99热软件 | 日韩有码一区 | 欧美一级毛片免费播放器 | 国产免费小视频 | 午夜精品视频在线看 | 色网站在线免费观看 | 成人亚洲一区 | 欧美特一级片 |