欧美三区_成人在线免费观看视频_欧美极品少妇xxxxⅹ免费视频_a级毛片免费播放_鲁一鲁中文字幕久久_亚洲一级特黄

hive UDAF開發(fā)入門和運行過程詳解

系統(tǒng) 2678 0

介紹

hive的用戶自定義聚合函數(shù)(UDAF)是一個很好的功能,集成了先進的數(shù)據(jù)處理。hive有兩種UDAF:簡單和通用。顧名思義,簡單的UDAF,寫的相當簡單的,但因為使用Java反射導(dǎo)致性能損失,而且有些特性不能使用,如可變長度參數(shù)列表。通用UDAF可以使用??所有功能,但是UDAF就寫的比較復(fù)雜,不直觀。

本文只介紹通用UDAF。

UDAF是需要在hive的sql語句和group by聯(lián)合使用,hive的group by對于每個分組,只能返回一條記錄,這點和mysql不一樣,切記。

?

UDAF開發(fā)概覽

開發(fā)通用UDAF有兩個步驟,第一個是編寫resolver類,第二個是編寫 evaluator 類。 resolver負責(zé)類型檢查,操作符重載。 evaluator真正實現(xiàn)UDAF的邏輯。通常來說,頂層UDAF類繼承 org.apache.hadoop.hive.ql.udf.GenericUDAFResolver2, 里面編寫嵌套類 evaluator ?實現(xiàn)UDAF的邏輯。

?本文以Hive的內(nèi)置UDAF sum函數(shù)的源代碼作為示例講解。

?

實現(xiàn)?resolver

resolver通常繼承 org.apache.hadoop.hive.ql.udf.GenericUDAFResolver2 ,但是我們更建議繼承 AbstractGenericUDAFResolver,隔離將來hive接口的變化。

GenericUDAFResolver和GenericUDAFResolver2接口的區(qū)別是,后面的允許evaluator實現(xiàn)可以訪問更多的信息,例如DISTINCT限定符,通配符FUNCTION(*)。

      
        public
      
      
        class
      
       GenericUDAFSum 
      
        extends
      
      
         AbstractGenericUDAFResolver {

  
      
      
        static
      
      
        final
      
       Log LOG = LogFactory.getLog(GenericUDAFSum.
      
        class
      
      
        .getName());

  @Override
  
      
      
        public
      
      
         GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
    
      
      
        throws
      
      
         SemanticException {

      
      
            //
      
      
         Type-checking goes here!
      
      
            return
      
      
        new
      
      
         GenericUDAFSumLong(); 
        
}
public static class GenericUDAFSumLong extends GenericUDAFEvaluator { // UDAF logic goes here! } }

這個就是 UDAF的代碼骨架,第一行創(chuàng)建LOG對象,用來寫入警告和錯誤到hive的log。 GenericUDAFResolver只需要重寫一個方法: getEvaluator, 它根據(jù)SQL傳入的參數(shù)類型,返回正確的evaluator。這里最主要是實現(xiàn)操作符的重載。

getEvaluator的完整代碼如下:

      
        public
      
      
         GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
    
      
      
        throws
      
      
         SemanticException {
    
      
      
        if
      
       (parameters.length != 1
      
        ) {
      
      
      
        throw
      
      
        new
      
       UDFArgumentTypeException(parameters.length - 1
      
        ,
          
      
      "Exactly one argument is expected."
      
        );
    }

    
      
      
        if
      
       (parameters[0].getCategory() !=
      
         ObjectInspector.Category.PRIMITIVE) {
      
      
      
        throw
      
      
        new
      
       UDFArgumentTypeException(0
      
        ,
          
      
      "Only primitive type arguments are accepted but "
          + parameters[0].getTypeName() + " is passed."
      
        );
    }
    
      
      
        switch
      
       (((PrimitiveTypeInfo) parameters[0
      
        ]).getPrimitiveCategory()) {
    
      
      
        case
      
      
         BYTE:
    
      
      
        case
      
      
         SHORT:
    
      
      
        case
      
      
         INT:
    
      
      
        case
      
      
         LONG:
    
      
      
        case
      
      
         TIMESTAMP:
      
      
      
        return
      
      
        new
      
      
         GenericUDAFSumLong();
    
      
      
        case
      
      
         FLOAT:
    
      
      
        case
      
      
         DOUBLE:
    
      
      
        case
      
      
         STRING:
      
      
      
        return
      
      
        new
      
      
         GenericUDAFSumDouble();
    
      
      
        case
      
      
         BOOLEAN:
    
      
      
        default
      
      
        :
      
      
      
        throw
      
      
        new
      
       UDFArgumentTypeException(0
      
        ,
          
      
      "Only numeric or string type arguments are accepted but "
          + parameters[0].getTypeName() + " is passed."
      
        );
    }
      
    

這里做了類型檢查,如果不是原生類型(即符合類型,array,map此類),則拋出異常,還實現(xiàn)了操作符重載,對于整數(shù)類型,使用GenericUDAFSumLong實現(xiàn)UDAF的邏輯,對于浮點類型,使用GenericUDAFSumDouble實現(xiàn)UDAF的邏輯。

?

實現(xiàn)evaluator

所有 evaluators必須繼承抽象類org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator。子類必須實現(xiàn)它的一些抽象方法,實現(xiàn)UDAF的邏輯。

GenericUDAFEvaluator有一個嵌套類Mode,這個類很重要,它表示了udaf在mapreduce的各個階段,理解Mode的含義,就可以理解了hive的UDAF的運行流程。

      
        public
      
      
        static
      
      
        enum
      
      
         Mode {
    
      
      
        /**
      
      
        
     * PARTIAL1: 這個是mapreduce的map階段:從原始數(shù)據(jù)到部分數(shù)據(jù)聚合
     * 將會調(diào)用iterate()和terminatePartial()
     
      
      
        */
      
      
        
    PARTIAL1,
        
      
      
        /**
      
      
        
     * PARTIAL2: 這個是mapreduce的map端的Combiner階段,負責(zé)在map端合并map的數(shù)據(jù)::從部分數(shù)據(jù)聚合到部分數(shù)據(jù)聚合:
     * 將會調(diào)用merge() 和 terminatePartial() 
     
      
      
        */
      
      
        
    PARTIAL2,
        
      
      
        /**
      
      
        
     * FINAL: mapreduce的reduce階段:從部分數(shù)據(jù)的聚合到完全聚合 
     * 將會調(diào)用merge()和terminate()
     
      
      
        */
      
      
        
    FINAL,
        
      
      
        /**
      
      
        
     * COMPLETE: 如果出現(xiàn)了這個階段,表示mapreduce只有map,沒有reduce,所以map端就直接出結(jié)果了:從原始數(shù)據(jù)直接到完全聚合
      * 將會調(diào)用 iterate()和terminate()
     
      
      
        */
      
      
        
    COMPLETE
  };
      
    

一般情況下,完整的UDAF邏輯是一個mapreduce過程,如果有mapper和reducer,就會經(jīng)歷PARTIAL1(mapper),F(xiàn)INAL(reducer),如果還有combiner,那就會經(jīng)歷PARTIAL1(mapper),PARTIAL2(combiner),F(xiàn)INAL(reducer)。

而有一些情況下的mapreduce,只有mapper,而沒有reducer,所以就會只有COMPLETE階段,這個階段直接輸入原始數(shù)據(jù),出結(jié)果。

下面以GenericUDAFSumLong的evaluator實現(xiàn)講解

      
        public
      
      
        static
      
      
        class
      
       GenericUDAFSumLong 
      
        extends
      
      
         GenericUDAFEvaluator {


      
      
        private
      
      
         PrimitiveObjectInspector inputOI;
    
      
      
        private
      
      
         LongWritable result;

   
      
      
        //
      
      
        這個方法返回了UDAF的返回類型,這里確定了sum自定義函數(shù)的返回類型是Long類型
      
      
            @Override
    
      
      
        public
      
       ObjectInspector init(Mode m, ObjectInspector[] parameters) 
      
        throws
      
      
         HiveException {
      
      
      
        assert
      
       (parameters.length == 1
      
        );
      
      
      
        super
      
      
        .init(m, parameters);
      result 
      
      = 
      
        new
      
       LongWritable(0
      
        );
      inputOI 
      
      = (PrimitiveObjectInspector) parameters[0
      
        ];
      
      
      
        return
      
      
         PrimitiveObjectInspectorFactory.writableLongObjectInspector;
    }

    
      
      
        /**
      
      
         存儲sum的值的類 
      
      
        */
      
      
        static
      
      
        class
      
       SumLongAgg 
      
        implements
      
      
         AggregationBuffer {
      
      
      
        boolean
      
      
         empty;
      
      
      
        long
      
      
         sum;
    }

    
      
      
        //
      
      
        創(chuàng)建新的聚合計算的需要的內(nèi)存,用來存儲mapper,combiner,reducer運算過程中的相加總和。
      
      
        
    @Override
    
      
      
        public
      
       AggregationBuffer getNewAggregationBuffer() 
      
        throws
      
      
         HiveException {
      SumLongAgg result 
      
      = 
      
        new
      
      
         SumLongAgg();
      reset(result);
      
      
      
        return
      
      
         result;
    }
    
    
      
      
        //
      
      
        mapreduce支持mapper和reducer的重用,所以為了兼容,也需要做內(nèi)存的重用。
      
      
        
    @Override
    
      
      
        public
      
      
        void
      
       reset(AggregationBuffer agg) 
      
        throws
      
      
         HiveException {
      SumLongAgg myagg 
      
      =
      
         (SumLongAgg) agg;
      myagg.empty 
      
      = 
      
        true
      
      
        ;
      myagg.sum 
      
      = 0
      
        ;
    }

    
      
      
        private
      
      
        boolean
      
       warned = 
      
        false
      
      
        ;
  
    
      
      
        //
      
      
        map階段調(diào)用,只要把保存當前和的對象agg,再加上輸入的參數(shù),就可以了。
      
      
            @Override
    
      
      
        public
      
      
        void
      
       iterate(AggregationBuffer agg, Object[] parameters) 
      
        throws
      
      
         HiveException {
      
      
      
        assert
      
       (parameters.length == 1
      
        );
      
      
      
        try
      
      
         {
        merge(agg, parameters[
      
      0
      
        ]);
      } 
      
      
        catch
      
      
         (NumberFormatException e) {
        
      
      
        if
      
       (!
      
        warned) {
          warned 
      
      = 
      
        true
      
      
        ;
          LOG.warn(getClass().getSimpleName() 
      
      + " "
              +
      
         StringUtils.stringifyException(e));
        }
      }
    }
   
      
      
        //
      
      
        mapper結(jié)束要返回的結(jié)果,還有combiner結(jié)束返回的結(jié)果
      
      
            @Override
    
      
      
        public
      
       Object terminatePartial(AggregationBuffer agg) 
      
        throws
      
      
         HiveException {
      
      
      
        return
      
      
         terminate(agg);
    }
    
    
      
      
        //
      
      
        combiner合并map返回的結(jié)果,還有reducer合并mapper或combiner返回的結(jié)果。
      
      
            @Override
    
      
      
        public
      
      
        void
      
       merge(AggregationBuffer agg, Object partial) 
      
        throws
      
      
         HiveException {
      
      
      
        if
      
       (partial != 
      
        null
      
      
        ) {
        SumLongAgg myagg 
      
      =
      
         (SumLongAgg) agg;
        myagg.sum 
      
      +=
      
         PrimitiveObjectInspectorUtils.getLong(partial, inputOI);
        myagg.empty 
      
      = 
      
        false
      
      
        ;
      }
    }
     
    
      
      
        //
      
      
        reducer返回結(jié)果,或者是只有mapper,沒有reducer時,在mapper端返回結(jié)果。
      
      
            @Override
    
      
      
        public
      
       Object terminate(AggregationBuffer agg) 
      
        throws
      
      
         HiveException {
      SumLongAgg myagg 
      
      =
      
         (SumLongAgg) agg;
      
      
      
        if
      
      
         (myagg.empty) {
        
      
      
        return
      
      
        null
      
      
        ;
      }
      result.set(myagg.sum);
      
      
      
        return
      
      
         result;
    }

  }
      
    

除了GenericUDAFSumLong,還有重載的GenericUDAFSumDouble,以上代碼都在hive的源碼:org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum。

?

注意

terminate()返回的數(shù)據(jù)類型要跟輸入時的數(shù)據(jù)類型保持一致,不然會報錯!

修改方法注冊

修改 ? ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java文件,加入編寫的 UDAF類,并注冊名字。

FunctionRegistry類包含了hive的所有內(nèi)置自定義函數(shù)。想要更好學(xué)習(xí)hive的UDAF,建議多看看里面的UDAF。

?

總結(jié)

本文的目的是為初學(xué)者入門學(xué)習(xí)udaf,所以介紹了udaf的概覽,尤其是udaf的運行過程,這對初學(xué)者是比較大的檻。

考慮入門,本文簡單介紹了sum的UDAF實現(xiàn),但是如果想要更好理解UDAF的運行過程,建議再看看avg UDAF:org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage。avg UDAF對hive的運行流程要控制的更加精細,并判斷當前運行的Mode做一定的邏輯處理。

?

參考? https://cwiki.apache.org/Hive/genericudafcasestudy.html

轉(zhuǎn)自? http://www.cnblogs.com/ggjucheng/archive/2013/02/01/2888051.html

hive UDAF開發(fā)入門和運行過程詳解


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯(lián)系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發(fā)表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 午夜免费视频观看 | 国产在线观看www鲁啊鲁免费 | 91在线网站 | 91短视频在线播放 | 天天影视综合网色综合国产 | 日韩特级毛片 | 美女扣下面流白浆丝袜 | 99热久久这里只有精品6国产网 | 一区二区三区国产在线 | 国产亚洲精彩视频 | 在线观看欧美三级 | 国产亚洲精品国产一区 | 国产大片在线观看 | 亚洲精品三级 | 成人国产一区二区三区 | 视频一区二区三区四区五区 | 2021国产精品一区二区在线 | 狠狠五月 | www.欧美在线 | 欧美区在线播放 | 中国大陆高清aⅴ毛片 | www.国产高清 | 精品国产日韩一区三区 | 国产精品久久久久久久久久久久 | 亚洲最黄网站 | 欧美a级片视频 | 亚洲国产天堂久久综合226 | 美剧三体 | 免费观看一级欧美在线视频 | 91日日| 日韩一级视频 | 男女激情啪啪 | 欧美专区在线视频 | 91青青青青青爽在线 | 久久亚洲欧美日本精品品 | 成人黄色短视频在线观看 | 久久99免费视频 | 岛国毛片一级一级特级毛片 | 凤囚凰 电视剧 | av片在线播放 | 婷婷香蕉 |