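Preface:
When building operational statistics, one of the most common metrics is daily active users (DAU). Conceptually it is the number of distinct users seen on a given day. In most cases, however, the data contains both logged-in users and anonymous users, and the two groups overlap. The usual approach is to self-join on the cookie or, on mobile, the IMEI to find how many users appear both as logged-in and as anonymous. DAU is then computed as DAU = logged-in users + anonymous users - anonymous-to-logged-in users.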
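The following plain-Java sketch (no Hive involved) shows the formula at work. It computes the anonymous-to-logged-in count the way the self-join does: it counts the anonymous device ids that also appear on logged-in rows. The `Row` class and the sample data are hypothetical. The sketch also assumes every row carries a non-null imei.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class DauFormulaSketch {

    /** Hypothetical log row: flag == 1 means the row comes from a logged-in user. */
    static final class Row {
        final int flag;
        final String uid;
        final String imei;

        Row(int flag, String uid, String imei) {
            this.flag = flag;
            this.uid = uid;
            this.imei = imei;
        }
    }

    /** DAU = logged-in users + anonymous users - anonymous-to-logged-in users. */
    static long dau(List<Row> rows) {
        Set<String> loggedInUids = new HashSet<>();
        Set<String> loggedInImeis = new HashSet<>();  // device ids seen on logged-in rows
        Set<String> anonymousImeis = new HashSet<>();
        for (Row r : rows) {
            if (r.flag == 1) {
                loggedInUids.add(r.uid);
                loggedInImeis.add(r.imei);
            } else {
                anonymousImeis.add(r.imei);
            }
        }
        // The "self-join": anonymous device ids that also appear on logged-in rows
        Set<String> converted = new HashSet<>(anonymousImeis);
        converted.retainAll(loggedInImeis);
        return (long) loggedInUids.size() + anonymousImeis.size() - converted.size();
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
                new Row(1, "uid1", "imei1"),
                new Row(0, null, "imei1"),   // same device, browsing before login
                new Row(0, null, "imei2"));
        System.out.println(dau(rows));   // prints 2
    }
}
```

Here `imei1` is counted once as a logged-in user and once as an anonymous user, so it is subtracted once: 1 + 2 - 1 = 2.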
In practice this requires fairly complex HQL that also runs slowly, so I wrote a UDAF to do the work instead.
First, how the function works:
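/*
*
* Counts users from the flag, uid and imei columns.
* - flag is 1: the uid is added to the UID set, which represents logged-in users
* - flag is not 1: the imei|wyy is added to the IMEI set, which represents anonymous users
* Every imei|wyy is also recorded in a map. If the same imei|wyy has been seen with both
* flag values (1 and not 1), its map value becomes 2; otherwise it holds the flag value.
* Prototype:
* bigint dau_count(flag, uid, imei)
* Parameters:
* flag: int, 1 for a logged-in user, anything else for an anonymous user
* uid: user id
* imei: the user's secondary identifier (imei|wyy|cookie)
*
* Returns:
* bigint, the DAU value = size of UID set + size of IMEI set - number of map entries equal to 2
*
* Example:
* > SELECT flag, uid, imei FROM test;
* 1 uid1 imei1
* 1 uid2 imei1
* 0 uid3 imei3
*
* > SELECT dau_count(flag, uid, imei) FROM test;
* 3
*/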
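The per-imei bookkeeping described in the comment can be sketched in plain Java, outside Hive. This is an illustrative single-machine version, not the UDAF itself. The class and method names are hypothetical. The sketch also assumes that any flag other than 1 is normalized to 0 (anonymous), so two different non-1 flags do not mark an imei as mixed.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class ImeiStateSketch {

    /** State recorded once an imei has been seen both logged in (1) and anonymous (0). */
    static final int MIXED = 2;

    /** Folds a newly observed state into the state already recorded for an imei. */
    static int combine(Integer previous, int observed) {
        if (previous == null) {
            return observed;                            // first time this imei is seen
        }
        return previous == observed ? previous : MIXED; // differing states mean "converted"
    }

    static long dau(int[] flags, String[] uids, String[] imeis) {
        Set<String> uidSet = new HashSet<>();
        Set<String> imeiSet = new HashSet<>();
        Map<String, Integer> imeiMap = new HashMap<>();
        for (int i = 0; i < flags.length; i++) {
            int check = flags[i] == 1 ? 1 : 0;          // assumption: non-1 flags mean anonymous
            if (check == 1) {
                uidSet.add(uids[i]);
            } else {
                imeiSet.add(imeis[i]);
            }
            imeiMap.put(imeis[i], combine(imeiMap.get(imeis[i]), check));
        }
        long mixed = imeiMap.values().stream().filter(v -> v == MIXED).count();
        return uidSet.size() + imeiSet.size() - mixed;
    }

    public static void main(String[] args) {
        long result = dau(new int[] {1, 1, 0},
                new String[] {"uid1", "uid2", "uid3"},
                new String[] {"imei1", "imei1", "imei3"});
        System.out.println(result);   // prints 3
    }
}
```

On the three example rows the result is 3: two logged-in uids plus one anonymous imei. No imei is seen in both states, so nothing is subtracted.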
The flag argument can be replaced by the output of another UDF that decides whether a uid belongs to a logged-in user.
Here is the complete code:
package yy.juefan.udaf;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StandardMapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

@Description(
        name = "dau_count",
        value = "_FUNC_(flag, uid, imei) - returns logged-in users + anonymous users - anonymous-to-logged-in users")
public class GenericDauCount extends AbstractGenericUDAFResolver {

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
        if (parameters.length != 3) {
            throw new UDFArgumentLengthException("Exactly 3 arguments are expected.");
        }
        // Expected signature: (int flag, string uid, string imei)
        PrimitiveCategory[] expected = {
                PrimitiveCategory.INT, PrimitiveCategory.STRING, PrimitiveCategory.STRING };
        for (int i = 0; i < expected.length; i++) {
            // Check the category first so a complex type cannot trigger a ClassCastException
            if (parameters[i].getCategory() != ObjectInspector.Category.PRIMITIVE
                    || ((PrimitiveTypeInfo) parameters[i]).getPrimitiveCategory() != expected[i]) {
                throw new UDFArgumentTypeException(i, "Only " + expected[i].name().toLowerCase()
                        + " argument is accepted, but " + parameters[i].getTypeName() + " is passed");
            }
        }
        return new GenericDauCountEvaluator();
    }

    public static class GenericDauCountEvaluator extends GenericUDAFEvaluator {

        // Fields of the partial (map-to-reduce) struct
        StructField uidSetField;
        StructField imeiSetField;
        StructField imeiMapField;
        StructObjectInspector map2red;

        // Input inspectors for PARTIAL1 and COMPLETE (original rows)
        IntObjectInspector flagIO;
        StringObjectInspector uidIO;
        StringObjectInspector imeiIO;

        // Input inspectors for PARTIAL2 and FINAL (partial results)
        StandardListObjectInspector uidSetIO;
        StandardListObjectInspector imeiSetIO;
        StandardMapObjectInspector imeiMapIO;

        /** Aggregation buffer: logged-in uids, anonymous imeis and the per-imei flag state. */
        private static class DivideAB implements AggregationBuffer {
            Set<String> uidSet;
            Set<String> imeiSet;
            Map<String, Integer> imeiMap;
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            DivideAB dab = new DivideAB();
            reset(dab);
            return dab;
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            DivideAB dab = (DivideAB) agg;
            dab.uidSet = new HashSet<String>();
            dab.imeiSet = new HashSet<String>();
            dab.imeiMap = new HashMap<String, Integer>();
        }

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);
            if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
                // Input is the original rows, consumed by iterate()
                assert (parameters.length == 3);
                flagIO = (IntObjectInspector) parameters[0];
                uidIO = (StringObjectInspector) parameters[1];
                imeiIO = (StringObjectInspector) parameters[2];
            } else {
                // Input is the struct produced by terminatePartial(), consumed by merge()
                map2red = (StructObjectInspector) parameters[0];
                uidSetField = map2red.getStructFieldRef("uidSet");
                imeiSetField = map2red.getStructFieldRef("imeiSet");
                imeiMapField = map2red.getStructFieldRef("imeiMap");
                uidSetIO = (StandardListObjectInspector) uidSetField.getFieldObjectInspector();
                imeiSetIO = (StandardListObjectInspector) imeiSetField.getFieldObjectInspector();
                imeiMapIO = (StandardMapObjectInspector) imeiMapField.getFieldObjectInspector();
            }
            if (m == Mode.PARTIAL1 || m == Mode.PARTIAL2) {
                // Partial output: struct<uidSet:array<string>, imeiSet:array<string>, imeiMap:map<string,int>>
                ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
                ArrayList<String> fname = new ArrayList<String>();
                foi.add(ObjectInspectorFactory.getStandardListObjectInspector(
                        PrimitiveObjectInspectorFactory.javaStringObjectInspector));
                foi.add(ObjectInspectorFactory.getStandardListObjectInspector(
                        PrimitiveObjectInspectorFactory.javaStringObjectInspector));
                foi.add(ObjectInspectorFactory.getStandardMapObjectInspector(
                        PrimitiveObjectInspectorFactory.javaStringObjectInspector,
                        PrimitiveObjectInspectorFactory.javaIntObjectInspector));
                fname.add("uidSet");
                fname.add("imeiSet");
                fname.add("imeiMap");
                return ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi);
            } else {
                // Final output: bigint
                return PrimitiveObjectInspectorFactory.javaLongObjectInspector;
            }
        }

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            if (parameters.length != 3 || parameters[0] == null) {
                return;
            }
            DivideAB dab = (DivideAB) agg;
            // Normalize the flag: 1 = logged in, anything else = anonymous (0)
            int check = PrimitiveObjectInspectorUtils.getInt(parameters[0], flagIO) == 1 ? 1 : 0;
            String uid = PrimitiveObjectInspectorUtils.getString(parameters[1], uidIO);
            String imei = PrimitiveObjectInspectorUtils.getString(parameters[2], imeiIO);
            if (check == 1) {
                // Logged-in user
                if (uid != null) {
                    dab.uidSet.add(uid);
                }
            } else if (imei != null) {
                // Anonymous user
                dab.imeiSet.add(imei);
            }
            if (imei == null) {
                // Without an imei the row cannot be matched between logged-in and anonymous records
                return;
            }
            Integer flag = dab.imeiMap.get(imei);
            if (flag == null) {
                dab.imeiMap.put(imei, check);
            } else if (flag < 2 && flag != check) {
                // Seen both logged in and anonymous: an anonymous-to-logged-in user
                dab.imeiMap.put(imei, 2);
            }
        }

        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            DivideAB myagg = (DivideAB) agg;
            // Intermediate result, laid out to match the struct inspector returned by init()
            Object[] partialResult = new Object[3];
            partialResult[0] = new ArrayList<String>(myagg.uidSet);
            partialResult[1] = new ArrayList<String>(myagg.imeiSet);
            partialResult[2] = new HashMap<String, Integer>(myagg.imeiMap);
            return partialResult;
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            if (partial == null) {
                return;
            }
            DivideAB dab = (DivideAB) agg;
            Object uidSet = map2red.getStructFieldData(partial, uidSetField);
            Object imeiSet = map2red.getStructFieldData(partial, imeiSetField);
            Object imeiMap = map2red.getStructFieldData(partial, imeiMapField);

            // Elements may be Java strings or Hadoop writable/lazy objects depending on how the
            // partial result was serialized; toString() returns the underlying value in each case
            List<?> uidlist = uidSetIO.getList(uidSet);
            if (uidlist != null) {
                for (Object obj : uidlist) {
                    dab.uidSet.add(obj.toString());
                }
            }
            // The imei list is read with its own inspector
            List<?> imeilist = imeiSetIO.getList(imeiSet);
            if (imeilist != null) {
                for (Object obj : imeilist) {
                    dab.imeiSet.add(obj.toString());
                }
            }
            Map<?, ?> imeimap = imeiMapIO.getMap(imeiMap);
            if (imeimap != null) {
                for (Entry<?, ?> ele : imeimap.entrySet()) {
                    String key = ele.getKey().toString();
                    int val = Integer.parseInt(ele.getValue().toString());
                    Integer flag = dab.imeiMap.get(key);
                    if (flag == null) {
                        dab.imeiMap.put(key, val);
                    } else if (flag < 2 && flag != val) {
                        // Different states from different partials: mark as converted
                        dab.imeiMap.put(key, 2);
                    }
                }
            }
        }
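
        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            DivideAB dab = (DivideAB) agg;
            // Count imeis seen both logged in and anonymous (anonymous-to-logged-in users)
            int mix = 0;
            for (int val : dab.imeiMap.values()) {
                if (val == 2) {
                    mix++;
                }
            }
            // DAU = logged-in users + anonymous users - anonymous-to-logged-in users
            return (long) (dab.uidSet.size() + dab.imeiSet.size() - mix);
        }
    }
}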
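The partial result carries the whole `imeiMap` and not just a count because an imei may be seen logged in on one mapper and anonymously on another. Only the merge step can detect that case. The sketch below mimics the iterate, merge and terminate flow with plain Java objects. `PartialStateSketch` is a hypothetical stand-in for `DivideAB`, not part of Hive. Like the sketch above, it assumes non-1 flags are normalized to 0.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hive-free stand-in for the DivideAB buffer; all names are illustrative. */
final class PartialStateSketch {
    static final int MIXED = 2;

    final Set<String> uidSet = new HashSet<>();
    final Set<String> imeiSet = new HashSet<>();
    final Map<String, Integer> imeiMap = new HashMap<>();

    /** Same rule as the UDAF: equal states stay, differing states become MIXED. */
    private void recordState(String imei, int state) {
        imeiMap.merge(imei, state, (old, cur) -> old.equals(cur) ? old : MIXED);
    }

    /** Mirrors iterate(): consume one input row on a "mapper". */
    void iterate(int flag, String uid, String imei) {
        int check = flag == 1 ? 1 : 0;
        if (check == 1) {
            uidSet.add(uid);
        } else {
            imeiSet.add(imei);
        }
        recordState(imei, check);
    }

    /** Mirrors merge(): fold another partial result into this one. */
    void merge(PartialStateSketch other) {
        uidSet.addAll(other.uidSet);
        imeiSet.addAll(other.imeiSet);
        other.imeiMap.forEach((imei, state) -> recordState(imei, state));
    }

    /** Mirrors terminate(). */
    long terminate() {
        long mixed = imeiMap.values().stream().filter(v -> v == MIXED).count();
        return uidSet.size() + imeiSet.size() - mixed;
    }

    public static void main(String[] args) {
        PartialStateSketch mapperA = new PartialStateSketch();
        mapperA.iterate(1, "uid1", "imei1");   // imei1 seen logged in
        PartialStateSketch mapperB = new PartialStateSketch();
        mapperB.iterate(0, null, "imei1");     // same device seen anonymously elsewhere
        mapperB.iterate(0, null, "imei2");

        PartialStateSketch reducer = new PartialStateSketch();
        reducer.merge(mapperA);
        reducer.merge(mapperB);
        System.out.println(reducer.terminate());   // prints 2
    }
}
```

Neither mapper alone can tell that `imei1` belongs to a converted user. After merging, the reducer sees state 1 and state 0 for it and marks it as 2, giving 1 + 2 - 1 = 2. The combine rule is commutative and associative, so the order in which partials are merged does not change the result.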
Work is piling up again, so I am posting the code now and will write up the analysis next time.

