While reading Chapter 3 of Data-Intensive Text Processing with MapReduce today, I came across a map-side optimization of WordCount.
When processing data-intensive jobs in a distributed setting, one factor that strongly affects processing speed is the map phase's intermediate output: before it reaches the reducers, a large amount of intermediate data has to be exchanged and processed along the way. That intermediate data is transferred over the network, and it is also written to local disk before being sent. Since network bandwidth and disk I/O are expensive compared to other operations, reducing the amount of intermediate data improves the efficiency of the algorithm; the number of key-value pairs can be cut down with a combiner function or by other means. Below is an improved WordCount algorithm that instead does the aggregation inside the mapper itself.
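For comparison, the plain combiner route is configured in the driver rather than coded into the mapper. Here is a minimal sketch (my own illustration, not from the book or this post; CombinerSetup and useAsCombiner are made-up names): a summing reducer class is registered as the combiner as well, which is only legal because addition is associative and commutative and the reducer's input and output types are identical.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch of the combiner-based alternative (illustration only, hypothetical class/method names).
public class CombinerSetup {
    public static void useAsCombiner(Job job, Class<? extends Reducer> sumReducer) {
        job.setCombinerClass(sumReducer); // partially aggregate each map task's output before the shuffle
        job.setReducerClass(sumReducer);  // the same summing logic merges the partial counts on the reduce side
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
    }
}

With the classes in this post, the Reducer defined further down could be passed in as sumReducer.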
The basic idea: define an associative array in the mapper, then process the document and add <word, count> entries to it. If the word is already present, increment its count by 1; otherwise insert it with a count of 1. After all of the map calls have finished, the accumulated results are emitted in the run() method.
Pseudocode:
class MAPPER
    method MAP(docid a, doc d)
        H = new AssociativeArray
        for all term t ∈ doc d do
            H{t} = H{t} + 1
        for all term t ∈ H do
            EMIT(term t, count H{t})

class REDUCER
    method REDUCE(term t, counts [c1, c2, ...])
        sum = 0
        for all count c ∈ counts [c1, c2, ...] do
            sum = sum + c
        EMIT(term t, count sum)
The Java code is as follows:
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public class Mapper extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, IntWritable> {

    // Debug counter: number of times map() has been called in this task.
    int c;
    // The in-mapper "associative array": per-term counts accumulated across all input records of this map task.
    HashMap<String, IntWritable> map = new HashMap<String, IntWritable>();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Tokenize the line and tally terms locally instead of emitting a (term, 1) pair per token.
        String str = value.toString();
        StringTokenizer token = new StringTokenizer(str);
        while (token.hasMoreTokens()) {
            String value1 = token.nextToken();
            if (map.containsKey(value1)) {
                int p = map.get(value1).get() + 1;
                map.put(value1, new IntWritable(p));
            } else {
                map.put(value1, new IntWritable(1));
            }
        }
        c++;
        System.out.println(c);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        System.out.println("cleanup");
        super.cleanup(context);
    }

    @Override
    public void run(Context context) throws IOException, InterruptedException {
        // super.run() drives setup(), all the map() calls, and cleanup(); after it
        // returns, the accumulated counts are emitted, one pair per distinct term.
        super.run(context);
        System.out.println("run");
        Iterator<Map.Entry<String, IntWritable>> it = map.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, IntWritable> entry = it.next();
            context.write(new Text(entry.getKey()), entry.getValue());
        }
    }

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Leftover experiments with reading the input split directly; kept commented out.
        // System.out.println(context.getInputSplit().toString());
        // System.out.println(context.getJobID());
        // FileSplit input = (FileSplit) context.getInputSplit();
        // String path = input.getPath().toString();
        // Configuration conf = new Configuration();
        // FileSystem fs = FileSystem.get(URI.create(path), conf);
        // FSDataInputStream filein = fs.open(input.getPath());
        // LineReader in = new LineReader(filein, conf);
        // Text line = new Text();
        // int cd = in.readLine(line);
        // System.out.println(line);
    }
}
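The mapper above flushes its accumulated counts in run(), after super.run() has returned. An equivalent place to do the flush is cleanup(), which the framework calls exactly once after the last map() call. Below is a minimal sketch of that variant with the same Text/IntWritable types; CleanupEmitMapper is a hypothetical name, not part of this post's code.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// Sketch: the same in-mapper combining, but emitting in cleanup() instead of overriding run().
public class CleanupEmitMapper extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, IntWritable> {

    private final Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Tally tokens into the in-memory map; nothing is emitted here.
        StringTokenizer token = new StringTokenizer(value.toString());
        while (token.hasMoreTokens()) {
            String term = token.nextToken();
            Integer old = counts.get(term);
            counts.put(term, old == null ? 1 : old + 1);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Called once after the last map() call; emit one pair per distinct term.
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            context.write(new Text(entry.getKey()), new IntWritable(entry.getValue()));
        }
    }
}

Either way, the usual caveat of in-mapper combining applies: the in-memory table has to fit in the map task's heap, since nothing is emitted until the task's input is exhausted.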
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class Reducer extends org.apache.hadoop.mapreduce.Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the partial counts emitted by the map tasks for this term.
        int sum = 0;
        for (IntWritable it : values) {
            sum += it.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Word {

    /**
     * @param args input path and output path
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = new Job(conf);
        job.setJarByClass(Word.class);
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        // Remove any previous output directory so the job can be rerun.
        FileSystem fs = FileSystem.get(URI.create(args[1]), conf);
        fs.delete(out, true);
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setMapperClass(Mapper.class);
        job.setReducerClass(Reducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.waitForCompletion(false);
    }
}
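To try this out, the three classes are packaged into a jar and submitted with the hadoop jar command; main() expects two arguments, an input path (args[0]) and an output path (args[1]), and it deletes the output path up front so the job can be rerun without cleaning up by hand. The exact command line depends on how the jar is named and packaged.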