本文完整代碼下載:github鏈接
目前在做的工作有一部門是搭建一個可供公司內部使用的推送平臺,用的中間件是redis,于是就自然的想用redis5.0版本的新特性來實現這個功能,網上的demo比較少,且大多是終端操作的命令行,寫了一個Python的類和大家分享。
在介紹具體實現之前,先大致介紹一下背景。
在Redis5.0版本發布之前,redis也有一個發布、訂閱功能,但功能非常簡單,只能單純的發布和訂閱,適合在即時通信里使用。缺點非常多:
-
消息沒有持久化的機制。在Pub/Sub模型中,消費者是和連接(Connection)綁定的,當消費者的連接斷掉(網絡原因或者消費者進程crash)后,再次重連,那么Channel中的消息將永久消失(對于該消費者而言),也就是說Pub/Sub模型缺少消息回溯的機制
-
消費消息的速度和消費者的數量成反比。在Redis的實現中,Redis會把Channel中的消息逐個(Linear)推送給每個消費者,因此當消費者的數量達到一定規模時,服務器的性能將線性下降,因此每個消費者獲取到消息的延遲也線性增長
-
當生產者產生消息的速度遠大于消費者的消費能力的時候(此時可以簡單地理解為消息積壓),消費者會被強制斷開連接,因此會造成消息的丟失,這個特性可以詳見redis的配置
-
對頻道的消費者信息沒有展現接口。 在我們的項目里需要管理每一個頻道的訂閱者,雖然redis本身有記錄,但是并沒有提供API可以訪問。
Redis5.0最大的新特性就是多出了一個數據結構Stream,它是一個新的強大的支持多播的可持久化的消息隊列,設計和Kafka非常相似。
- Redis Stream有一個消息鏈表,將所有加入的消息都串起來,每個消息都有一個唯一的ID和對應的內容。消息是持久化的,Redis重啟后,內容還在。
- 每個消費組都有一個Stream內唯一的名稱,消費組不會自動創建,它需要單獨的指令xgroup create進行創建,需要指定從Stream的某個消息ID開始消費,這個ID用來初始化last_delivered_id變量。
- 每個Stream都可以掛多個消費組,每個消費組會有個游標last_delivered_id在Stream數組之上往前移動,表示當前消費組已經消費到哪條消息了。
- 同一個消費組(Consumer Group)可以掛接多個消費者(Consumer),這些消費者之間是競爭關系,任意一個消費者讀取了消息都會使游標last_delivered_id往前移動。每個消費者者有一個組內唯一名稱。【但是每個消費者并沒有消費到哪條?消息的單獨記錄,所以后續我隊列的消費者就是一個只含有一個消費者的消費組,這樣可以方便記錄更多信息】
- 消費者(Consumer)內部會有個狀態變量pending_ids,它記錄了當前已經被客戶端讀取的消息,但是還沒有ack。如果客戶端沒有ack,這個變量里面的消息ID會越來越多,一旦某個消息被ack,它就開始減少。這個pending_ids變量用來確保客戶端至少消費了消息一次,而不會在網絡傳輸的中途丟失了沒處理。
具體實現:
class SubRedis(object):
? ? def __init__(self):
? ? ? ? if not hasattr(SubRedis, 'pool'):
? ? ? ? ? ? SubRedis.getRedisCoon() ?#創建redis連接池
? ? ? ? self._coon = redis.Redis(connection_pool=SubRedis.pool)
? ? @staticmethod
? ? def getRedisCoon():
? ? ? ? SubRedis.pool = redis.ConnectionPool(host=redisInfo['SubRedisAddress'],password=redisInfo['SubRedisPassword'],port=redisInfo['SubRedisPort'],db=redisInfo['db'])
#返回一個channel的具體信息: 訂閱者數量,最后送達的msg的ID...
def channel_info(self,channel):
return self._coon.xinfo_stream(channel)
#返回一個channel的具體訂閱群組的信息(這里是返回訂閱者,因為每一個群組里只有一個消費者)
def channel_consumers_info(self,channel):
InfoList = self._coon.xinfo_groups(channel)
for GroupDict in InfoList:
GroupDict.pop("consumers")
return InfoList
#創建消費者
def create_consumer_group(self,name,channel):
ret = self._coon.xgroup_create(channel,name,id="$")
if ret == True:
print self.channel_consumers_info(channel)
else:
logging.error("create consumer %s fill,ret %s" %(name,ret))
#往某一個channel發送消息
def publish(self,channel,msg):
msgid = self._coon.xadd(channel,msg)
return msgid
def consumer_already_subscribed(self,channel,consumer):
channel_consumers_infolist = self.channel_consumers_info(channel)
for consumer_dict in channel_consumers_infolist:
if consumer in consumer_dict.values():
logging.warning("consumer %s has already subscribed %s" % (consumer, channel))
return True
return False
#已經存在的訂閱者訂閱新頻道
def subscribe(self,name,channel):
if(self.consumer_already_subscribed(channel,consumer)):
return False
self.create_consumer_group(name,channel)
print "%s subscribe %s success,channel %s info:"%(name,channel,channel),self.channel_consumers_info(channel)
return
#監聽并寫入新消息到文件
def listen_channel(self,channel,consumer,file):
if not (self.consumer_already_subscribed(channel,consumer)):
return False
mess = self._coon.xreadgroup(consumer,consumer,{channel:">"})
if mess != []:
msg_list = mess[0][1]
for msg in msg_list:
id, content = msg[0], msg[1]
content["msgid"] = id
json_content = json.dumps(content)
json_content += ","
print ("new message: ",content)
with open(file, "a") as f:
f.write(json_content)
self._coon.xack(channel, consumer, id)
?
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
