package storm.starter;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.drpc.DRPCSpout;
import backtype.storm.task.ShellBolt;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import storm.starter.spout.RandomSentenceSpout;
import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.Map;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
/**
* This topology demonstrates Storm's stream groupings and multilang
* capabilities.
*/
public class Drpctest {
public static final Logger logger = Logger.getLogger(Drpctest.class);
public static class WordCount extends BaseBasicBolt {
Map<String, Integer> counts = new HashMap<String, Integer>();
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String word = tuple.getString(0);
logger.error(this.toString() + "word = " + word);
Integer count = counts.get(word);
if (count == null)
count = 0;
count++;
counts.put(word, count);
logger.error(this.toString() + "count = " + count);
collector.emit(new Values(word, count));
}
String str = Thread.currentThread().getName();
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
logger.error("declareOutputFields :");
declarer.declare(new Fields("result", "count"));
}
}
public static class DrpcBolt extends BaseBasicBolt {
Map<String, Integer> counts = new HashMap<String, Integer>();
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String logString = tuple.getString(0);
logger.error("DrpcBolt recve :" + logString);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// 暫時沒用
declarer.declare(new Fields("word1", "count1"));
}
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
// drpc
LocalDRPC drpc = new LocalDRPC();
DRPCSpout drpc_spout = new DRPCSpout("testdrpc", drpc);
builder.setSpout("drpcspout", drpc_spout, 3);
PropertyConfigurator
.configure("/home/hadoop/code1/Kafka/src/Log4j.properties");
// 接入drpc
builder.setBolt("DrpcBolt", new DrpcBolt(), 1).shuffleGrouping(
"drpcspout");
Config conf = new Config();
conf.setDebug(true);
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], conf,
builder.createTopology());
} else {
conf.setMaxTaskParallelism(3);
conf.setDebug(true);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());
String str = "send test drpc"; // 和 DRPCSpout 名字對應(yīng)
drpc.execute("testdrpc", str);
Thread.sleep(10000);
cluster.shutdown();
}
}
}
?
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061
微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元

