過程,
Spout 發送 msgid 1-9 的 tuple(代碼中只有 num < 10 時才發送, 共 9 個).
一級Bolt: 以 msgid 1 的 tuple 作為基本組合 tuple, 其余 8 個 tuple 分別與它組合(同時錨定到兩者)后發送給二級Bolt; 每次發送后, 當前 tuple 和 msgid 1 的 tuple 都會被 ack 一次. 因此對於 msgid 1 對應的 tuple, acker 需要跟蹤 8 個發往二級Bolt 的 tuple 的處理情況.
二級Bolt: 對收到的每個 tuple 調用 fail(模擬處理失敗).
結果: spout 的 fail 回調中出現 msgid 1-9 全部失敗的情況.
拓撲代碼
package storm.starter;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.drpc.DRPCSpout;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.ShellBolt;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import storm.starter.spout.RandomSentenceSpout;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
/**
* This topology exercises Storm's ack/fail tracking: the first-level bolt
* anchors each emitted tuple to two input tuples, and the second-level bolt
* fails every tuple it receives.
*/
public class WordCountTopology {
/**
 * Returns the name of the thread that invokes this method.
 *
 * @return the calling thread's name
 */
public static String GetThreadName() {
    return Thread.currentThread().getName();
}
public static final Logger logger = Logger
.getLogger(WordCountTopology.class);
// 切分單詞 一級bolt
/*
* public static class SplitSentence extends ShellBolt implements IRichBolt
* { public SplitSentence() { super("python", "splitsentence.py");
* logger.error(GetThreadName() + "SplitSentence create"); }
*
* // 定義字段發送
*
* @Override public void declareOutputFields(OutputFieldsDeclarer declarer)
* { declarer.declare(new Fields("word")); logger.error(GetThreadName() +
* "declarer.declare(new Fields(word))"); }
*
* @Override public Map<String, Object> getComponentConfiguration() {
* logger.error("getComponentConfiguration"); return null; } }
*/
/**
 * First-level bolt of the ack/fail experiment.
 *
 * <p>The tuple whose first value is exactly {@code "a"} is remembered as the
 * base tuple. Every other tuple is re-emitted with {@code "a"} appended,
 * anchored to both the base tuple (when one has been seen) and itself, and
 * then both anchors are acked.
 */
public static class SplitSentence implements IRichBolt {
    private OutputCollector _collector;
    int num = 0;
    // Base tuple (value "a") that later tuples are co-anchored with; null until it arrives.
    private Tuple tuple1;

    /** Stores the collector used later for emit/ack. */
    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        _collector = collector;
    }

    /**
     * Remembers the base tuple, or emits a combined tuple anchored to the base
     * tuple and the current one, then prints what was received.
     *
     * @param tuple incoming tuple; its first value is read as a string
     */
    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        if (sentence.equals("a")) {
            tuple1 = tuple;
        } else {
            List<Tuple> anchors = new ArrayList<Tuple>();
            // The base tuple may not have arrived yet; never anchor to or ack null.
            if (tuple1 != null) {
                anchors.add(tuple1);
            }
            anchors.add(tuple);
            _collector.emit(anchors, new Values(sentence + "a"));
            _collector.ack(tuple);
            if (tuple1 != null) {
                // The base tuple is acked once per combined emit, as in the original experiment.
                _collector.ack(tuple1);
            }
        }
        System.out.println("Bolt Thread " + Thread.currentThread().getName() + "recve : " + sentence);
        System.out.println( num + " bolt recev:" + tuple.getMessageId().getAnchorsToIds());
    }

    /** Nothing to release. */
    @Override
    public void cleanup() {
    }

    /** Declares the single output field {@code "word"}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    /** Returns {@code null}: this bolt supplies no component-specific configuration. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
/**
 * Second-level bolt of the experiment: tallies each received word and then
 * fails the tuple to simulate a processing failure.
 */
public static class CountCount1 implements IRichBolt {
    Map<String, Integer> counts = new HashMap<String, Integer>();
    private OutputCollector _collector;
    int num = 0;

    /** Keeps the collector so that {@link #execute(Tuple)} can fail tuples. */
    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        _collector = collector;
    }

    /**
     * Increments the per-word tally and the received-tuple counter, then
     * reports the tuple as failed.
     *
     * @param tuple incoming tuple; its first value is read as the word
     */
    @Override
    public void execute(Tuple tuple) {
        final String key = tuple.getString(0);
        final Integer previous = counts.get(key);
        counts.put(key, previous == null ? 1 : previous + 1);
        num += 1;
        _collector.fail(tuple);
    }

    /** Declares the output fields {@code "result"} and {@code "count"}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("result", "count"));
    }

    /** Nothing to release. */
    @Override
    public void cleanup() {
    }

    /** Returns {@code null}: no component-specific configuration. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
/**
 * Basic bolt that keeps a running tally per word and emits the word together
 * with its updated tally.
 */
public static class WordCount extends BaseBasicBolt {
    private OutputCollector _collector;
    Map<String, Integer> counts = new HashMap<String, Integer>();

    /** Declares the output fields {@code "result"} and {@code "count"}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("result", "count"));
    }

    /**
     * Bumps the tally for the incoming word and emits {@code (word, tally)}.
     *
     * @param tuple     incoming tuple; its first value is read as the word
     * @param collector collector the updated pair is emitted through
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String key = tuple.getString(0);
        Integer seen = counts.get(key);
        Integer updated;
        if (seen != null) {
            updated = seen + 1;
        } else {
            updated = 1;
        }
        counts.put(key, updated);
        collector.emit(new Values(key, updated));
    }
}
/**
 * Basic bolt that only inspects the {@code "result"} and {@code "count"}
 * fields of incoming tuples and emits nothing.
 */
public static class WordCount1 extends BaseBasicBolt {
    Map<String, Integer> counts = new HashMap<String, Integer>();

    /**
     * Reads the {@code "result"} and {@code "count"} fields when present.
     * The values are not used further; the casts are kept so that a value of
     * an unexpected type still raises {@link ClassCastException} as before.
     *
     * @param tuple     incoming tuple
     * @param collector unused; this bolt does not emit
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        Fields fields = tuple.getFields();
        if (fields.contains("result")) {
            String result = (String) tuple.getValueByField("result");
        }
        if (fields.contains("count")) {
            Integer count = (Integer) tuple.getValueByField("count");
        }
    }

    /** Declares the output fields {@code "word1"} and {@code "count1"}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word1", "count1"));
    }
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
PropertyConfigurator
.configure("/home/hadoop/code1/Kafka/src/Log4j.properties");
// parallelism_hint 代表是executor數量, setNumTasks 代表Tasks數量
builder.setSpout("spout", new RandomSentenceSpout(), 5).setNumTasks(2);
builder.setBolt("split", new SplitSentence(), 8).setNumTasks(1).shuffleGrouping("spout");
builder.setBolt("count", new CountCount1(), 12).fieldsGrouping("split",
new Fields("word"));
// builder.setBolt("WordCount1", new WordCount1(), 1).fieldsGrouping(
// "count", new Fields("result", "count"));
Config conf = new Config();
conf.setDebug(true);
? ? ? ? ? ? ? ??//? 這個設置一個spout task上面最多有多少個沒有處理(ack/fail)的tuple,防止tuple隊列過大, 只對可靠任務起作用
conf.setMaxSpoutPending(2);
conf.setMessageTimeoutSecs(5); // 消息處理延時
conf.setNumAckers(2); // 消息處理acker
System.out.println("args.length = " + args.length);
if (args != null && args.length > 0) {
conf.setNumWorkers(5); // 設置工作進程
StormSubmitter.submitTopology(args[0], conf,
builder.createTopology());
} else {
// 每個組件的最大executor數
conf.setMaxTaskParallelism(1);
conf.setDebug(true);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());
String str = "testdrpc";
// drpc.execute("testdrpc", str);
Thread.sleep(1088000);
cluster.shutdown();
}
}
}
spout代碼
package storm.starter.spout;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.Map;
import java.util.Random;
import org.apache.log4j.Logger;
import storm.starter.WordCountTopology;
// IRichSpout
public class RandomSentenceSpout extends BaseRichSpout {
SpoutOutputCollector _collector;
Random _rand;
public static final Logger logger = Logger
.getLogger(RandomSentenceSpout.class);
@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
_collector = collector;
_rand = new Random();
WordCountTopology.logger.error(this.toString()
+ "RandomSentenceSpout is create");
}
private int num = 0;
private String gettmstr() {
StringBuilder tmp = new StringBuilder();
for (int i = 0; i <= num; i++)
tmp.append("a");
num++;
return tmp.toString();
}
@Override
public void nextTuple() {
Utils.sleep(200);
// String[] sentences = new String[]{ "the cow jumped over the moon",
// "an apple a day keeps the doctor away",
// "four score and seven years ago", "snow white and the seven dwarfs",
// "i am at two with nature" };
String[] sentences = new String[] { "A" };
String sentence = gettmstr(); // sentences[_rand.nextInt(sentences.length)];
if (num < 10) {
_collector.emit(new Values(sentence), new Integer(num));
// logger.error(this.toString() + "send sentence = " + sentence);
// System.out.println(Thread.currentThread().getName() + " Spout ");
}
}
@Override
public void ack(Object id) {
logger.error(this.toString() + "spout ack =" + (Integer)id);
}
@Override
public void fail(Object id) {
logger.error("spout fail =" + (Integer)id);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
運行結果
2014-10-03 21:17:31,149 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =1
2014-10-03 21:17:31,351 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =2
Bolt Thread Thread-22recve : aaa
0 bolt recev:{-3139141336114052337=7131499433188364504}
2014-10-03 21:17:31,552 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =3
Bolt Thread Thread-22recve : aaaa
0 bolt recev:{-4497680640148241887=-615828348570847097}
2014-10-03 21:17:31,754 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =4
Bolt Thread Thread-22recve : aaaaa
0 bolt recev:{-8878772617767839827=-7708082520013359311}
2014-10-03 21:17:31,957 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =5
Bolt Thread Thread-22recve : aaaaaa
0 bolt recev:{-3995020874692495577=-5070846720162762196}
2014-10-03 21:17:32,160 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =6
Bolt Thread Thread-22recve : aaaaaaa
0 bolt recev:{-5994700617723404155=-3738685841476816404}
2014-10-03 21:17:32,362 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =7
Bolt Thread Thread-22recve : aaaaaaaa
0 bolt recev:{-2308734827213127682=-5719708045753233056}
2014-10-03 21:17:32,563 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =8
Bolt Thread Thread-22recve : aaaaaaaaa
0 bolt recev:{-3718844156917119468=-6359724009048981605}
2014-10-03 21:17:32,766 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =9
?
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061
微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元

