public
class
WordCountApp {
//
可以指定目錄,目錄下如果有二級(jí)目錄的話,是不會(huì)執(zhí)行的,只會(huì)執(zhí)行一級(jí)目錄.
private
static
final
String INPUT_PATH = "hdfs://hadoop1:9000/abd";
//
輸入路徑
private
static
final
String OUT_PATH = "hdfs://hadoop1:9000/out";
//
輸出路徑,reduce作業(yè)輸出的結(jié)果是一個(gè)目錄
//
_SUCCESS:在linux中,帶下劃線的這些文件一般都是被忽略不去處理的.表示作業(yè)執(zhí)行成功.
//
_logs:產(chǎn)生的日志文件.
//
part-r-00000:產(chǎn)生的是我們的輸出的文件.開(kāi)始以part開(kāi)始.r:reduce輸出的結(jié)果,map輸出的結(jié)果是m,00000是序號(hào)
public
static
void
main(String[] args) {
Configuration conf
=
new
Configuration();
//
配置對(duì)象
try
{
FileSystem fileSystem
= FileSystem.get(
new
URI(OUT_PATH), conf);
fileSystem.delete(
new
Path(OUT_PATH),
true
);
Job job
=
new
Job(conf, WordCountApp.
class
.getSimpleName());
//
jobName:作業(yè)名稱
job.setJarByClass(WordCountApp.
class
);
FileInputFormat.setInputPaths(job, INPUT_PATH);
//
指定數(shù)據(jù)的輸入
job.setMapperClass(MyMapper.
class
);
//
指定自定義map類(lèi)
job.setMapOutputKeyClass(Text.
class
);
//
指定map輸出key的類(lèi)型
job.setMapOutputValueClass(LongWritable.
class
);
//
指定map輸出value的類(lèi)型
job.setReducerClass(MyReducer.
class
);
//
指定自定義Reduce類(lèi)
job.setOutputKeyClass(Text.
class
);
//
設(shè)置Reduce輸出key的類(lèi)型
job.setOutputValueClass(LongWritable.
class
);
//
設(shè)置Reduce輸出的value類(lèi)型
FileOutputFormat.setOutputPath(job,
new
Path(OUT_PATH));
//
Reduce輸出完之后,就會(huì)產(chǎn)生一個(gè)最終的輸出,指定最終輸出的位置
job.waitForCompletion(
true
);
//
提交給jobTracker并等待結(jié)束
}
catch
(Exception e) {
e.printStackTrace();
}
}
/**
* 輸入的key標(biāo)示偏移量:這一行開(kāi)始的字節(jié). 輸入的value:當(dāng)前的行文本的內(nèi)容. MapReduce執(zhí)行過(guò)程:
* 在這里邊,我們的數(shù)據(jù)輸入來(lái)自于原始文件,數(shù)據(jù)輸出寫(xiě)出到hdfs, 中間的一堆都是map輸出產(chǎn)生的臨時(shí)結(jié)果.存放在map運(yùn)行的linux磁盤(pán)上的,
* 當(dāng)經(jīng)過(guò)shuffle時(shí),reduce就會(huì)通過(guò)http把map端的對(duì)應(yīng)數(shù)據(jù)給取過(guò)來(lái).
* mapred-default.xml中mapredcue.jobtracker
* .root.dir,mapred.tmp.dir存儲(chǔ)map產(chǎn)生的結(jié)果. 作業(yè)運(yùn)行時(shí)產(chǎn)生這個(gè)目錄,作業(yè)運(yùn)行完之后它會(huì)刪除目錄.
*/
public
static
class
MyMapper
extends
Mapper
<LongWritable, Text, Text, LongWritable>
{
//
源文件有兩行記錄,解析源文件會(huì)產(chǎn)生兩個(gè)鍵值對(duì).分別是<0,hello you>,<10,hello me>,所以map函數(shù)會(huì)被調(diào)用兩次.
//
在計(jì)算機(jī)存儲(chǔ)的時(shí)候,是一維的結(jié)構(gòu).
@Override
protected
void
map(LongWritable key, Text value, Context context)
throws
IOException, InterruptedException {
//
為什么要把hadoop類(lèi)型轉(zhuǎn)換為java類(lèi)型?
String line =
value.toString();
String[] splited
= line.split("\t"
);
//
使用hashMap寫(xiě)出去的優(yōu)勢(shì):減少鍵值對(duì)出現(xiàn)的個(gè)數(shù).
Map<String, Integer> hashMap =
new
HashMap<String, Integer>
();
for
(String word : splited) {
//
在for循環(huán)體內(nèi),臨時(shí)變量word出現(xiàn)的此時(shí)是常量1
context.write(
new
Text(word),
new
LongWritable(1));
//
把每個(gè)單詞出現(xiàn)的次數(shù)1寫(xiě)出去.
}
}
}
//
map函數(shù)執(zhí)行結(jié)束后,map輸出的<k,v>一共有4個(gè).<hello,1>,<you,1>,<hello,1>,<me,1>
//
map把數(shù)據(jù)處理完之后,就會(huì)進(jìn)入reduce.
//
在進(jìn)入shuffle之前,數(shù)據(jù)需要先進(jìn)行分區(qū).默認(rèn)只有一個(gè)區(qū).
//
對(duì)每個(gè)不同分區(qū)中的數(shù)據(jù)進(jìn)行排序,分組.
//
排序后的結(jié)果:<hello,1>,<hello,1>,<me,1>,<you,1>
//
分組后的結(jié)果(相同key的value放在一個(gè)集合中):<hello,{1,1}>,<me,{1}>,<you,{1}>
//
規(guī)約(可選)
//
map中的數(shù)據(jù)分發(fā)到reduce的過(guò)程稱作shuffle
public
static
class
MyReducer
extends
Reducer
<Text, LongWritable, Text, LongWritable>
{
//
每一組調(diào)用一次reduce函數(shù),一共調(diào)用了三次
@Override
protected
void
reduce(Text key, Iterable<LongWritable>
values,
Context context)
throws
IOException, InterruptedException {
//
count標(biāo)示單詞key在整個(gè)文件出現(xiàn)的次數(shù)
//
分組的數(shù)量與reduce函數(shù)調(diào)用次數(shù)是相等的.
//
reduce函數(shù)調(diào)用次數(shù)與產(chǎn)生的<k,v>的數(shù)量拋開(kāi)業(yè)務(wù),沒(méi)有任何關(guān)系!
long
count = 0L
;
for
(LongWritable times : values) {
count
+=
times.get();
}
context.write(key,
new
LongWritable(count));
}
}
}
?
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061
微信掃一掃加我為好友
QQ號(hào)聯(lián)系: 360901061
您的支持是博主寫(xiě)作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對(duì)您有幫助就好】元

