一、介紹
Storm的開發語言主要是Java和Clojure,其中Java定義骨架,而Clojure編寫核心邏輯。源碼統計結果:
Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/
–>
180
textfiles.
177
uniquefiles.
7
filesignored.
http:
//
cloc.sourceforge.netv1.55T=1.0s(171.0files/s,46869.0lines/s)
——————————————————————————-
Languagefilesblankcommentcode
——————————————————————————-
Java
125
5010
2414
25661
Lisp
33
732
283
4871
Python
7
742
433
4675
CSS
1
12
45
1837
ruby
2
22
0
104
BourneShell
1
0
0
6
Javascript
2
1
15
6
——————————————————————————-
SUM:
171
6519
3190
37160
——————————————————————————-
Java代碼25000多行,而Clojure(Lisp)只有4871行,說語言不重要再次證明是扯淡。
二、Topology和Nimbus
Topology是storm的核心理念,將spout和bolt組織成一個topology,運行在storm集群里,完成實時分析和計算的任務。這里我主要想介紹下topology部署到storm集群的大概過程。提交一個topology任務到Storm集群是通過StormSubmitter.submitTopology方法提交:
Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/
–> StormSubmitter.submitTopology(name,conf,builder.createTopology());
我們將topology打成jar包后,利用bin/storm這個python腳本,執行如下命令:
Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/
–> bin / stormjarxxxx.jarcom.taobao.MyTopologyargs
將jar包提交給storm集群。storm腳本會啟動JVM執行Topology的main方法,執行submitTopology的過程。而submitTopology會將jar 文件上傳 到nimbus,上傳是通過socket傳輸。在storm這個python腳本的jar方法里可以看到:
Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/
–>
def
jar(jarfile,klass,
*
args):
exec_storm_class(
klass,
jvmtype
=
“
-client
“
,
extrajars
=
[jarfile,CONF_DIR,STORM_DIR
+
"
/bin
"
],
args
=
args,
prefix
=
“
exportSTORM_JAR=
“
+
jarfile
+
“
;
“
)
將jar文件的地址設置為環境變量STORM_JAR,這個環境變量在執行submitTopology的時候用到:
Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/
–>
//
StormSubmitter.java
private
static
void
submitJar(Mapconf){
if
(submittedJar
==
null
){
LOG.info(
“
Jarnotuploadedtomasteryet.Submittingjar
“
);
StringlocalJar
=
System.getenv(
“
STORM_JAR
“
)
;
submittedJar
=
submitJar(conf,localJar);
}
else
{
LOG.info(
“
Jaralreadyuploadedtomaster.Notsubmittingjar.
“
);
}
}
通過環境變量找到jar包的地址,然后上傳。利用環境變量傳參是個小技巧。
其次,nimbus在接收到jar文件后,存放到數據目錄的inbox目錄, nimbus數據目錄的結構 :
Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/
–>
-
nimbus
-
inbox
-
stormjar
-
57f1d694
-
2865
-
4b3b
-
8a7c
-
99104fc0aea3.jar
-
stormjar
-
76b4e316
-
b430
-
4215
-
9e26
-
4f33ba4ee520.jar
-
stormdist
-
storm
-
id
-
stormjar.jar
-
stormconf.ser
-
stormcode.ser
其中inbox用于存放提交的jar文件,每個jar文件都重命名為stormjar加上一個32位的UUID。而stormdist存放的是啟動topology后生成的文件,每個topology都分配一個唯一的id,ID的規則是“name-計數-時間戳”。啟動后的topology的jar文件名命名為storm.jar ,而它的配置經過java序列化后存放在stormconf.ser文件,而stormcode.ser是將topology本身序列化后存放的文件。
這些文件在部署的時候,supervisor會從這個目錄下載這些文件,然后在supervisor本地執行這些代碼。
進入重點,topology任務的分配過程(zookeeper路徑說明忽略root):
1.在zookeeper上創建/taskheartbeats/{storm id} 路徑,用于任務的心跳檢測。storm對zookeeper的一個重要應用就是利用zk的臨時節點做存活檢測。task將定時刷新節點的時間戳,然后nimbus會檢測這個時間戳是否超過timeout設置。
2.從topology中獲取bolts,spouts設置的并行數目以及全局配置的最大并行數,然后產生task id列表,如[1 2 3 4]
3.在zookeeper上創建/tasks/{strom id}/{task id}路徑,并存儲task信息
4.開始分配任務(內部稱為assignment), 具體步驟:
(1)從zk上獲得已有的assignment(新的toplogy當然沒有了)
(2)查找所有可用的slot,所謂slot就是可用的worker,在所有supervisor上配置的多個worker的端口。
(3)將任務均勻地分配給可用的worker,這里有兩種情況:
(a)task數目比worker多,例如task是[1 2 3 4],可用的slot只有[host1:port1 host2:port1],那么最終是這樣分配
Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/
–>
{
1
:[host1:port1]
2
:[host2:port1]
3
:[host1:port1]
4
:[host2:port1]}
,可以看到任務平均地分配在兩個worker上。
(b)如果task數目比worker少,例如task是[1 2],而worker有[host1:port1 host1:port2 host2:port1 host2:port2],那么首先會將woker排序,
將不同host間隔排列
,保證task不會全部分配到同一個worker上,也就是將worker排列成
Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/
–> [host1:port1host2:port1host1:port2host2:port2]
,然后分配任務為
Code highlighting produced by Actipro CodeHighlighter (freeware)
http://www.CodeHighlighter.com/
–> { 1 :host1:port1, 2 :host2:port2}
(4)記錄啟動時間
(5)判斷現有的assignment是否跟重新分配的assignment相同,如果相同,不需要變更,否則更新assignment到zookeeper的/assignments/{storm id}上。
5.啟動topology,所謂啟動,只是將zookeeper上/storms/{storm id}對應的數據里的active設置為true。
6.nimbus會檢查task的心跳,如果發現task心跳超過超時時間,那么會重新跳到第4步做re-assignment。
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
