http://www.cnblogs.com/MengYan-LongYou/p/3360613.html
?
在做這個(gè) Join 查詢的時(shí)候,必然涉及數(shù)據(jù),我這里設(shè)計(jì)了 2 張表,分別較 data.txt 和 info.txt ,字段之間以 /t 劃分。
data.txt 內(nèi)容如下:
201001????1003????abc
201002????1005????def
201003????1006????ghi
201004????1003????jkl
201005????1004????mno
201006????1005????pqr
?
info.txt 內(nèi)容如下:
?
1003????kaka
1004????da
1005????jue
1006????zhao
?
期望輸出結(jié)果:
1003????201001????abc????kaka
1003????201004????jkl????kaka
1004????201005????mno????da
1005????201002????def????jue
1005????201006????pqr????jue
1006????201003????ghi????zhao
?
四、 Map 代碼
首先是 map 的代碼,我貼上,然后簡(jiǎn)要說(shuō)說(shuō)
?
public static class Example_Join_01_Mapper extends Mapper<LongWritable, Text, TextPair, Text> {
????????@Override
????????protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
?
????????????// 獲取輸入文件的全路徑和名稱
????????????String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
?
????????????if (pathName.contains("data.txt")) {
????????????????String values[] = value.toString().split("/t");
????????????????if (values.length < 3) {
????????????????????// data數(shù)據(jù)格式不規(guī)范,字段小于3,拋棄數(shù)據(jù)
????????????????????return;
????????????????} else {
????????????????????// 數(shù)據(jù)格式規(guī)范,區(qū)分標(biāo)識(shí)為1
????????????????????TextPair tp = new TextPair(new Text(values[1]), new Text("1"));
????????????????????context.write(tp, new Text(values[0] + "/t" + values[2]));
????????????????}
????????????}
????????????if (pathName.contains("info.txt")) {
????????????????String values[] = value.toString().split("/t");
????????????????if (values.length < 2) {
????????????????????// data數(shù)據(jù)格式不規(guī)范,字段小于2,拋棄數(shù)據(jù)
????????????????????return;
????????????????} else {
????????????????????// 數(shù)據(jù)格式規(guī)范,區(qū)分標(biāo)識(shí)為0
????????????????????TextPair tp = new TextPair(new Text(values[0]), new Text("0"));
????????????????????context.write(tp, new Text(values[1]));
????????????????}
????????????}
????????}
????}
?
這里需要注意以下部分:
A 、 pathName 是文件在 HDFS 中的全路徑 ( 例如: hdfs://M1:9000/MengYan/join/data/info.txt) ,可以以 endsWith() 的方法來(lái)判斷。
B 、資料表,也就是這里的 info.txt 需要放在前面,也就是標(biāo)識(shí)號(hào)是 0. 否則無(wú)法輸出理想結(jié)果。
C 、 Map 執(zhí)行完成之后,輸出的中間結(jié)果如下:
1003,0????kaka
1004,0????da
1005,0????jue
1006,0????zhao
1003,1????201001????abc
1003,1????201004????jkl
1004,1????201005????mon
1005,1????201002????def
1005,1????201006????pqr
1006,1????201003????ghi
?
五、分區(qū)和分組
1 、 map 之后的輸出會(huì)進(jìn)行一些分區(qū)的操作,代碼貼出來(lái):
public static class Example_Join_01_Partitioner extends Partitioner<TextPair, Text> {
????????@Override
????????public int getPartition(TextPair key, Text value, int numParititon) {
????????????return Math.abs(key.getFirst().hashCode() * 127) % numParititon;
????????}
????}
分區(qū)我在以前的文檔中寫過(guò),這里不做描述了,就說(shuō)是按照 map 輸出的符合 key 的第一個(gè)字段做分區(qū)關(guān)鍵字。分區(qū)之后,相同 key 會(huì)劃分到一個(gè) reduce 中去處理(如果 reduce 設(shè)置是 1 ,那么就是分區(qū)有多個(gè),但是還是在一個(gè) reduce 中處理。但是結(jié)果會(huì)按照分區(qū)的原則排序)。分區(qū)后結(jié)果大致如下:
?
同一區(qū):
1003,0????kaka
1003,1????201001????abc
1003,1????201004????jkl
?
?
同一區(qū):
1004,0????da
1004,1????201005????mon
?
?
同一區(qū):
1005,0????jue
1005,1????201002????def
1005,1????201006????pqr
?
?
同一區(qū):
1006,0????zhao
1006,1????201003????ghi
?
2 、分組操作,代碼如下
?
public static class Example_Join_01_Comparator extends WritableComparator {
?
????????public Example_Join_01_Comparator() {
????????????super(TextPair.class, true);
????????}
?
????????@SuppressWarnings("unchecked")
????????public int compare(WritableComparable a, WritableComparable b) {
????????????TextPair t1 = (TextPair) a;
????????????TextPair t2 = (TextPair) b;
????????????return t1.getFirst().compareTo(t2.getFirst());
????????}
????}
?
分組操作就是把在相同分區(qū)的數(shù)據(jù)按照指定的規(guī)則進(jìn)行分組的操作,就以上來(lái)看,是按照復(fù)合 key 的第一個(gè)字段做分組原則,達(dá)到忽略復(fù)合 key 的第二個(gè)字段值的目的,從而讓數(shù)據(jù)能夠迭代在一個(gè) reduce 中。輸出后結(jié)果如下:
?
同一組:
1003,0????kaka
1003,0????201001????abc
1003,0????201004????jkl
?
同一組:
1004,0????da
1004,0????201005????mon
?
同一組:
1005,0????jue
1005,0????201002????def
1005,0????201006????pqr
?
同一組:
1006,0????zhao
1006,0????201003????ghi
?
六、 reduce 操作
貼上代碼如下:
public static class Example_Join_01_Reduce extends Reducer<TextPair, Text, Text, Text> {
????????protected void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException,
????????????????InterruptedException {
????????????Text pid = key.getFirst();
????????????String desc = values.iterator().next().toString();
????????????while (values.iterator().hasNext()) {
????????????????context.write(pid, new Text(values.iterator().next().toString() + "/t" + desc));
????????????}
????????}
????}
1 、代碼比較簡(jiǎn)單,首先獲取關(guān)鍵的 ID 值,就是 key 的第一個(gè)字段。
2 、獲取公用的字段,通過(guò)排組織后可以看到,一些共有字段是在第一位,取出來(lái)即可。
3 、遍歷余下的結(jié)果,輸出。
七、其他的支撐代碼
1 、首先是 TextPair 代碼,沒(méi)有什么可以細(xì)說(shuō)的,貼出來(lái):
public class TextPair implements WritableComparable<TextPair> {
????private Text first;
????private Text second;
?
????public TextPair() {
????????set(new Text(), new Text());
????}
?
????public TextPair(String first, String second) {
????????set(new Text(first), new Text(second));
????}
?
????public TextPair(Text first, Text second) {
????????set(first, second);
????}
?
????public void set(Text first, Text second) {
????????this.first = first;
????????this.second = second;
????}
?
????public Text getFirst() {
????????return first;
????}
?
????public Text getSecond() {
????????return second;
????}
?
????public void write(DataOutput out) throws IOException {
????????first.write(out);
????????second.write(out);
????}
?
????public void readFields(DataInput in) throws IOException {
????????first.readFields(in);
????????second.readFields(in);
????}
?
????public int compareTo(TextPair tp) {
????????int cmp = first.compareTo(tp.first);
????????if (cmp != 0) {
????????????return cmp;
????????}
????????return second.compareTo(tp.second);
????}
}
2 、 Job 的入口函數(shù)
public static void main(String agrs[]) throws IOException, InterruptedException, ClassNotFoundException {
????????Configuration conf = new Configuration();
????????GenericOptionsParser parser = new GenericOptionsParser(conf, agrs);
????????String[] otherArgs = parser.getRemainingArgs();
????????if (agrs.length < 3) {
????????????System.err.println("Usage: Example_Join_01 <in_path_one> <in_path_two> <output>");
????????????System.exit(2);
????????}
?
????????//conf.set("hadoop.job.ugi", "root,hadoop");
?
????????Job job = new Job(conf, "Example_Join_01");
????????// 設(shè)置運(yùn)行的job
????????job.setJarByClass(Example_Join_01.class);
????????// 設(shè)置Map相關(guān)內(nèi)容
????????job.setMapperClass(Example_Join_01_Mapper.class);
????????// 設(shè)置Map的輸出
????????job.setMapOutputKeyClass(TextPair.class);
????????job.setMapOutputValueClass(Text.class);
????????// 設(shè)置partition
????????job.setPartitionerClass(Example_Join_01_Partitioner.class);
????????// 在分區(qū)之后按照指定的條件分組
????????job.setGroupingComparatorClass(Example_Join_01_Comparator.class);
????????// 設(shè)置reduce
????????job.setReducerClass(Example_Join_01_Reduce.class);
????????// 設(shè)置reduce的輸出
????????job.setOutputKeyClass(Text.class);
????????job.setOutputValueClass(Text.class);
????????// 設(shè)置輸入和輸出的目錄
????????FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
????????FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
????????FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
????????// 執(zhí)行,直到結(jié)束就退出
????????System.exit(job.waitForCompletion(true) ? 0 : 1);
?
????}
?
八、總結(jié)
1 、這是個(gè)簡(jiǎn)單的 join 查詢,可以看到,我在處理輸入源的時(shí)候是在 map 端做來(lái)源判斷。其實(shí)在 0.19 可以用 MultipleInputs.addInputPath() 的方法,但是它用了 JobConf 做參數(shù)。這個(gè)方法原理是多個(gè)數(shù)據(jù)源就采用多個(gè) map 來(lái)處理。方法各有優(yōu)劣。
2 、對(duì)于資源表,如果我們采用 0 和 1 這樣的模式來(lái)區(qū)分,資源表是需要放在前的。例如本例中 info.txt 就是資源表,所以標(biāo)識(shí)位就是 0. 如果寫為 1 的話,可以試下,在分組之后,資源表對(duì)應(yīng)的值放在了迭代器最后一位,無(wú)法追加在最后所有的結(jié)果集合中。
3 、關(guān)于分區(qū),并不是所有的 map 都結(jié)束才開(kāi)始的,一部分?jǐn)?shù)據(jù)完成就會(huì)開(kāi)始執(zhí)行。同樣,分組操作在一個(gè)分區(qū)內(nèi)執(zhí)行,如果分區(qū)完成,分組將會(huì)開(kāi)始執(zhí)行,也不是等所有分區(qū)完成才開(kāi)始做分組的操作。
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號(hào)聯(lián)系: 360901061
您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對(duì)您有幫助就好】元
