本文發(fā)表于本人 博客 。
????前面幾次講了關(guān)于Hadoop的環(huán)境搭建、HDFS操作,今天接著繼續(xù)。本來Hadoop源碼中就有一個例子WordCount,但是今天我們來自己實(shí)現(xiàn)一個加深對這個Mapper、Reducer的理解,如有不對歡迎指正。
????我們先來梳理一下思路,對于自定義Mapper以及Reducer,我們先要覆蓋其map以及reduce函數(shù),然后按照相關(guān)步驟比如設(shè)置輸入文件目錄、輸入文件格式化類、設(shè)置自定義Mapper、分區(qū)、排序、分組、規(guī)約、設(shè)置自定義Reducer等等。這里我們把輸入文件的使用空格分割(也可以用制表符來),下面是自定義Mapper類MyMapper:
import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Mapper.Context; public class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String[] splied = value.toString().split(" "); for (int i = 0; i < splied.length; i++) { String lineWord = splied[i]; context.write(new Text(lineWord), new LongWritable(1)); } } }
這里我選擇的是新的API,相關(guān)庫基本是在org.apache.hadoop.mapreduce下,舊API是在org.apache.hadoop.mapred下,包括一些引用庫也是這樣。自定義MyMapper是泛型繼承Mapper,其中參數(shù) key\value 是Hadoop內(nèi)部類型,它不支持java的基本類型這里我們需要注意下為什么不選擇java的基本類型呢,原因是不需要其它額外是操作,而且本身需要序列化反序列化并提升其性能所以加入了hadoop的類型放棄java的基本類型。關(guān)于hadoop key\value 跟java基本類型相互轉(zhuǎn)換的問題也很簡單,從java基本類型轉(zhuǎn)換至hadoop的 key\value 的話直接new帶參就可以了,從hadoop的key\value類型轉(zhuǎn)換至java的基本類型使用get方法就可以了!如:
LongWritable lw = new LongWritable(1L); long temp = lw.get();
接下來繼續(xù)看自定義Reducer類MyReduce:
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Reducer.Context; public class MyReduce extends Reducer<Text, LongWritable, Text, LongWritable> { @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long count = 0L; for(LongWritable value: values) { count += value.get(); } context.write(key, new LongWritable(count)); } }
這個跟上面類似了,再來看看main方法的如何執(zhí)行的!
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; import com.sun.org.apache.xpath.internal.axes.HasPositionalPredChecker; public class Test { static final String OUTPUT_DIR = "hdfs://hadoop-master:9000/mapreduce/output/"; static final String INPUT_DIR = "hdfs://hadoop-master:9000/mapreduce/input/test.txt"; public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf, Test.class.getSimpleName()); deleteOutputFile(OUTPUT_DIR); //1設(shè)置輸入目錄 FileInputFormat.setInputPaths(job, INPUT_DIR); //2設(shè)置輸入格式化類 job.setInputFormatClass(TextInputFormat.class); //3設(shè)置自定義Mapper以及鍵值類型 job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); //4分區(qū) job.setPartitionerClass(HashPartitioner.class); job.setNumReduceTasks(1); //5排序分組 //6設(shè)置在自定義Reduce以及鍵值類型 job.setReducerClass(MyReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //7設(shè)置輸出目錄 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_DIR)); //8提交job job.waitForCompletion(true); } static void deleteOutputFile(String path) throws Exception{ Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI(INPUT_DIR),conf); if(fs.exists(new Path(path))){ fs.delete(new Path(path)); } } }
執(zhí)行的時候先會輸出上次執(zhí)行過的輸出目錄。然后就按照步驟:
1.設(shè)置輸入文件目錄; 2.輸入文件格式化類; 3.設(shè)置自定義Mapper以及其鍵值類型; 4.分區(qū); 5.排序; 6.分組; 7.規(guī)約; 8.設(shè)置自定義Reducer以及其鍵值類型; 9.設(shè)置輸出目錄; 10.代碼提交至JobTracker。
當(dāng)然這過程中有些是可以省略的比如輸出文件格式化類。從這個例子我們可以得出:既然可以設(shè)置自定義Mapper以及自定義Reducer,那么也應(yīng)該可以設(shè)置自定義的輸入文件格式化類以及分區(qū)、排序、分組、規(guī)約等等,這個以后會有相關(guān)的筆記現(xiàn)在這里只是寫個簡單的例子。我們編寫一個文件如下并把它上傳至hdfs://hadoop-master:9000/mapreduce/input/test.txt:
luoliang me asura asura.com luoliang me
然后執(zhí)行main函數(shù),將會在hdfs://hadoop-master:9000/mapreduce/output/目錄下輸出一個類似part-*的文件,我們可以使用如下命令查看:
hadoop fs -text /output/part-*
此時會輸出:
asura 1 asura.com 1 luoliang 2 me 2
現(xiàn)在文件是輸出了也對比下是正確,但是腦子還是一片空白,不知道其怎么做到的,那么這個就是關(guān)于mapreduce的原理了,下面我也說說大概其原理:從把代碼提交至JobTracker開始,它就會從指定的輸入文件路徑去獲取文件,這里支持多個文件以及二級目錄下的多個文件,這里獲取就是使用的HDFS api來操作了!把所有文件讀取出來之后按照指定的大小進(jìn)行分割I(lǐng)nputSplit,把分割好后的鍵值FileSplit(比如:<0,"luoliang me">,<13,"asura asura.com luoliang">)再轉(zhuǎn)化為RecordReader(比如<"luoliang",1>,<"luoliang",1>),此時全部轉(zhuǎn)換完畢后會每個都調(diào)用map函數(shù),map函數(shù)把數(shù)據(jù)寫入到Mapper.Context中,再會對數(shù)據(jù)進(jìn)行分區(qū)排序分組規(guī)約,最后通過shuffle到達(dá)reduce端,這其中每個map的輸出數(shù)量是等于reduce的輸入數(shù)量。到達(dá)reduce端數(shù)據(jù)已經(jīng)發(fā)生了質(zhì)變了不在是<"luoliang",1>而是類似變成<"luoliang",{1,1}>這樣的鍵值數(shù)據(jù),這是我們需要迭代獲取總數(shù)量并在寫會context中,計算完后輸出到指定的目錄。在這里由于有重復(fù)的單詞所以map函數(shù)的調(diào)用次數(shù)跟reduce函數(shù)調(diào)用次數(shù)是不同的。規(guī)約這個其實(shí)就是自定義reduce,但是這個不是必須有的因為如果是統(tǒng)計關(guān)于類似平均數(shù)的問題,數(shù)據(jù)在map端進(jìn)行規(guī)約了,雖然傳送時間以及處理時間減少性能提升了但是對于最終結(jié)果可能會有影響,所以這個規(guī)約要看具體情況才能使用。至于這個 shuffle 一步還不是怎么了解需要多多再看看。
這次先到這里。堅持記錄點(diǎn)點(diǎn)滴滴!
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
