?最近,我加入了Cloudera,在這之前,我在計算生物學/基因組學上已經工作了差不多10年。我的分析工作主要是利用Python語言和它很棒的科學計算棧來進行的。但Apache Hadoop的生態系統大部分都是用Java來實現的,也是為Java準備的,這讓我很惱火。所以,我的頭等大事變成了尋找一些Python可以用的Hadoop框架。
在這篇文章里,我會把我個人對這些框架的一些無關科學的看法寫下來,這些框架包括:
- ??? Hadoop流
- ??? mrjob
- ??? dumbo
- ??? hadoopy
- ??? pydoop
- ??? 其它
?
最終,在我的看來,Hadoop的數據流(streaming)是最快也是最透明的選項,而且最適合于文本處理。mrjob最適合于在Amazon EMR上快速工作,但是會有顯著的性能損失。dumbo 對于大多數復雜的工作都很方便(對象作為鍵名(key)),但是仍然比數據流(streaming)要慢。
請繼續往下閱讀,以了解實現細節,性能以及功能的比較。
一個有趣的問題
為了測試不同的框架,我們不會做“統計詞數”的實驗,轉而去轉化谷歌圖書N-元數據。 N-元代表一個n個詞構成的元組。這個n-元數據集提供了谷歌圖書文集中以年份分組的所有1-,2-,3-,4-,5-元記錄的統計數目。 在這個n-元數據集中的每行記錄都由三個域構成:n-元,年份,觀測次數。(您能夠在http://books.google.com/ngrams取得數據)。
我們希望去匯總數據以觀測統計任何一對相互臨近的詞組合所出現的次數,并以年份分組。實驗結果將使我們能夠判斷出是否有詞組合在某一年中比正常情況出現的更為頻繁。如果統計時,有兩個詞在四個詞的距離內出現過,那么我們定義兩個詞是“臨近”的。 或等價地,如果兩個詞在2-,3-或者5-元記錄中出現過,那么我們也定義它們是”臨近“的。 一次,實驗的最終產物會包含一個2-元記錄,年份和統計次數。
?
有一個微妙的地方必須強調。n-元數據集中每個數據的值都是通過整個谷歌圖書語料庫來計算的。從原理上來說,給定一個5-元數據集,我可以通過簡單地聚合正確的n-元來計算出4-元、3-元和2-元數據集。例如,當5-元數據集包含
?
(the, cat, in, the, hat) 1999 20 (the, cat, is, on, youtube) 1999 13 (how, are, you, doing, today) 1986 5000
時,我們可以將它聚合為2-元數據集以得出如下記錄
?
(the, cat) 1999 33 // 也就是, 20 + 13
然而,實際應用中,只有在整個語料庫中出現了40次以上的n元組才會被統計進來。所以,如果某個5元組達不到40次的閾值,那么Google也提供組成這個5元組的2元組數據,這其中有一些或許能夠達到閾值。出于這個原因,我們用相鄰詞的二元數據,隔一個詞的三元組,隔兩個詞的四元組,以此類推。換句話說,與給定二元組相比,三元組多的只是最外層的詞。除了對可能的稀疏n元數據更敏感,只用n元組最外層的詞還有助于避免重復計算。總的來說,我們將在2元、3元、4元和5元數據集上進行計算。
?
MapReduce的偽代碼來實現這個解決方案類似這樣:
?
def map(record): (ngram, year, count) = unpack(record) // 確保word1為字典第一個字 (word1, word2) = sorted(ngram[first], ngram[last]) key = (word1, word2, year) emit(key, count) def reduce(key, values): emit(key, sum(values))
?
硬件
這些MapReduce組件在一個大約20GB的隨機數據子集上執行。完整的數據集涵蓋1500個文件;我們用這個腳本選取一個隨機子集。文件名保持完整,這一點相當重要,因為文件名確定了數據塊的n-元中n的值。
Hadoop集群包含5個使用CentOS 6.2 x64的虛擬節點,每個都有4個CPU,10GB RAM,100GB硬盤容量,并且運行CDH4。集群每次能夠執行20個并行運算,每個組件能夠執行10個減速器。
?
集群上運行的軟件版本如下:
Hadoop:2.0.0-cdh4.1.2 Python:2.6.6 mrjob:0.4-dev dumbo:0.21.36 hadoopy:0.6.0 pydoop:0.7(PyPI)庫中包含最新版本 java:1.6
?
實現
大多數Python框架都封裝了Hadoop Streaming,還有一些封裝了Hadoop Pipes,也有些是基于自己的實現。下面我會分享一些我使用各種Python工具來寫Hadoop jobs的經驗,并會附上一份性能和特點的比較。我比較感興趣的特點是易于上手和運行,我不會去優化某個單獨的軟件的性能。
在處理每一個數據集的時候,都會有一些損壞的記錄。對于每一條記錄,我們要檢查是否有錯并識別錯誤的種類,包括缺少字段以及錯誤的N元大小。對于后一種情況,我們必須知道記錄所在的文件名以便確定該有的N元大小。
所有代碼可以從 GitHub 獲得。
Hadoop Streaming
Hadoop Streaming 提供了使用其他可執行程序來作為Hadoop的mapper或者reduce的方式,包括標準Unix工具和Python腳本。這個程序必須使用規定的語義從標準輸入讀取數據,然后將結果輸出到標準輸出。直接使用Streaming 的一個缺點是當reduce的輸入是按key分組的時候,仍然是一行行迭代的,必須由用戶來辨識key與key之間的界限。
下面是mapper的代碼:
?
#! /usr/bin/env python import os import re import sys # determine value of n in the current block of ngrams by parsing the filename input_file = os.environ['map_input_file'] expected_tokens = int(re.findall(r'([\d]+)gram', os.path.basename(input_file))[0]) for line in sys.stdin: data = line.split('\t') # perform some error checking if len(data) < 3: continue # unpack data ngram = data[0].split() year = data[1] count = data[2] # more error checking if len(ngram) != expected_tokens: continue # build key and emit pair = sorted([ngram[0], ngram[expected_tokens - 1]]) print >>sys.stdout, "%s\t%s\t%s\t%s" % (pair[0], pair[1], year, count)
?
下面是reducer:
?
#! /usr/bin/env python import sys total = 0 prev_key = False for line in sys.stdin: data = line.split('\t') curr_key = '\t'.join(data[:3]) count = int(data[3]) # found a boundary; emit current sum if prev_key and curr_key != prev_key: print >>sys.stdout, "%s\t%i" % (prev_key, total) prev_key = curr_key total = count # same key; accumulate sum else: prev_key = curr_key total += count # emit last key if prev_key: print >>sys.stdout, "%s\t%i" % (prev_key, total)
Hadoop流(Streaming)默認用一個tab字符分割健(key)和值(value)。因為我們也用tab字符分割了各個域(field),所以我們必須通過傳遞給Hadoop下面三個選項來告訴它我們數據的健(key)由前三個域構成。
?
-jobconf stream.num.map.output.key.fields=3 -jobconf stream.num.reduce.output.key.fields=3
?
要執行Hadoop任務命令
?
hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.1.2.jar \ -input /ngrams \ -output /output-streaming \ -mapper mapper.py \ -combiner reducer.py \ -reducer reducer.py \ -jobconf stream.num.map.output.key.fields=3 \ -jobconf stream.num.reduce.output.key.fields=3 \ -jobconf mapred.reduce.tasks=10 \ -file mapper.py \ -file reducer.py
?
注意,mapper.py和reducer.py在命令中出現了兩次,第一次是告訴Hadoop要執行著兩個文件,第二次是告訴Hadoop把這兩個文件分發給集群的所有節點。
Hadoop Streaming 的底層機制很簡單清晰。與此相反,Python以一種不透明的方式執行他們自己的序列化/反序列化,而這要消耗更多的資源。 而且,如果Hadoop軟件已經存在,Streaming就能運行,而不需要再在上面配置其他的軟件。更不用說還能傳遞Unix 命令或者Java類名稱作 mappers/reducers了。
Streaming缺點是必須要手工操作。用戶必須自己決定如何將對象轉化為為成鍵值對(比如JSON 對象)。對于二進制數據的支持也不好。而且如上面說過的,必須在reducer手工監控key的邊界,這很容易出錯。
mrjob
mrjob是一個開放源碼的Python框架,封裝Hadoop的數據流,并積極開發Yelp的。由于Yelp的運作完全在亞馬遜網絡服務,mrjob的整合與EMR是令人難以置信的光滑和容易(使用 boto包)。
mrjob提供了一個Python的API與Hadoop的數據流,并允許用戶使用任何對象作為鍵和映射器。默認情況下,這些對象被序列化為JSON對象的內部,但也有支持pickle的對象。有沒有其他的二進制I / O格式的開箱即用,但有一個機制來實現自定義序列化。
值得注意的是,mrjob似乎發展的非常快,并有很好的文檔。
所有的Python框架,看起來像偽代碼實現:
?
#! /usr/bin/env python import os import re from mrjob.job import MRJob from mrjob.protocol import RawProtocol, ReprProtocol class NgramNeighbors(MRJob): # mrjob allows you to specify input/intermediate/output serialization # default output protocol is JSON; here we set it to text OUTPUT_PROTOCOL = RawProtocol def mapper_init(self): # determine value of n in the current block of ngrams by parsing filename input_file = os.environ['map_input_file'] self.expected_tokens = int(re.findall(r'([\d]+)gram', os.path.basename(input_file))[0]) def mapper(self, key, line): data = line.split('\t') # error checking if len(data) < 3: return # unpack data ngram = data[0].split() year = data[1] count = int(data[2]) # more error checking if len(ngram) != self.expected_tokens: return # generate key pair = sorted([ngram[0], ngram[self.expected_tokens - 1]]) k = pair + [year] # note that the key is an object (a list in this case) # that mrjob will serialize as JSON text yield (k, count) def combiner(self, key, counts): # the combiner must be separate from the reducer because the input # and output must both be JSON yield (key, sum(counts)) def reducer(self, key, counts): # the final output is encoded as text yield "%s\t%s\t%s" % tuple(key), str(sum(counts)) if __name__ == '__main__': # sets up a runner, based on command line options NgramNeighbors.run()
?
?mrjob只需要安裝在客戶機上,其中在作業的時候提交。下面是要運行的命令:
export HADOOP_HOME="/usr/lib/hadoop-0.20-mapreduce" ./ngrams.py -r hadoop --hadoop-bin /usr/bin/hadoop --jobconf mapred.reduce.tasks=10 -o hdfs:///output-mrjob hdfs:///ngrams
編寫MapReduce的工作是非常直觀和簡單的。然而,有一個重大的內部序列化計劃所產生的成本。最有可能的二進制計劃將需要實現的用戶(例如,為了支持typedbytes)。也有一些內置的實用程序日志文件的解析。最后,mrjob允許用戶寫多步驟的MapReduce的工作流程,在那里從一個MapReduce作業的中間輸出被自動用作輸入到另一個MapReduce工作。
(注:其余的實現都非常相似,除了包具體的實現,他們都能被找到here.。)
dumbo
dumbo 是另外一個使用Hadoop流包裝的框架。dumbo出現的較早,本應該被許多人使用,但由于缺少文檔,造成開發困難。這也是不如mcjob的一點。
dumbo通過typedbytes執行序列化,能允許更簡潔的數據傳輸,也可以更自然的通過指定JavaInputFormat讀取SequenceFiles或者其他格式的文件,比如,dumbo也可以執行Python的egg和Java的JAR文件。
在我的印象中, 我必須要手動安裝dumbo中的每一個節點, 它只有在typedbytes和dumbo以eggs形式創建的時候才能運行。 就像他會因為onMemoryErrors終止一樣,他也會因為使用組合器停止。
運行dumbo任務的代碼是:
dumbo start ngrams.py \ -hadoop /usr \ -hadooplib /usr/lib/hadoop-0.20-mapreduce/contrib/streaming \ -numreducetasks 10 \ -input hdfs:///ngrams \ -output hdfs:///output-dumbo \ -outputformat text \ -inputformat text
?
hadoopy
hadoopy 是另外一個兼容dumbo的Streaming封裝。同樣,它也使用typedbytes序列化數據,并直接把 typedbytes 數據寫到HDFS。
它有一個很棒的調試機制, 在這種機制下它可以直接把消息寫到標準輸出而不會干擾Streaming過程。它和dumbo很相似,但文檔要好得多。文檔中還提供了與 Apache HBase整合的內容。
用hadoopy的時候有兩種發發來啟動jobs:
- ??? launch 需要每個節點都已經安裝了Python/hadoopy ,但是在這之后的負載就小了。
- ??? launch_frozen 不要求節點上已經安裝了Python,它會在運行的時候安裝,但這會帶來15秒左右的額外時間消耗(據說通過某些優化和緩存技巧能夠縮短這個時間)。
必須在Python程序中啟動hadoopy job,它沒有內置的命令行工具。
我寫了一個腳本通過launch_frozen的方式啟動hadoopy
python launch_hadoopy.py
用launch_frozen運行之后,我在每個節點上都安裝了hadoopy然后用launch方法又運行了一遍,性能明顯好得多。
pydoop
與其他框架相比,pydoop 封裝了Hadoop的管道(Pipes),這是Hadoop的C++ API。 正因為此,該項目聲稱他們能夠提供更加豐富的Hadoop和HDFS接口,以及一樣好的性能。我沒有驗證這個。但是,有一個好處是可以用Python實現一個Partitioner,RecordReader以及RecordWriter。所有的輸入輸出都必須是字符串。
最重要的是,我不能成功的從PIP或者源代碼構建pydoop。
其他
- ???happy 是一個用Jython來寫Hadoop job的框架,但是似乎已經掛了
- ???Disco 成熟的,非Hadoop 的 MapReduce.實現,它的核心使用Erlang寫的,提供了Python的API,它由諾基亞開發,不如Hadoop應用廣泛。
- ??? octopy 是一個純Python的MapReduce實現,它只有一個源文件,并不適于“真正的”計算。
- ???Mortar是另一個Python選擇,它不久前才發布,用戶可以通過一個網頁應用提交Apache Pig 或者 Python jobs 處理放置在 Amazon S3上的數據。
- ??? 有一些更高層次的Hadoop生態體系中的接口,像 Apache Hive和Pig。Pig 可以讓用戶用Python來寫自定義的功能,是通過Jython來運行。 Hive 也有一個Python封裝叫做hipy。
- ??? (Added Jan. 7 2013) Luigi 是一個用于管理多步作業流程的Python框架。它與Apache Oozie 有一點相似,但是它內置封裝了Hadoop Streaming(輕量級的封裝)。Luigi有一個非常好的功能是能夠在job出錯的時候拋出Python代碼的錯誤堆棧,而且它的命令行界面也非常棒。它的README文件內容很多,但是卻缺少詳盡的參考文檔。Luigi 由Spotify 開發并在其內部廣泛使用。
本地java
最后,我使用新的Hadoop Java API接口實施了MR任務,編譯完成后,這樣來運行它:
?
hadoop jar /root/ngrams/native/target/NgramsComparison-0.0.1-SNAPSHOT.jar NgramsDriver hdfs:///ngrams hdfs:///output-native
關于計數器的特別說明
在我的MR jobs的最初實現里,我用計數器來跟蹤監控不良記錄。在Streaming里,需要把信息寫到stderr。事實證明這會帶來不容忽視的額外開銷:Streaming job花的時間是原生java job的3.4倍。這個框架同樣有此問題。
將用Java實現的MapReduce job作為性能基準。 Python框架的值是其相對于Java的性能指標的比率。?
?Java明顯最快,,Streaming要多花一半時間,Python框架花的時間更多。從mrjob mapper的profile數據來看,它在序列化/反序列化上花費了大量時間。dumbo和hadoopy在這方面要好一點。如果用了combiner 的話dumbo 還可以更快。
特點比較
大多來自各自軟件包中的文檔以及代碼庫。
結論
Streaming是最快的Python方案,這面面沒有任何魔力。但是在用它來實現reduce邏輯的時候,以及有很多復雜對象的時候要特別小心。
所有的Python框架看起來都像是偽碼,這非常棒。
mrjob更新快,成熟的易用,用它來組織多步MapReduce的工作流很容易,還可以方便地使用復雜對象。它還可以無縫使用EMR。但是它也是執行速度最慢的。
?
還有一些不是很流行的 Python 框架,他們的主要優勢是內置了對于二進制格式的支持,但如果有必要話,這個完全可以由用戶代碼來自己實現。
就目前來看:
- ??? Hadoop Streaming是一般情況下的最佳選擇,只要在使用reducer的時候多加小心,它還是很簡單易用的。
- ??? 從計算開銷方面考慮的話,選擇mrjob,因為它與Amazon EMR結合最好。
- ??? 如果應用比較復雜,包含了復合鍵,要組合多步流程,dumbo 最合適。它比Streaming慢,但是比mrjob快。
如果你在實踐中有自己的認識,或是發現本文有錯誤,請在回復里提出。
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
