org.apache.stormstorm-core0.9.3org.apache.kafka

欧美三区_成人在线免费观看视频_欧美极品少妇xxxxⅹ免费视频_a级毛片免费播放_鲁一鲁中文字幕久久_亚洲一级特黄

kafka+storm連接

系統(tǒng) 1718 0

本項(xiàng)目為maven項(xiàng)目,需要添加必要的storm庫(kù),以及kafka依賴(lài),使用storm自帶的storm-kafka進(jìn)行連接,根據(jù)自己集群環(huán)境

      		<dependency>

			<groupId>org.apache.storm</groupId>

			<artifactId>storm-core</artifactId>

			<version>0.9.3</version>

		</dependency>



		<dependency>

			<groupId>org.apache.kafka</groupId>

			<artifactId>kafka_2.10</artifactId>

			<version>0.8.2.1</version>

			<exclusions>

				<exclusion>

					<groupId>org.apache.zookeeper</groupId>

					<artifactId>zookeeper</artifactId>

				</exclusion>

				<exclusion>

					<groupId>log4j</groupId>

					<artifactId>log4j</artifactId>

				</exclusion>

			</exclusions>

		</dependency>



		<dependency>

			<groupId>org.apache.storm</groupId>

			<artifactId>storm-kafka</artifactId>

			<version>0.9.3</version>

		</dependency>


    

  實(shí)例topology:

      package com.xh.kafka.test;



import storm.kafka.BrokerHosts;

import storm.kafka.KafkaSpout;

import storm.kafka.SpoutConfig;

import storm.kafka.StringScheme;

import storm.kafka.ZkHosts;

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.StormSubmitter;

import backtype.storm.generated.AlreadyAliveException;

import backtype.storm.generated.InvalidTopologyException;

import backtype.storm.spout.SchemeAsMultiScheme;

import backtype.storm.topology.TopologyBuilder;



public class KafkaSpoutTest {



	public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {

		

		BrokerHosts brokerHosters = new ZkHosts("zookeeperip1:2181,zookeeperip2:2181/kafka/65_250-252");

		

		String topic = "log_test";

		

		//offsetZkRoot 和 offsetZkId 自定義即可

		String offsetZkRoot = "/storm_test";

		String offsetZkId = "kafka-storm";

		

		SpoutConfig spoutConfig = new SpoutConfig(brokerHosters, topic, offsetZkRoot, offsetZkId);

		

		spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());		

		

				

		Config conf = new Config();

			

		TopologyBuilder builder = new TopologyBuilder();



		builder.setSpout("spout", new KafkaSpout(spoutConfig));

		builder.setBolt("bolt", new SequenceBolt()).shuffleGrouping("spout");

	

		if(args != null && args.length > 0){

			conf.setNumWorkers(3);

			StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

		}else{

			LocalCluster cluster = new LocalCluster();

			cluster.submitTopology("my-topology", conf, builder.createTopology());

		}

	}



}


    

  此外,不管是本地運(yùn)行還是集群運(yùn)行,都需要 修改host文件,添加,kafka集群的機(jī)器名 ,例如:

      192.168.*.* kafka-01

192.168.**.** kafka-02

192.168.***.*** kafka-03


    

  否則會(huì)報(bào)錯(cuò)如下:

      23810 [Thread-10-spout] INFO  kafka.consumer.SimpleConsumer - Reconnect due to socket error: java.nio.channels.ClosedChannelException



23815 [Thread-10-spout] ERROR backtype.storm.util - Async loop died!

java.lang.RuntimeException: java.nio.channels.ClosedChannelException

at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) ~[storm-kafka-0.9.3.jar:0.9.3]

at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) ~[storm-kafka-0.9.3.jar:0.9.3]

at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[storm-kafka-0.9.3.jar:0.9.3]

at backtype.storm.daemon.executor$fn__3373$fn__3388$fn__3417.invoke(executor.clj:565) ~[storm-core-0.9.3.jar:0.9.3]

at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]

at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]

at java.lang.Thread.run(Unknown Source) [na:1.7.0_65]

Caused by: java.nio.channels.ClosedChannelException: null

at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) ~[kafka_2.10-0.8.2.1.jar:na]

at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78) ~[kafka_2.10-0.8.2.1.jar:na]

at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) ~[kafka_2.10-0.8.2.1.jar:na]

at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127) ~[kafka_2.10-0.8.2.1.jar:na]

at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[kafka_2.10-0.8.2.1.jar:na]

at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77) ~[storm-kafka-0.9.3.jar:0.9.3]

at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67) ~[storm-kafka-0.9.3.jar:0.9.3]

at storm.kafka.PartitionManager.<init>(PartitionManager.java:83) ~[storm-kafka-0.9.3.jar:0.9.3]

at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[storm-kafka-0.9.3.jar:0.9.3]

... 6 common frames omitted


    

kafka+storm連接


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號(hào)聯(lián)系: 360901061

您的支持是博主寫(xiě)作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對(duì)您有幫助就好】

您的支持是博主寫(xiě)作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長(zhǎng)會(huì)非常 感謝您的哦!!!

發(fā)表我的評(píng)論
最新評(píng)論 總共0條評(píng)論
主站蜘蛛池模板: 国产一级免费视频 | 精品视频久久 | 夜夜夜夜爽 | 日韩欧美在线免费观看 | 欧美日韩精品在线观看 | 男人j进女人j啪啪无遮挡动态 | 国产chinese视频在线观看 | 五月激情六月婷婷 | 欧美视频网站 | 日韩精品免费 | 在线精品日韩 | 九九九久久久久久久爱 | 欧美精品 在线观看 | 中文字幕人成乱码在线观看 | www久久精品 | 日本午夜影院 | 在线观看特色大片免费网站 | 欧美卡一卡二卡新区网站 | 一本到在线观看视频不卡 | 国产亚洲精品日韩香蕉网 | 国产精品亚洲片在线观看不卡 | 亚洲欧洲av在线 | 韩国A片国产浪潮AV 久久99国产精品 | 一区二区三区四区国产 | 日本在线视 | 久久国产成人 | 国产精品午夜电影 | 日本成人一二三区 | 成人免费视频网站在线观看 | 国产亚洲精品久久久极品美女 | 亚洲一区久久久 | 操人视频在线观看 | 大蕉香蕉久久爱 | 激情五月色婷婷 | 午夜影院恐怖电影免费看 | 久久久精品免费热线观看 | 日韩欧美国产精品 | 99久热国产精品视频尤物不卡 | 九二淫黄大片看片 | 欧美三级在线播放 | 亚洲精品国产a久久久久久 亚洲国产精品第一页 |