欧美三区_成人在线免费观看视频_欧美极品少妇xxxxⅹ免费视频_a级毛片免费播放_鲁一鲁中文字幕久久_亚洲一级特黄

Map/Reduce中Join查詢實(shí)現(xiàn)

系統(tǒng) 2106 0

http://www.cnblogs.com/MengYan-LongYou/p/3360613.html

?

在做這個(gè) Join 查詢的時(shí)候,必然涉及數(shù)據(jù),我這里設(shè)計(jì)了 2 張表,分別較 data.txt info.txt ,字段之間以 /t 劃分。

data.txt 內(nèi)容如下:

201001????1003????abc

201002????1005????def

201003????1006????ghi

201004????1003????jkl

201005????1004????mno

201006????1005????pqr

?

info.txt 內(nèi)容如下:

?

1003????kaka

1004????da

1005????jue

1006????zhao

?

期望輸出結(jié)果:

1003????201001????abc????kaka

1003????201004????jkl????kaka

1004????201005????mno????da

1005????201002????def????jue

1005????201006????pqr????jue

1006????201003????ghi????zhao

?

四、 Map 代碼

首先是 map 的代碼,我貼上,然后簡(jiǎn)要說(shuō)說(shuō)

?

public static class Example_Join_01_Mapper extends Mapper<LongWritable, Text, TextPair, Text> {

????????@Override

????????protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

?

????????????// 獲取輸入文件的全路徑和名稱

????????????String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();

?

????????????if (pathName.contains("data.txt")) {

????????????????String values[] = value.toString().split("/t");

????????????????if (values.length < 3) {

????????????????????// data數(shù)據(jù)格式不規(guī)范,字段小于3,拋棄數(shù)據(jù)

????????????????????return;

????????????????} else {

????????????????????// 數(shù)據(jù)格式規(guī)范,區(qū)分標(biāo)識(shí)為1

????????????????????TextPair tp = new TextPair(new Text(values[1]), new Text("1"));

????????????????????context.write(tp, new Text(values[0] + "/t" + values[2]));

????????????????}

????????????}

????????????if (pathName.contains("info.txt")) {

????????????????String values[] = value.toString().split("/t");

????????????????if (values.length < 2) {

????????????????????// data數(shù)據(jù)格式不規(guī)范,字段小于2,拋棄數(shù)據(jù)

????????????????????return;

????????????????} else {

????????????????????// 數(shù)據(jù)格式規(guī)范,區(qū)分標(biāo)識(shí)為0

????????????????????TextPair tp = new TextPair(new Text(values[0]), new Text("0"));

????????????????????context.write(tp, new Text(values[1]));

????????????????}

????????????}

????????}

????}

?

這里需要注意以下部分:

A pathName 是文件在 HDFS 中的全路徑 ( 例如: hdfs://M1:9000/MengYan/join/data/info.txt) ,可以以 endsWith() 的方法來(lái)判斷。

B 、資料表,也就是這里的 info.txt 需要放在前面,也就是標(biāo)識(shí)號(hào)是 0. 否則無(wú)法輸出理想結(jié)果。

C Map 執(zhí)行完成之后,輸出的中間結(jié)果如下:

1003,0????kaka

1004,0????da

1005,0????jue

1006,0????zhao

1003,1????201001????abc

1003,1????201004????jkl

1004,1????201005????mon

1005,1????201002????def

1005,1????201006????pqr

1006,1????201003????ghi

?

五、分區(qū)和分組

1 map 之后的輸出會(huì)進(jìn)行一些分區(qū)的操作,代碼貼出來(lái):

public static class Example_Join_01_Partitioner extends Partitioner<TextPair, Text> {

????????@Override

????????public int getPartition(TextPair key, Text value, int numParititon) {

????????????return Math.abs(key.getFirst().hashCode() * 127) % numParititon;

????????}

????}

分區(qū)我在以前的文檔中寫過(guò),這里不做描述了,就說(shuō)是按照 map 輸出的符合 key 的第一個(gè)字段做分區(qū)關(guān)鍵字。分區(qū)之后,相同 key 會(huì)劃分到一個(gè) reduce 中去處理(如果 reduce 設(shè)置是 1 ,那么就是分區(qū)有多個(gè),但是還是在一個(gè) reduce 中處理。但是結(jié)果會(huì)按照分區(qū)的原則排序)。分區(qū)后結(jié)果大致如下:

?

同一區(qū):

1003,0????kaka

1003,1????201001????abc

1003,1????201004????jkl

?

?

同一區(qū):

1004,0????da

1004,1????201005????mon

?

?

同一區(qū):

1005,0????jue

1005,1????201002????def

1005,1????201006????pqr

?

?

同一區(qū):

1006,0????zhao

1006,1????201003????ghi

?

2 、分組操作,代碼如下

?

public static class Example_Join_01_Comparator extends WritableComparator {

?

????????public Example_Join_01_Comparator() {

????????????super(TextPair.class, true);

????????}

?

????????@SuppressWarnings("unchecked")

????????public int compare(WritableComparable a, WritableComparable b) {

????????????TextPair t1 = (TextPair) a;

????????????TextPair t2 = (TextPair) b;

????????????return t1.getFirst().compareTo(t2.getFirst());

????????}

????}

?

分組操作就是把在相同分區(qū)的數(shù)據(jù)按照指定的規(guī)則進(jìn)行分組的操作,就以上來(lái)看,是按照復(fù)合 key 的第一個(gè)字段做分組原則,達(dá)到忽略復(fù)合 key 的第二個(gè)字段值的目的,從而讓數(shù)據(jù)能夠迭代在一個(gè) reduce 中。輸出后結(jié)果如下:

?

同一組:

1003,0????kaka

1003,0????201001????abc

1003,0????201004????jkl

?

同一組:

1004,0????da

1004,0????201005????mon

?

同一組:

1005,0????jue

1005,0????201002????def

1005,0????201006????pqr

?

同一組:

1006,0????zhao

1006,0????201003????ghi

?

六、 reduce 操作

貼上代碼如下:

public static class Example_Join_01_Reduce extends Reducer<TextPair, Text, Text, Text> {

????????protected void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException,

????????????????InterruptedException {

????????????Text pid = key.getFirst();

????????????String desc = values.iterator().next().toString();

????????????while (values.iterator().hasNext()) {

????????????????context.write(pid, new Text(values.iterator().next().toString() + "/t" + desc));

????????????}

????????}

????}

1 、代碼比較簡(jiǎn)單,首先獲取關(guān)鍵的 ID 值,就是 key 的第一個(gè)字段。

2 、獲取公用的字段,通過(guò)排組織后可以看到,一些共有字段是在第一位,取出來(lái)即可。

3 、遍歷余下的結(jié)果,輸出。

七、其他的支撐代碼

1 、首先是 TextPair 代碼,沒(méi)有什么可以細(xì)說(shuō)的,貼出來(lái):

public class TextPair implements WritableComparable<TextPair> {

????private Text first;

????private Text second;

?

????public TextPair() {

????????set(new Text(), new Text());

????}

?

????public TextPair(String first, String second) {

????????set(new Text(first), new Text(second));

????}

?

????public TextPair(Text first, Text second) {

????????set(first, second);

????}

?

????public void set(Text first, Text second) {

????????this.first = first;

????????this.second = second;

????}

?

????public Text getFirst() {

????????return first;

????}

?

????public Text getSecond() {

????????return second;

????}

?

????public void write(DataOutput out) throws IOException {

????????first.write(out);

????????second.write(out);

????}

?

????public void readFields(DataInput in) throws IOException {

????????first.readFields(in);

????????second.readFields(in);

????}

?

????public int compareTo(TextPair tp) {

????????int cmp = first.compareTo(tp.first);

????????if (cmp != 0) {

????????????return cmp;

????????}

????????return second.compareTo(tp.second);

????}

}

2 Job 的入口函數(shù)

public static void main(String agrs[]) throws IOException, InterruptedException, ClassNotFoundException {

????????Configuration conf = new Configuration();

????????GenericOptionsParser parser = new GenericOptionsParser(conf, agrs);

????????String[] otherArgs = parser.getRemainingArgs();

????????if (agrs.length < 3) {

????????????System.err.println("Usage: Example_Join_01 <in_path_one> <in_path_two> <output>");

????????????System.exit(2);

????????}

?

????????//conf.set("hadoop.job.ugi", "root,hadoop");

?

????????Job job = new Job(conf, "Example_Join_01");

????????// 設(shè)置運(yùn)行的job

????????job.setJarByClass(Example_Join_01.class);

????????// 設(shè)置Map相關(guān)內(nèi)容

????????job.setMapperClass(Example_Join_01_Mapper.class);

????????// 設(shè)置Map的輸出

????????job.setMapOutputKeyClass(TextPair.class);

????????job.setMapOutputValueClass(Text.class);

????????// 設(shè)置partition

????????job.setPartitionerClass(Example_Join_01_Partitioner.class);

????????// 在分區(qū)之后按照指定的條件分組

????????job.setGroupingComparatorClass(Example_Join_01_Comparator.class);

????????// 設(shè)置reduce

????????job.setReducerClass(Example_Join_01_Reduce.class);

????????// 設(shè)置reduce的輸出

????????job.setOutputKeyClass(Text.class);

????????job.setOutputValueClass(Text.class);

????????// 設(shè)置輸入和輸出的目錄

????????FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

????????FileInputFormat.addInputPath(job, new Path(otherArgs[1]));

????????FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));

????????// 執(zhí)行,直到結(jié)束就退出

????????System.exit(job.waitForCompletion(true) ? 0 : 1);

?

????}

?

八、總結(jié)

1 、這是個(gè)簡(jiǎn)單的 join 查詢,可以看到,我在處理輸入源的時(shí)候是在 map 端做來(lái)源判斷。其實(shí)在 0.19 可以用 MultipleInputs.addInputPath() 的方法,但是它用了 JobConf 做參數(shù)。這個(gè)方法原理是多個(gè)數(shù)據(jù)源就采用多個(gè) map 來(lái)處理。方法各有優(yōu)劣。

2 、對(duì)于資源表,如果我們采用 0 1 這樣的模式來(lái)區(qū)分,資源表是需要放在前的。例如本例中 info.txt 就是資源表,所以標(biāo)識(shí)位就是 0. 如果寫為 1 的話,可以試下,在分組之后,資源表對(duì)應(yīng)的值放在了迭代器最后一位,無(wú)法追加在最后所有的結(jié)果集合中。

3 、關(guān)于分區(qū),并不是所有的 map 都結(jié)束才開(kāi)始的,一部分?jǐn)?shù)據(jù)完成就會(huì)開(kāi)始執(zhí)行。同樣,分組操作在一個(gè)分區(qū)內(nèi)執(zhí)行,如果分區(qū)完成,分組將會(huì)開(kāi)始執(zhí)行,也不是等所有分區(qū)完成才開(kāi)始做分組的操作。

Map/Reduce中Join查詢實(shí)現(xiàn)


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號(hào)聯(lián)系: 360901061

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對(duì)您有幫助就好】

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長(zhǎng)會(huì)非常 感謝您的哦!!!

發(fā)表我的評(píng)論
最新評(píng)論 總共0條評(píng)論
主站蜘蛛池模板: 日本大学生免费一级一片 | 91在线视频播放 | 黑人插插| 成人免费一区二区三区视频网站 | 欧美一级毛片一 | 国产精品观看在线亚洲人成网 | 一级一片在线播放在线观看 | 99精品大香线蕉线伊人久久久 | 婷婷色中文字幕 | 欧美精品 在线观看 | 日本又黄又粗暴的gif动态图含羞 | 偷拍在线观看视频在线观看地址 | 日韩亚洲人成网站在线播放 | 一级毛片真人免费观看 | 久久福利青草精品免费 | 91视频.com| 亚洲视频在线看 | 久久99精品国产99久久 | 亚洲欧洲精品一区二区 | 久久精品国产第一区二区 | 99热国产精品 | 欧洲精品欧美精品 | xifan在线a精品一区二区视频网站 | 山岸逢花在线观看 | 国产精品丝袜视频 | 天天操操 | 免费福利在线观看 | 黄色日本视频 | av在线二区 | 免费av一区二区三区 | 欧美日韩久久久 | 国产日韩亚洲不卡高清在线观看 | v片在线看 | 久久久久久久国产 | 亚洲视频免费 | 日韩a级片| 亚洲fuli在线观看 | 亚洲精品国产第一综合99久久 | 久久久久亚洲精品 | 美女爽到呻吟久久久久 | 亚洲国产精久久久久久久 |