欧美三区_成人在线免费观看视频_欧美极品少妇xxxxⅹ免费视频_a级毛片免费播放_鲁一鲁中文字幕久久_亚洲一级特黄

python操作kafka實(shí)踐的示例代碼

系統(tǒng) 1749 0

1、先看最簡單的場景,生產(chǎn)者生產(chǎn)消息,消費(fèi)者接收消息,下面是生產(chǎn)者的簡單代碼。

            
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='xxxx:x')

msg_dict = {
  "sleep_time": 10,
  "db_config": {
    "database": "test_1",
    "host": "xxxx",
    "user": "root",
    "password": "root"
  },
  "table": "msg",
  "msg": "Hello World"
}
msg = json.dumps(msg_dict)
producer.send('test_rhj', msg, partition=0)
producer.close()

          

下面是消費(fèi)者的簡單代碼:

            
from kafka import KafkaConsumer

consumer = KafkaConsumer('test_rhj', bootstrap_servers=['xxxx:x'])
for msg in consumer:
  recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
  print recv

          

下面是結(jié)果:

2、如果想要完成負(fù)載均衡,就需要知道kafka的分區(qū)機(jī)制,同一個主題,可以為其分區(qū),在生產(chǎn)者不指定分區(qū)的情況,kafka會將多個消息分發(fā)到不同的分區(qū),消費(fèi)者訂閱時候如果不指定服務(wù)組,會收到所有分區(qū)的消息,如果指定了服務(wù)組,則同一服務(wù)組的消費(fèi)者會消費(fèi)不同的分區(qū),如果2個分區(qū)兩個消費(fèi)者的消費(fèi)者組消費(fèi),則,每個消費(fèi)者消費(fèi)一個分區(qū),如果有三個消費(fèi)者的服務(wù)組,則會出現(xiàn)一個消費(fèi)者消費(fèi)不到數(shù)據(jù);如果想要消費(fèi)同一分區(qū),則需要用不同的服務(wù)組。以此為原理,我們對消費(fèi)者做如下修改:

            
from kafka import KafkaConsumer

consumer = KafkaConsumer('test_rhj', bootstrap_servers=['xxxx:x'])
for msg in consumer:
  recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
  print recv

          

然后我們開兩個消費(fèi)者進(jìn)行消費(fèi),生產(chǎn)者分別往0分區(qū)和1分區(qū)發(fā)消息結(jié)果如下,可以看到,一個消費(fèi)者只能消費(fèi)0分區(qū),另一個只能消費(fèi)1分區(qū):


3、kafka提供了偏移量的概念,允許消費(fèi)者根據(jù)偏移量消費(fèi)之前遺漏的內(nèi)容,這基于kafka名義上的全量存儲,可以保留大量的歷史數(shù)據(jù),歷史保存時間是可配置的,一般是7天,如果偏移量定位到了已刪除的位置那也會有問題,但是這種情況可能很小;每個保存的數(shù)據(jù)文件都是以偏移量命名的,當(dāng)前要查的偏移量減去文件名就是數(shù)據(jù)在該文件的相對位置。要指定偏移量消費(fèi)數(shù)據(jù),需要指定該消費(fèi)者要消費(fèi)的分區(qū),否則代碼會找不到分區(qū)而無法消費(fèi),代碼如下:

            
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(group_id='123456', bootstrap_servers=['10.43.35.25:4531'])
consumer.assign([TopicPartition(topic='test_rhj', partition=0), TopicPartition(topic='test_rhj', partition=1)])
print consumer.partitions_for_topic("test_rhj") # 獲取test主題的分區(qū)信息
print consumer.assignment()
print consumer.beginning_offsets(consumer.assignment())
consumer.seek(TopicPartition(topic='test_rhj', partition=0), 0)
for msg in consumer:
  recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
  print recv

          

因?yàn)橹付ǖ谋阋肆繛?,所以從一開始插入的數(shù)據(jù)都可以查到,而且因?yàn)橹付朔謪^(qū),指定的分區(qū)結(jié)果都可以消費(fèi),結(jié)果如下:

python操作kafka實(shí)踐的示例代碼_第1張圖片

4、有時候,我們并不需要實(shí)時獲取數(shù)據(jù),因?yàn)檫@樣可能會造成性能瓶頸,我們只需要定時去獲取隊(duì)列里的數(shù)據(jù)然后批量處理就可以,這種情況,我們可以選擇主動拉取數(shù)據(jù)

            
from kafka import KafkaConsumer
import time

consumer = KafkaConsumer(group_id='123456', bootstrap_servers=['10.43.35.25:4531'])
consumer.subscribe(topics=('test_rhj',))
index = 0
while True:
  msg = consumer.poll(timeout_ms=5) # 從kafka獲取消息
  print msg
  time.sleep(2)
  index += 1
  print '--------poll index is %s----------' % index
          

結(jié)果如下,可以看到,每次拉取到的都是前面生產(chǎn)的數(shù)據(jù),可能是多條的列表,也可能沒有數(shù)據(jù),如果沒有數(shù)據(jù),則拉取到的為空:

python操作kafka實(shí)踐的示例代碼_第2張圖片

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯(lián)系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發(fā)表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 婷婷国产 | 极品丝袜高跟91极品系列 | 亚洲视频免费在线 | 古代级a毛片免费观看 | 精品国产福利在线 | av毛片在线免费看 | 亚洲综合图片色婷婷另类小说 | 欧美一级www | 99国产精品久久久久久久成人热 | 国产亚洲精品国产一区 | 亚洲一区在线观看视频 | 久久影院在线观看 | 久久久久久免费播放一级毛片 | 欧美日韩亚洲一区二区三区在线观看 | 亚洲午夜在线播放 | 波多野结衣在线免费播放 | 精品亚洲一区二区三区四区五区 | 欧美日韩在线第一页 | 免费看黄网站在线 | 国产精选91热在线观看 | 四虎影视在线影院在线观看观看 | 国产精品入口免费麻豆 | 精品免费国产一区二区三区四区 | 成人综合久久精品色婷婷 | 91男女视频 | 国产精品1| 中文字幕在线播放 | 亚洲综合影院 | 9191精品国产免费不久久 | 久久久精选| 欧美精品国产制服第一页 | 日韩精品成人 | 男女性情视频 | 成人精品一区二区三区 | 天天更新天天久久久更新影院 | 一区二区高清在线观看 | 热伊人99re久久精品最新地 | 国产tv在线 | 亚洲娇小性色xxxx | 99精品国产一区二区青青牛奶 | 色网站综合|