前言:
無論什么樣的并行計(jì)算方式,其終極目的都是為了有效利用多機(jī)多核的計(jì)算能力,并能靈活滿足各種需求。相對于傳統(tǒng)基于單機(jī)編寫的運(yùn)行程序,如果使用該方式改寫為多機(jī)并行程序,能夠充分利用多機(jī)多核cpu的資源,使得運(yùn)行效率得到大幅度提升,那么這是一個(gè)好的靠譜的并行計(jì)算方式,反之,又難使用又難直接看出并行計(jì)算優(yōu)勢,還要耗費(fèi)大量學(xué)習(xí)成本,那就不是一個(gè)好的方式。
由于并行計(jì)算在互聯(lián)網(wǎng)應(yīng)用的業(yè)務(wù)場景都比較復(fù)雜,如海量數(shù)據(jù)商品搜索、廣告點(diǎn)擊算法、用戶行為挖掘,關(guān)聯(lián)推薦模型等等,如果以真實(shí)場景舉例,初學(xué)者很容易被業(yè)務(wù)本身的復(fù)雜度繞暈了頭。因此,我們需要一個(gè)通俗易懂的例子來直接看到并行計(jì)算的優(yōu)勢。
數(shù)字排列組合是個(gè)經(jīng)典的算法問題,它很通俗易懂,適合不懂業(yè)務(wù)的人學(xué)習(xí),我們通過它來發(fā)現(xiàn)和運(yùn)用并行計(jì)算的優(yōu)勢,可以得到一個(gè)很直觀的體會,并留下深刻的印象。問題如下:
請寫一個(gè)程序,輸入M,然后打印出M個(gè)數(shù)字的所有排列組合(每個(gè)數(shù)字為1,2,3,4中的一個(gè))。比如:M=3,輸出:
1,1,1
1,1,2
……
4,4,4
共64個(gè)
注意:這里是使用計(jì)算機(jī)遍歷出所有排列組合,而不是求總數(shù),如果只求總數(shù),可以直接利用數(shù)學(xué)公式進(jìn)行計(jì)算了。
一、單機(jī)解決方案:
通常,我們在一臺電腦上寫這樣的排列組合算法,一般用遞歸或者迭代來做,我們先分別看看這兩種方案。
1) 單機(jī)遞歸
可以將n(1<=n<=4)看做深度,輸入的m看做廣度,得到以下遞歸函數(shù)(完整代碼見附件CombTest.java)?
public void comb(String str){ for(int i=1;i<n+1;i++){ if(str.length()==m-1){ System.out.println(str+i); total++; }else comb(str+i); } }
但是當(dāng)m數(shù)字很大時(shí),會超出單臺機(jī)器的計(jì)算局限導(dǎo)致緩慢,太大數(shù)字的排列組合在一臺計(jì)算機(jī)上幾乎很難運(yùn)行出,
不光是排列組合問題,其他類似遍歷求解的遞歸或回溯等算法也都存在這個(gè)問題,如何突破單機(jī)計(jì)算性能的問題一直困擾著我們。
2) 單機(jī)迭代
我們觀察到,求的m個(gè)數(shù)字的排列組合,實(shí)際上都可以在m-1的結(jié)果基礎(chǔ)上得到。
比如m=1,得到排列為1,2,3,4,記錄該結(jié)果為r(1)
m=2, 可以由(1,2,3,4)* r(1) = 11,12,13,14,21,22,…,43,44得到, 記錄該結(jié)果為r(2)
由此,r(m) =(1,2,3,4)*r(m-1)
如果我們從1開始計(jì)算,每輪結(jié)果保存到一個(gè)中間變量中,反復(fù)迭代這個(gè)中間變量,直到算出m的結(jié)果為止,這樣看上去也可行,仿佛還更簡單。
但是如果我們估計(jì)一下這個(gè)中間變量的大小,估計(jì)會嚇一跳,因?yàn)楫?dāng)m=14的時(shí)候,結(jié)果已經(jīng)上億了,一億個(gè)數(shù)字,每個(gè)數(shù)字有14位長,并且為了得到m=15 的結(jié)果,我們需要將m=14的結(jié)果存儲在內(nèi)存變量中用于迭代計(jì)算,無論以什么格式存,幾乎都會遭遇到單臺機(jī)器的內(nèi)存局限,如果排列組合數(shù)字繼續(xù)增大下去,結(jié)果便會內(nèi)存溢出了。
二、分布式并行計(jì)算解決方案:
我們看看如何利用多臺計(jì)算機(jī)來解決該問題,同樣以遞歸和迭代的方式進(jìn)行分析。
1) 多機(jī)遞歸
做分布式并行計(jì)算的核心是需要改變傳統(tǒng)的編程設(shè)計(jì)觀念,將算法重新設(shè)計(jì)按多機(jī)進(jìn)行拆分和合并,有效利用多機(jī)并行計(jì)算優(yōu)勢去完成結(jié)果。
我們觀察到,將一個(gè)n深度m廣度的遞歸結(jié)果記錄為 r(n,m),那么它可以由(1,2,…n)*r(n,m-1)得到:
r(n,m)=1*r(n,m-1)+2*r(n,m-1)+…+n*r(n,m-1)
假設(shè)我們有n臺計(jì)算機(jī),每臺計(jì)算機(jī)的編號依次為1到n,那么每臺計(jì)算機(jī)實(shí)際上只要計(jì)算r(n,m-1)的結(jié)果就夠了,這里實(shí)際上將遞歸降了一級, 并且讓多機(jī)并行計(jì)算。
如果我們有更多的計(jì)算機(jī),假設(shè)有n*n臺計(jì)算機(jī),那么:
r(n,m)=11*r(n,m-2)+12*r(n,m-2)+…+nn*r(n,m-2)
拆分到n*n臺計(jì)算機(jī)上就將遞歸降了兩級了
可以推斷,只要我們的機(jī)器足夠多,能夠線性擴(kuò)充下去,我們的遞歸復(fù)雜度會逐漸降級,并且并行計(jì)算的能力會逐漸增強(qiáng)。
這里是進(jìn)行拆分設(shè)計(jì)的分析是假設(shè)每臺計(jì)算機(jī)只跑1個(gè)實(shí)例,實(shí)際上每臺計(jì)算機(jī)可以跑多個(gè)實(shí)例(如上圖),我們下面的例子可以看到,這種并行計(jì)算的方式相對傳統(tǒng)單機(jī)遞歸有大幅度的效率提升。
這里使用fourinone框架設(shè)計(jì)分布式并行計(jì)算,第一次使用可以參考
分布式計(jì)算上手demo指南
, 開發(fā)包下載地址:
http://www.skycn.com/soft/68321.html
ParkServerDemo:負(fù)責(zé)工人注冊和分布式協(xié)調(diào)
CombCtor:是一個(gè)包工頭實(shí)現(xiàn),它負(fù)責(zé)接收用戶輸入的m,并將m保存到變量comb,和線上工人總數(shù)wknum一起傳給各個(gè)工人,下達(dá)計(jì)算命令,并在計(jì)算完成后累加每個(gè)工人的結(jié)果數(shù)量得到一個(gè)結(jié)果總數(shù)。
CombWorker:是一個(gè)工人實(shí)現(xiàn),它接收到工頭發(fā)的comb和wknum參數(shù)用于遞歸條件,并且通過獲取自己在集群的位置index,做為遞歸初始條件用于降級,它找到一個(gè)排列組合會直接在本機(jī)輸出,但是計(jì)數(shù)保存到total,然后將本機(jī)的total發(fā)給包工頭統(tǒng)計(jì)總體數(shù)量。
運(yùn)行步驟:
為了方便演示,我們在一臺計(jì)算機(jī)上運(yùn)行:
1、啟動(dòng)ParkServerDemo:它的IP端口已經(jīng)在配置文件的PARK部分的SERVERS指定。
2、啟動(dòng)4個(gè)CombWorker實(shí)例:傳入2個(gè)參數(shù),依次是ip或者域名、端口(如果在同一臺機(jī)器可以ip相同,但是端口不同),這里啟動(dòng)4個(gè)工人是由于1<=n<=4,每個(gè)工人實(shí)例剛好可以通過集群位置 index進(jìn)行任務(wù)拆分。
3、運(yùn)行CombCtor查看計(jì)算時(shí)間和結(jié)果
下面是在一臺普通4cpu雙核2.4Ghz內(nèi)存4g開發(fā)機(jī)上和單機(jī)遞歸CombTest的測試對比
通過測試結(jié)果我們可以看到:
1、可以推斷,由于單機(jī)的性能限制,無法完成m值很大的計(jì)算。
2、同是單機(jī)環(huán)境下,并行計(jì)算相對于傳統(tǒng)遞歸提升了將近1.6倍的效率,隨著m的值越大,節(jié)省的時(shí)間越多。
3、單機(jī)遞歸的CPU利用率不高,平均20-30%,在多核時(shí)代沒有充分利用機(jī)器資源,造成cpu閑置浪費(fèi),而并行計(jì)算則能打滿cpu,充分利用機(jī)器資源。
4、如果是多機(jī)分布式并行計(jì)算,在4臺機(jī)器上,采用4*4的16個(gè)實(shí)例完成計(jì)算,效率還會成倍提升,而且機(jī)器數(shù)量越多,計(jì)算越快。
5、單機(jī)遞歸實(shí)現(xiàn)和運(yùn)行簡單,使用c或者java寫個(gè)main函數(shù)完成即可,而分布式并行程序,則需要利用并行框架,以包工頭+多個(gè)工人的全新并行計(jì)算思想去完成。
2) 多機(jī)迭代
我們最后看看如何構(gòu)思多機(jī)分布式迭代方式實(shí)現(xiàn)。
思路一:
根據(jù)單機(jī)迭代的特點(diǎn),我們可以將n臺計(jì)算機(jī)編號為1到n
第一輪統(tǒng)計(jì)各工人發(fā)送編號給工頭,工頭合并得到第一輪結(jié)果{1,2,3,…,n}
第二輪,工頭將第一輪結(jié)果發(fā)給各工人做為計(jì)算輸入條件,各工人根據(jù)自己編號累加,返回結(jié)果給工頭合并,得到第二輪結(jié)果:{11,12,13,1n,…,n1,n2,n3,nn}
這樣迭代下去,直到m輪結(jié)束,如上圖所示。
但很快就會發(fā)現(xiàn),工頭合并每輪結(jié)果是個(gè)很大的瓶頸,很容易內(nèi)存不夠?qū)е掠?jì)算崩潰。
思路二:
如果對思路一改進(jìn),各工人不發(fā)中間結(jié)果給工頭合并,而采取工人之間互相合并方式,將中間結(jié)果按編號分類,通過receive方式(工人互相合并及receive使用可參見
sayhello demo
),將屬于其他工人編號的數(shù)據(jù)發(fā)給對方。這樣一定程度避免了工頭成為瓶頸,但是經(jīng)過實(shí)踐發(fā)現(xiàn),隨著迭代變大,中間結(jié)果數(shù)據(jù)越來越大,工人合并耗用網(wǎng)絡(luò)也越來越大,如果中間結(jié)果保存在各工人內(nèi)存中,隨著m變的更大,仍然存在內(nèi)存溢出危險(xiǎn)。
思路三:
繼續(xù)改進(jìn)思路二,將中間結(jié)果變量不保存內(nèi)存中,而每次寫入文件(
詳見Fourinone2.0對分布式文件的簡化操作
),這樣能避免內(nèi)存問題,但是增加了大量的文件io消耗。雖然能運(yùn)行出結(jié)果,但是并不高效。
總結(jié):
或許分布式迭代在這里并不是最好的做法,上面的多機(jī)遞歸更合適。由于迭代計(jì)算的特點(diǎn),需要將中間結(jié)果進(jìn)行保存,做為下一輪計(jì)算的條件,如果為了利用多機(jī)并行計(jì)算優(yōu)勢,又需要反復(fù)合并產(chǎn)生中間結(jié)果,所以導(dǎo)致對內(nèi)存、帶寬、文件io的耗用很大,處理不當(dāng)容易造成性能低下。
我們早已經(jīng)進(jìn)入多cpu多核時(shí)代,但是我們的傳統(tǒng)程序設(shè)計(jì)和算法還停留在過去單機(jī)應(yīng)用,因此合理利用并行計(jì)算的優(yōu)勢來改進(jìn)傳統(tǒng)軟件設(shè)計(jì)思想,能為我們帶來更大效率的提升。
以下是分布式并行遞歸的demo源碼:
001
|
// CombTest
|
002
|
import
java.util.Date;
|
003
|
public
class
CombTest
|
004
|
{
|
005
|
????
int
m=
0
,n=
0
,total=
0
;
|
006
|
????
CombTest(
int
n,
int
m){
|
007
|
????????
this
.m=m;
|
008
|
????????
this
.n=n;
|
009
|
????
}
|
010
|
????
public
void
comb(String str)
|
011
|
????
{
|
012
|
????????
for
(
int
i=
1
;i<n+
1
;i++){
|
013
|
????????????
if
(str.length()==m-
1
){
|
014
|
????????????????
//System.out.println(str+i);//打印出組合序列
|
015
|
????????????????
total++;
|
016
|
????????????
}
|
017
|
????????????
else
|
018
|
????????????????
comb(str+i);
|
019
|
????????
}
|
020
|
????
}
|
021
|
????
?
|
022
|
????
public
static
void
main(String[] args)
|
023
|
????
{
|
024
|
????????
CombTest ct =
new
CombTest(Integer.parseInt(args[
0
]), Integer.parseInt(args[
1
]));
|
025
|
????????
long
begin = (
new
Date()).getTime();
|
026
|
????????
ct.comb(
""
);
|
027
|
????????
System.out.println(
"total:"
+ct.total);
|
028
|
????????
long
end = (
new
Date()).getTime();
|
029
|
????????
System.out.println(
"time:"
+(end-begin)/
1000
+
"s"
);
|
030
|
????
}
|
031
|
}
|
032
|
? |
033
|
// ParkServerDemo
|
034
|
import
com.fourinone.BeanContext;
|
035
|
public
class
ParkServerDemo{
|
036
|
????
public
static
void
main(String[] args){
|
037
|
????????
BeanContext.startPark();
|
038
|
????
}
|
039
|
}
|
040
|
? |
041
|
// CombCtor
|
042
|
import
com.fourinone.Contractor;
|
043
|
import
com.fourinone.WareHouse;
|
044
|
import
com.fourinone.WorkerLocal;
|
045
|
import
java.util.Date;
|
046
|
public
class
CombCtor
extends
Contractor
|
047
|
{
|
048
|
????
public
WareHouse giveTask(WareHouse wh)
|
049
|
????
{
|
050
|
????????
WorkerLocal[] wks = getWaitingWorkers(
"CombWorker"
);
|
051
|
????????
System.out.println(
"wks.length:"
+wks.length+
";"
+wh);
|
052
|
????????
wh.setObj(
"wknum"
,wks.length);
|
053
|
????????
WareHouse[] hmarr = doTaskBatch(wks, wh);
//批量執(zhí)行任務(wù),所有工人完成才返回
|
054
|
????????
int
total=
0
;
|
055
|
????????
for
(WareHouse hm:hmarr)
|
056
|
????????????
total+=(Integer)hm.getObj(
"total"
);
|
057
|
????????
System.out.println(
"total:"
+total);
|
058
|
????????
return
wh;
|
059
|
????
}
|
060
|
????
?
|
061
|
????
public
static
void
main(String[] args)
|
062
|
????
{
|
063
|
????????
CombCtor a =
new
CombCtor();
|
064
|
????????
WareHouse wh =
new
WareHouse(
"comb"
, Integer.parseInt(args[
0
]));
|
065
|
????????
long
begin = (
new
Date()).getTime();
|
066
|
????????
a.doProject(wh);
|
067
|
????????
long
end = (
new
Date()).getTime();
|
068
|
????????
System.out.println(
"time:"
+(end-begin)/
1000
+
"s"
);
|
069
|
????????
a.exit();
|
070
|
????
}
|
071
|
}
|
072
|
? |
073
|
//CombWorker
|
074
|
import
com.fourinone.MigrantWorker;
|
075
|
import
com.fourinone.WareHouse;
|
076
|
public
class
CombWorker
extends
MigrantWorker
|
077
|
{
|
078
|
????
private
int
m=
0
,n=
0
,total=
0
,index=-
1
;
|
079
|
? |
080
|
????
public
WareHouse doTask(WareHouse wh)
|
081
|
????
{
|
082
|
????????
total=
0
;
|
083
|
????????
n = (Integer)wh.getObj(
"wknum"
);
|
084
|
????????
m = (Integer)wh.getObj(
"comb"
);
|
085
|
????????
index = getSelfIndex()+
1
;
|
086
|
????????
System.out.println(
"index:"
+index);
|
087
|
????????
comb(index+
""
);
|
088
|
????????
System.out.println(
"total:"
+total);
|
089
|
????????
return
new
WareHouse(
"total"
,total);
|
090
|
????
}
|
091
|
????
?
|
092
|
????
public
void
comb(String str)
|
093
|
????
{
|
094
|
????????
for
(
int
i=
1
;i<n+
1
;i++){
|
095
|
????????????
if
(str.length()==m-
1
){
|
096
|
????????????????
//System.out.println(str+i);//打印出組合序列
|
097
|
????????????????
total++;
|
098
|
????????????
}
|
099
|
????????????
else
|
100
|
????????????????
comb(str+i);
|
101
|
????????
}
|
102
|
????
}
|
103
|
????
?
|
104
|
????
public
static
void
main(String[] args)
|
105
|
????
{
|
106
|
????????
CombWorker mw =
new
CombWorker();
|
107
|
????????
mw.waitWorking(args[
0
],Integer.parseInt(args[
1
]),
"CombWorker"
);
|
108
|
????
}
|
109
|
}
|
?
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
