欧美三区_成人在线免费观看视频_欧美极品少妇xxxxⅹ免费视频_a级毛片免费播放_鲁一鲁中文字幕久久_亚洲一级特黄

Storm系列(三)Topology提交過(guò)程

系統(tǒng) 1852 0

提交示例代碼:

1 public static void main(String[] args) throws Exception {

2 TopologyBuilder builder = new TopologyBuilder();

3 builder.setSpout("random", new RandomWordSpout(), 2);

4 builder.setBolt("transfer", new TransferBolt(), 4).shuffleGrouping("random");

5 builder.setBolt("writer", new WriterBolt(), 4).fieldsGrouping("transfer", new Fields("word"));

6 Config conf = new Config();

7 conf.setNumWorkers(4);// 設(shè)置啟動(dòng)4個(gè)Worker

8 conf.setNumAckers(1); // 設(shè)置一個(gè)ack線程

9 conf.setDebug(true); // 設(shè)置打印所有發(fā)送的消息及系統(tǒng)消息

10 StormSubmitter.submitTopology("test", conf, builder.createTopology());

11 }

1、構(gòu)建 TopologyBuilder 對(duì)象 builder,主要用于對(duì)各個(gè)組件(bolt、spout)進(jìn)行配置;

TopologyBuilder主要屬性字段定義如下:

public class TopologyBuilder {

// 所提交Topolog中所有的bolt將放入到_bolts中

private Map<String, IRichBolt> _bolts = new HashMap<String, IRichBolt>();

// 所提交Topolog中所有的spout將放入到_spouts中

private Map<String, IRichSpout> _spouts = new HashMap<String, IRichSpout>();

// 所提交Topolog中所有的spout和bolt都將放入_commons中

private Map<String, ComponentCommon> _commons = new HashMap<String, ComponentCommon>();

....................................

}

2、以上提交代碼中第三行,配置了一個(gè)id值為random,IRichSpout對(duì)象為RandomWordSpout,而并行度為2(兩個(gè)線程里面跑兩個(gè)任務(wù))的spout;

// setSpout函數(shù)實(shí)現(xiàn)源碼

public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint) {

validateUnusedId(id);

initCommon(id, spout, parallelism_hint);

_spouts.put(id, spout);

return new SpoutGetter(id);

}

validateUnusedId:檢測(cè)輸入的id是不是唯一,若已經(jīng)存在將拋出異常;

initCommon:構(gòu)建ComponentCommon對(duì)象并進(jìn)行相應(yīng)的初始化,最后放入到_commons(以上TopologyBuilder中定義的Map);

initCommon函數(shù)實(shí)現(xiàn)源碼:

private void initCommon(String id, IComponent component, Number parallelism) {

ComponentCommon common = new ComponentCommon();

// 設(shè)置消息流的來(lái)源及分組方式

common.set_inputs(new HashMap<GlobalStreamId, Grouping>());

if(parallelism!=null)

// 設(shè)置并行度

common.set_parallelism_hint(parallelism.intValue());

Map conf = component.getComponentConfiguration();

if(conf!=null)

// 設(shè)置組件的配置參數(shù)

common.set_json_conf(JSONValue.toJSONString(conf));

_commons.put(id, common);

}

在ComponentCommon中主要對(duì)以下四個(gè)屬性字段進(jìn)行設(shè)置:

// GlobalStreamId:確定消息來(lái)源,其中componentId表示所屬組件,streamId為消息流的標(biāo)識(shí)符;

// Grouping:確定消息分組方式;

private Map<GlobalStreamId,Grouping> inputs;

// StreamInfo表示輸出的字段列表及是否為直接流

private Map<String,StreamInfo> streams;

private int parallelism_hint; // 設(shè)置并行度

private String json_conf; // 其它配置參數(shù)設(shè)置(必須為JSON格式)

3、SpoutGetter

實(shí)現(xiàn)源碼:

protected class SpoutGetter extends ConfigGetter<SpoutDeclarer> implements SpoutDeclarer {

public SpoutGetter(String id) {

super(id);

}

}

ConfigGetter、SpoutGetter的實(shí)現(xiàn)都是在TopologyBuilder中, ConfigGetter作用:設(shè)置程序中的配置項(xiàng),覆蓋默認(rèn)的配置項(xiàng),且配置項(xiàng)的格式為為JSON(本質(zhì)上是改變對(duì)應(yīng)ComponentCommon對(duì)象中json_conf的值);

4、提交示例代碼中的第四行定義了一個(gè)id為transfer,IRichSpout對(duì)象為TransferBolt,并行度為4的bolt

setBolt實(shí)現(xiàn)源碼:

public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) {

validateUnusedId(id);

initCommon(id, bolt, parallelism_hint);

_bolts.put(id, bolt);

return new BoltGetter(id);

}

設(shè)置Bolt的函數(shù)與設(shè)置Spout函數(shù)的實(shí)現(xiàn)唯一的區(qū)別在返回結(jié)果;

BoltGetter實(shí)現(xiàn)部分源碼:

protected class BoltGetter extends ConfigGetter<BoltDeclarer> implements BoltDeclarer {

private String _boltId;

public BoltGetter(String boltId) {

super(boltId);

_boltId = boltId;

}

public BoltDeclarer shuffleGrouping(String componentId) {

return shuffleGrouping(componentId, Utils.DEFAULT_STREAM_ID);

}

public BoltDeclarer fieldsGrouping(String componentId, Fields fields) {

return fieldsGrouping(componentId, Utils.DEFAULT_STREAM_ID, fields);

}

public BoltDeclarer fieldsGrouping(String componentId, String streamId, Fields fields) {

return grouping(componentId, streamId, Grouping.fields(fields.toList()));

}

public BoltDeclarer shuffleGrouping(String componentId, String streamId) {

return grouping(componentId, streamId, Grouping.shuffle(new NullStruct()));

}

private BoltDeclarer grouping(String componentId, String streamId, Grouping grouping) {

_commons.get(_boltId).put_to_inputs(new GlobalStreamId(componentId, streamId), grouping);

return this;

}

.........................................

}

BoltGetter繼承至ConfigGetter并實(shí)現(xiàn)了BoltDeclarer接口,并重載了BoltDeclarer(InputDeclarer)中各種分組方式(如:fieldsGrouping、shuffleGrouping),分組方式的實(shí)現(xiàn)本質(zhì)上是在_commons中通過(guò)對(duì)用的boltId找到對(duì)應(yīng)的ComponentCommon對(duì)象,對(duì)inputs屬性進(jìn)行設(shè)置;

5、通過(guò)以上幾步完成了bolt與spout的配置(對(duì)應(yīng)提交示例代碼中的2~5行),6~9行是對(duì)運(yùn)行環(huán)境的配置,10行用于向集群提交執(zhí)行任務(wù),builder.createTopology用于構(gòu)建StormTopology對(duì)象.

createTopology實(shí)現(xiàn)源碼:

public StormTopology createTopology() {

Map<String, Bolt> boltSpecs = new HashMap<String, Bolt>();

Map<String, SpoutSpec> spoutSpecs = new HashMap<String, SpoutSpec>();

for(String boltId: _bolts.keySet()) {

IRichBolt bolt = _bolts.get(boltId);

ComponentCommon common = getComponentCommon(boltId, bolt);

boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.serialize(bolt)), common));

}

for(String spoutId: _spouts.keySet()) {

IRichSpout spout = _spouts.get(spoutId);

ComponentCommon common = getComponentCommon(spoutId, spout);

spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.serialize(spout)), common));

}

return new StormTopology(spoutSpecs,

boltSpecs,

new HashMap<String, StateSpoutSpec>());

}

以上源碼實(shí)現(xiàn)中主要做了兩件事:

  • 通過(guò)boltId從_bolts中獲取到對(duì)應(yīng)的bolt對(duì)象,再通過(guò)getComponentCommon方法設(shè)置對(duì)應(yīng)ComponentCommon對(duì)象的streams(輸出的字段列表及是否為直接流)屬性值,最后將bolt和common一起 放入到boltSpecs集合中。
  • 通過(guò)spoutId從_spouts中獲取到對(duì)應(yīng)的spout對(duì)象,再通過(guò)getComponentCommon方法設(shè)置對(duì)應(yīng)ComponentCommon對(duì)象的streams(輸出的字段列表及是否為直接流)屬性值,最后將spout和common一起 放入到boltSpecs集合中。
  • 通過(guò)以上兩步使所設(shè)置的所有組件都封裝到StormTopology對(duì)象中,最后提交的到集群中運(yùn)行。

Storm系列(三)Topology提交過(guò)程


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號(hào)聯(lián)系: 360901061

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對(duì)您有幫助就好】

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長(zhǎng)會(huì)非常 感謝您的哦!!!

發(fā)表我的評(píng)論
最新評(píng)論 總共0條評(píng)論
主站蜘蛛池模板: 欧美日韩国产中文字幕 | 精品福利av导航 | 色秀视频在线观看全部 | 色噜噜狠狠先锋影音久久 | 久久精品国产免费看久久精品 | 日韩一区二区中文字幕 | 高清久久久 | 亚洲视频观看 | 久草免费色站 | 国产精品免费大片一区二区 | 国产牛仔裤系列在线观看 | 国产麻豆一区二区三区 | 久久香蕉国产线看观看网站 | 久久中文字幕美谷朱里 | 欧美电影免费观看 | 成人免费视频网 | 国产精品美女www爽爽爽视频 | 亚洲婷婷国产精品电影人久久 | 亚洲 欧美 激情 小说 另类 | 欧美日韩成人在线观看 | 国产一区二区免费 | 波多野结衣一级 | 瑟瑟视频在线 | 国产精品精品视频一区二区三区 | 久久久久久全国免费观看 | 91精品国产综合久久福利软件 | 亚洲色图日韩 | 亚洲一区二区在线视频 | 亚洲精品第一综合99久久 | xifan在线a精品一区二区视频网站 | 欧美日韩网址 | 色噜噜狠狠色综合欧洲selulu | 免费的污污网站 | 日日日日干 | 欧美视频观看 | 日本高清无卡码一区二区久久 | h5.meihuan.art| 99久久精品免费看国产一区二区 | 免费小视频 | 久久久精品一区 | 国产精品99久久久久 |