org.apache.stormstorm-core0.9.3org.apache.kafka

欧美三区_成人在线免费观看视频_欧美极品少妇xxxxⅹ免费视频_a级毛片免费播放_鲁一鲁中文字幕久久_亚洲一级特黄

kafka+storm連接

系統(tǒng) 1718 0

本項(xiàng)目為maven項(xiàng)目,需要添加必要的storm庫(kù),以及kafka依賴(lài),使用storm自帶的storm-kafka進(jìn)行連接,根據(jù)自己集群環(huán)境

      		<dependency>

			<groupId>org.apache.storm</groupId>

			<artifactId>storm-core</artifactId>

			<version>0.9.3</version>

		</dependency>



		<dependency>

			<groupId>org.apache.kafka</groupId>

			<artifactId>kafka_2.10</artifactId>

			<version>0.8.2.1</version>

			<exclusions>

				<exclusion>

					<groupId>org.apache.zookeeper</groupId>

					<artifactId>zookeeper</artifactId>

				</exclusion>

				<exclusion>

					<groupId>log4j</groupId>

					<artifactId>log4j</artifactId>

				</exclusion>

			</exclusions>

		</dependency>



		<dependency>

			<groupId>org.apache.storm</groupId>

			<artifactId>storm-kafka</artifactId>

			<version>0.9.3</version>

		</dependency>


    

  實(shí)例topology:

      package com.xh.kafka.test;



import storm.kafka.BrokerHosts;

import storm.kafka.KafkaSpout;

import storm.kafka.SpoutConfig;

import storm.kafka.StringScheme;

import storm.kafka.ZkHosts;

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.StormSubmitter;

import backtype.storm.generated.AlreadyAliveException;

import backtype.storm.generated.InvalidTopologyException;

import backtype.storm.spout.SchemeAsMultiScheme;

import backtype.storm.topology.TopologyBuilder;



public class KafkaSpoutTest {



	public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {

		

		BrokerHosts brokerHosters = new ZkHosts("zookeeperip1:2181,zookeeperip2:2181/kafka/65_250-252");

		

		String topic = "log_test";

		

		//offsetZkRoot 和 offsetZkId 自定義即可

		String offsetZkRoot = "/storm_test";

		String offsetZkId = "kafka-storm";

		

		SpoutConfig spoutConfig = new SpoutConfig(brokerHosters, topic, offsetZkRoot, offsetZkId);

		

		spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());		

		

				

		Config conf = new Config();

			

		TopologyBuilder builder = new TopologyBuilder();



		builder.setSpout("spout", new KafkaSpout(spoutConfig));

		builder.setBolt("bolt", new SequenceBolt()).shuffleGrouping("spout");

	

		if(args != null && args.length > 0){

			conf.setNumWorkers(3);

			StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

		}else{

			LocalCluster cluster = new LocalCluster();

			cluster.submitTopology("my-topology", conf, builder.createTopology());

		}

	}



}


    

  此外,不管是本地運(yùn)行還是集群運(yùn)行,都需要 修改host文件,添加,kafka集群的機(jī)器名 ,例如:

      192.168.*.* kafka-01

192.168.**.** kafka-02

192.168.***.*** kafka-03


    

  否則會(huì)報(bào)錯(cuò)如下:

      23810 [Thread-10-spout] INFO  kafka.consumer.SimpleConsumer - Reconnect due to socket error: java.nio.channels.ClosedChannelException



23815 [Thread-10-spout] ERROR backtype.storm.util - Async loop died!

java.lang.RuntimeException: java.nio.channels.ClosedChannelException

at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) ~[storm-kafka-0.9.3.jar:0.9.3]

at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) ~[storm-kafka-0.9.3.jar:0.9.3]

at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[storm-kafka-0.9.3.jar:0.9.3]

at backtype.storm.daemon.executor$fn__3373$fn__3388$fn__3417.invoke(executor.clj:565) ~[storm-core-0.9.3.jar:0.9.3]

at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]

at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]

at java.lang.Thread.run(Unknown Source) [na:1.7.0_65]

Caused by: java.nio.channels.ClosedChannelException: null

at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) ~[kafka_2.10-0.8.2.1.jar:na]

at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78) ~[kafka_2.10-0.8.2.1.jar:na]

at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) ~[kafka_2.10-0.8.2.1.jar:na]

at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127) ~[kafka_2.10-0.8.2.1.jar:na]

at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[kafka_2.10-0.8.2.1.jar:na]

at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77) ~[storm-kafka-0.9.3.jar:0.9.3]

at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67) ~[storm-kafka-0.9.3.jar:0.9.3]

at storm.kafka.PartitionManager.<init>(PartitionManager.java:83) ~[storm-kafka-0.9.3.jar:0.9.3]

at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[storm-kafka-0.9.3.jar:0.9.3]

... 6 common frames omitted


    

kafka+storm連接


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號(hào)聯(lián)系: 360901061

您的支持是博主寫(xiě)作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對(duì)您有幫助就好】

您的支持是博主寫(xiě)作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長(zhǎng)會(huì)非常 感謝您的哦!!!

發(fā)表我的評(píng)論
最新評(píng)論 總共0條評(píng)論
主站蜘蛛池模板: 国产精品啪一品二区三区粉嫩 | 日韩在线观看中文字幕 | 免费视频二区 | 欧美日韩在线一区二区 | 色午夜影院 | 亚洲 中文 欧美 日韩 在线观看 | 精品一区二区三区的国产在线观看 | 亚洲最黄网站 | 欧美另类性视频 | 免费观看性欧美一级 | 蜜桃黄网 | 日本国产视频 | 奇米影音先锋 | 最近最新中文字幕 | 全色网站 | 特黄特色大片免费视频观看 | 欧美电影精品久久久久 | 96福利视频 | 天天搞夜夜操 | 玖玖爱365| 久久精品国产一区二区 | 欧美片第一页 | 在线免费观看毛片 | av黄色在线 | 奇米影视888狠狠狠777九色 | 色屁屁影院www免费 特片网久久 | 亚洲天堂视频在线免费观看 | 免费国产在线视频 | 四虎影视最新网站在线播放 | 亚洲欧美日韩在线一区二区三区 | 黄色成年在线观看 | 久久人人爽人人爽 | 亚洲国产精品一区二区第一页 | 免费黄色片网站 | 日本在线高清视频 | 日韩a无v码在线播放免费 | 欧美黑人又粗又长 | 国产一级做a爰片久久毛片 欧美一区欧美二区 | 国产熟妇另类久久久久XYZ | 国产一区二区三区久久久久久久久 | 国产精品久久久久久久久久久久久 |