介紹
hive的用戶自定義聚合函數(shù)(UDAF)是一個很好的功能,集成了先進的數(shù)據(jù)處理。hive有兩種UDAF:簡單和通用。顧名思義,簡單的UDAF,寫的相當簡單的,但因為使用Java反射導(dǎo)致性能損失,而且有些特性不能使用,如可變長度參數(shù)列表。通用UDAF可以使用??所有功能,但是UDAF就寫的比較復(fù)雜,不直觀。
本文只介紹通用UDAF。
UDAF是需要在hive的sql語句和group by聯(lián)合使用,hive的group by對于每個分組,只能返回一條記錄,這點和mysql不一樣,切記。
?
UDAF開發(fā)概覽
開發(fā)通用UDAF有兩個步驟,第一個是編寫resolver類,第二個是編寫 evaluator 類。 resolver負責(zé)類型檢查,操作符重載。 evaluator真正實現(xiàn)UDAF的邏輯。通常來說,頂層UDAF類繼承 org.apache.hadoop.hive.ql.udf.GenericUDAFResolver2, 里面編寫嵌套類 evaluator ?實現(xiàn)UDAF的邏輯。
?本文以Hive的內(nèi)置UDAF sum函數(shù)的源代碼作為示例講解。
?
實現(xiàn)?resolver
resolver通常繼承 org.apache.hadoop.hive.ql.udf.GenericUDAFResolver2 ,但是我們更建議繼承 AbstractGenericUDAFResolver,隔離將來hive接口的變化。
GenericUDAFResolver和GenericUDAFResolver2接口的區(qū)別是,后面的允許evaluator實現(xiàn)可以訪問更多的信息,例如DISTINCT限定符,通配符FUNCTION(*)。
public
class
GenericUDAFSum
extends
AbstractGenericUDAFResolver {
static
final
Log LOG = LogFactory.getLog(GenericUDAFSum.
class
.getName());
@Override
public
GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
throws
SemanticException {
//
Type-checking goes here!
return
new
GenericUDAFSumLong();
}
public
static
class
GenericUDAFSumLong
extends
GenericUDAFEvaluator {
//
UDAF logic goes here!
}
}
這個就是 UDAF的代碼骨架,第一行創(chuàng)建LOG對象,用來寫入警告和錯誤到hive的log。 GenericUDAFResolver只需要重寫一個方法: getEvaluator, 它根據(jù)SQL傳入的參數(shù)類型,返回正確的evaluator。這里最主要是實現(xiàn)操作符的重載。
getEvaluator的完整代碼如下:
public
GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
throws
SemanticException {
if
(parameters.length != 1
) {
throw
new
UDFArgumentTypeException(parameters.length - 1
,
"Exactly one argument is expected."
);
}
if
(parameters[0].getCategory() !=
ObjectInspector.Category.PRIMITIVE) {
throw
new
UDFArgumentTypeException(0
,
"Only primitive type arguments are accepted but "
+ parameters[0].getTypeName() + " is passed."
);
}
switch
(((PrimitiveTypeInfo) parameters[0
]).getPrimitiveCategory()) {
case
BYTE:
case
SHORT:
case
INT:
case
LONG:
case
TIMESTAMP:
return
new
GenericUDAFSumLong();
case
FLOAT:
case
DOUBLE:
case
STRING:
return
new
GenericUDAFSumDouble();
case
BOOLEAN:
default
:
throw
new
UDFArgumentTypeException(0
,
"Only numeric or string type arguments are accepted but "
+ parameters[0].getTypeName() + " is passed."
);
}
這里做了類型檢查,如果不是原生類型(即符合類型,array,map此類),則拋出異常,還實現(xiàn)了操作符重載,對于整數(shù)類型,使用GenericUDAFSumLong實現(xiàn)UDAF的邏輯,對于浮點類型,使用GenericUDAFSumDouble實現(xiàn)UDAF的邏輯。
?
實現(xiàn)evaluator
所有 evaluators必須繼承抽象類org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator。子類必須實現(xiàn)它的一些抽象方法,實現(xiàn)UDAF的邏輯。
GenericUDAFEvaluator有一個嵌套類Mode,這個類很重要,它表示了udaf在mapreduce的各個階段,理解Mode的含義,就可以理解了hive的UDAF的運行流程。
public
static
enum
Mode {
/**
* PARTIAL1: 這個是mapreduce的map階段:從原始數(shù)據(jù)到部分數(shù)據(jù)聚合
* 將會調(diào)用iterate()和terminatePartial()
*/
PARTIAL1,
/**
* PARTIAL2: 這個是mapreduce的map端的Combiner階段,負責(zé)在map端合并map的數(shù)據(jù)::從部分數(shù)據(jù)聚合到部分數(shù)據(jù)聚合:
* 將會調(diào)用merge() 和 terminatePartial()
*/
PARTIAL2,
/**
* FINAL: mapreduce的reduce階段:從部分數(shù)據(jù)的聚合到完全聚合
* 將會調(diào)用merge()和terminate()
*/
FINAL,
/**
* COMPLETE: 如果出現(xiàn)了這個階段,表示mapreduce只有map,沒有reduce,所以map端就直接出結(jié)果了:從原始數(shù)據(jù)直接到完全聚合
* 將會調(diào)用 iterate()和terminate()
*/
COMPLETE
};
一般情況下,完整的UDAF邏輯是一個mapreduce過程,如果有mapper和reducer,就會經(jīng)歷PARTIAL1(mapper),F(xiàn)INAL(reducer),如果還有combiner,那就會經(jīng)歷PARTIAL1(mapper),PARTIAL2(combiner),F(xiàn)INAL(reducer)。
而有一些情況下的mapreduce,只有mapper,而沒有reducer,所以就會只有COMPLETE階段,這個階段直接輸入原始數(shù)據(jù),出結(jié)果。
下面以GenericUDAFSumLong的evaluator實現(xiàn)講解
public
static
class
GenericUDAFSumLong
extends
GenericUDAFEvaluator {
private
PrimitiveObjectInspector inputOI;
private
LongWritable result;
//
這個方法返回了UDAF的返回類型,這里確定了sum自定義函數(shù)的返回類型是Long類型
@Override
public
ObjectInspector init(Mode m, ObjectInspector[] parameters)
throws
HiveException {
assert
(parameters.length == 1
);
super
.init(m, parameters);
result
=
new
LongWritable(0
);
inputOI
= (PrimitiveObjectInspector) parameters[0
];
return
PrimitiveObjectInspectorFactory.writableLongObjectInspector;
}
/**
存儲sum的值的類
*/
static
class
SumLongAgg
implements
AggregationBuffer {
boolean
empty;
long
sum;
}
//
創(chuàng)建新的聚合計算的需要的內(nèi)存,用來存儲mapper,combiner,reducer運算過程中的相加總和。
@Override
public
AggregationBuffer getNewAggregationBuffer()
throws
HiveException {
SumLongAgg result
=
new
SumLongAgg();
reset(result);
return
result;
}
//
mapreduce支持mapper和reducer的重用,所以為了兼容,也需要做內(nèi)存的重用。
@Override
public
void
reset(AggregationBuffer agg)
throws
HiveException {
SumLongAgg myagg
=
(SumLongAgg) agg;
myagg.empty
=
true
;
myagg.sum
= 0
;
}
private
boolean
warned =
false
;
//
map階段調(diào)用,只要把保存當前和的對象agg,再加上輸入的參數(shù),就可以了。
@Override
public
void
iterate(AggregationBuffer agg, Object[] parameters)
throws
HiveException {
assert
(parameters.length == 1
);
try
{
merge(agg, parameters[
0
]);
}
catch
(NumberFormatException e) {
if
(!
warned) {
warned
=
true
;
LOG.warn(getClass().getSimpleName()
+ " "
+
StringUtils.stringifyException(e));
}
}
}
//
mapper結(jié)束要返回的結(jié)果,還有combiner結(jié)束返回的結(jié)果
@Override
public
Object terminatePartial(AggregationBuffer agg)
throws
HiveException {
return
terminate(agg);
}
//
combiner合并map返回的結(jié)果,還有reducer合并mapper或combiner返回的結(jié)果。
@Override
public
void
merge(AggregationBuffer agg, Object partial)
throws
HiveException {
if
(partial !=
null
) {
SumLongAgg myagg
=
(SumLongAgg) agg;
myagg.sum
+=
PrimitiveObjectInspectorUtils.getLong(partial, inputOI);
myagg.empty
=
false
;
}
}
//
reducer返回結(jié)果,或者是只有mapper,沒有reducer時,在mapper端返回結(jié)果。
@Override
public
Object terminate(AggregationBuffer agg)
throws
HiveException {
SumLongAgg myagg
=
(SumLongAgg) agg;
if
(myagg.empty) {
return
null
;
}
result.set(myagg.sum);
return
result;
}
}
除了GenericUDAFSumLong,還有重載的GenericUDAFSumDouble,以上代碼都在hive的源碼:org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum。
?
注意
terminate()返回的數(shù)據(jù)類型要跟輸入時的數(shù)據(jù)類型保持一致,不然會報錯!
修改方法注冊
修改 ? ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java文件,加入編寫的 UDAF類,并注冊名字。
FunctionRegistry類包含了hive的所有內(nèi)置自定義函數(shù)。想要更好學(xué)習(xí)hive的UDAF,建議多看看里面的UDAF。
?
總結(jié)
本文的目的是為初學(xué)者入門學(xué)習(xí)udaf,所以介紹了udaf的概覽,尤其是udaf的運行過程,這對初學(xué)者是比較大的檻。
考慮入門,本文簡單介紹了sum的UDAF實現(xiàn),但是如果想要更好理解UDAF的運行過程,建議再看看avg UDAF:org.apache.hadoop.hive.ql.udf.generic.GenericUDAFAverage。avg UDAF對hive的運行流程要控制的更加精細,并判斷當前運行的Mode做一定的邏輯處理。
?
參考? https://cwiki.apache.org/Hive/genericudafcasestudy.html
轉(zhuǎn)自? http://www.cnblogs.com/ggjucheng/archive/2013/02/01/2888051.html
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061
微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元

