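Preface:
In operational analytics, one of the most common metrics is daily active users (DAU), generally defined as the deduplicated count of all users active on a given day. In most cases, though, the data we get contains both logged-in users and anonymous users, and the two groups overlap. The usual approach is a self-join on cookie or IMEI (on mobile) to work out how many users appear as both logged-in and anonymous, which gives: DAU = logged-in users + anonymous users - anonymous users who also logged in.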
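To make the formula concrete, here is a minimal plain-Java sketch. It assumes each event is a (uid, deviceId) pair where a null uid means an anonymous visit; the class name, method name and row layout are hypothetical and only serve the illustration.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of: DAU = logged-in + anonymous - overlap.
class DauFormulaSketch {

    /** Each row is {uid, deviceId}; a null uid marks an anonymous visit (assumption). */
    static int dau(String[][] rows) {
        Set<String> loggedIn = new HashSet<>();        // distinct uids
        Set<String> anonymous = new HashSet<>();       // distinct devices seen anonymously
        Set<String> loggedInDevices = new HashSet<>(); // devices seen with a uid

        for (String[] row : rows) {
            if (row[0] != null) {
                loggedIn.add(row[0]);
                loggedInDevices.add(row[1]);
            } else {
                anonymous.add(row[1]);
            }
        }

        // Devices that appear both anonymously and logged in would be counted twice.
        Set<String> overlap = new HashSet<>(anonymous);
        overlap.retainAll(loggedInDevices);

        return loggedIn.size() + anonymous.size() - overlap.size();
    }

    public static void main(String[] args) {
        String[][] rows = {
            {"u1", "d1"},  // logged-in visit on device d1
            {null, "d1"},  // the same device, before logging in
            {null, "d2"},  // a purely anonymous device
        };
        System.out.println(dau(rows)); // 1 + 2 - 1 = 2
    }
}
```

The overlap set plays the role of the self-join: it finds the identifiers that show up on both sides.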
In practice this takes complex HQL that runs inefficiently, so a UDAF is needed to handle it.
First, how the function works:
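/*
 * Computes a count from the flag, uid and imei columns:
 *   - flag is 1:     store the uid in the UID set, which represents logged-in users
 *   - flag is not 1: store the imei|wyy in the IMEI set, which represents anonymous users
 *   - every imei|wyy is also stored in a Map that records whether both flag values
 *     (0 and 1) have been seen for it; if so the map value is 2, otherwise it is the flag
 *
 * Prototype:
 *   long dau_count(flag, uid, imei)
 *
 * Parameters:
 *   flag: 1, or any value other than 1
 *   uid:  user id
 *   imei: the user's secondary identifier (imei|wyy|cookie)
 *
 * Returns:
 *   long, the DAU value
 *
 * Example:
 *   > SELECT flag, uid, imei FROM test;
 *   1    uid1    imei1
 *   1    uid2    imei1
 *   0    uid3    imei3
 *
 *   > SELECT dau_count(flag, uid, imei) FROM test;
 *   3
 */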
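The rules in the comment can be traced with a small plain-Java model. This is only a sketch of the state the function keeps, not the Hive UDAF itself; the class and method names are hypothetical, and it assumes any flag other than 1 is treated as 0.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-alone model of the aggregation state described above.
class DauStateSketch {
    private final Set<String> uidSet = new HashSet<>();           // logged-in users
    private final Set<String> imeiSet = new HashSet<>();          // anonymous users
    private final Map<String, Integer> imeiMap = new HashMap<>(); // imei -> 0, 1, or 2 (both seen)

    void add(int flag, String uid, String imei) {
        int check = (flag == 1) ? 1 : 0; // assumption: normalize "not 1" to 0
        if (check == 1) {
            uidSet.add(uid);
        } else {
            imeiSet.add(imei);
        }
        Integer seen = imeiMap.get(imei);
        if (seen == null) {
            imeiMap.put(imei, check);
        } else if (seen != check) {
            imeiMap.put(imei, 2); // this imei has now been seen with both flags
        }
    }

    long result() {
        long mixed = imeiMap.values().stream().filter(v -> v == 2).count();
        return uidSet.size() + imeiSet.size() - mixed;
    }

    public static void main(String[] args) {
        DauStateSketch s = new DauStateSketch();
        s.add(1, "uid1", "imei1");
        s.add(1, "uid2", "imei1");
        s.add(0, "uid3", "imei3");
        System.out.println(s.result()); // 2 uids + 1 anonymous imei - 0 mixed = 3

        s.add(0, "uid4", "imei1");      // anonymous visit from a device that also logged in
        System.out.println(s.result()); // 2 + 2 - 1 = 3
    }
}
```

The last row shows why the map is needed: the anonymous visit from imei1 lands in the IMEI set, but because imei1 was also seen with flag 1 it is subtracted again and the total does not grow.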
The flag argument can be supplied by another UDF that decides whether a uid belongs to a logged-in user.
Here is the full code:
package yy.juefan.udaf;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StandardMapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

@Description(name = "dau_count", value = "_FUNC_(flag,uid,imei)")
public class GenericDauCount extends AbstractGenericUDAFResolver {

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
            throws SemanticException {
        if (parameters.length != 3) {
            throw new UDFArgumentLengthException("Exactly 3 arguments are expected.");
        }
        checkType(parameters, 0, PrimitiveCategory.INT, "int");
        checkType(parameters, 1, PrimitiveCategory.STRING, "string");
        checkType(parameters, 2, PrimitiveCategory.STRING, "string");
        return new GenericDauCountEvaluator();
    }

    // Rejects non-primitive arguments before casting, then checks the primitive type.
    private static void checkType(TypeInfo[] parameters, int i,
            PrimitiveCategory expected, String name) throws UDFArgumentTypeException {
        if (parameters[i].getCategory() != ObjectInspector.Category.PRIMITIVE
                || ((PrimitiveTypeInfo) parameters[i]).getPrimitiveCategory() != expected) {
            throw new UDFArgumentTypeException(i, "Only " + name
                    + " argument is accepted, but "
                    + parameters[i].getTypeName() + " is passed");
        }
    }

    public static class GenericDauCountEvaluator extends GenericUDAFEvaluator {

        // Fields of the partial-result struct
        StructField uidSetField;
        StructField imeiSetField;
        StructField imeiMapField;
        StructObjectInspector map2red;

        // for PARTIAL1 and COMPLETE
        IntObjectInspector flagIO;
        StringObjectInspector uidIO;
        StringObjectInspector imeiIO;

        // for PARTIAL2 and FINAL
        StandardListObjectInspector uidSetIO;
        StandardListObjectInspector imeiSetIO;
        StandardMapObjectInspector imeiMapIO;

        private static class DivideAB implements AggregationBuffer {
            Set<String> uidSet;            // logged-in users
            Set<String> imeiSet;           // anonymous users
            Map<String, Integer> imeiMap;  // imei -> 0, 1, or 2 (seen with both flags)
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            DivideAB dab = new DivideAB();
            reset(dab);
            return dab;
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            DivideAB dab = (DivideAB) agg;
            dab.uidSet = new HashSet<String>();
            dab.imeiSet = new HashSet<String>();
            dab.imeiMap = new HashMap<String, Integer>();
        }

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters)
                throws HiveException {
            super.init(m, parameters);
            // input
            if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) { // for iterate
                assert (parameters.length == 3);
                flagIO = (IntObjectInspector) parameters[0];
                uidIO = (StringObjectInspector) parameters[1];
                imeiIO = (StringObjectInspector) parameters[2];
            } else { // for merge
                map2red = (StructObjectInspector) parameters[0];
                uidSetField = map2red.getStructFieldRef("uidSet");
                imeiSetField = map2red.getStructFieldRef("imeiSet");
                imeiMapField = map2red.getStructFieldRef("imeiMap");
                uidSetIO = (StandardListObjectInspector) uidSetField.getFieldObjectInspector();
                imeiSetIO = (StandardListObjectInspector) imeiSetField.getFieldObjectInspector();
                imeiMapIO = (StandardMapObjectInspector) imeiMapField.getFieldObjectInspector();
            }
            // output
            if (m == Mode.PARTIAL1 || m == Mode.PARTIAL2) {
                ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
                ArrayList<String> fname = new ArrayList<String>();
                foi.add(ObjectInspectorFactory.getStandardListObjectInspector(
                        PrimitiveObjectInspectorFactory.javaStringObjectInspector));
                foi.add(ObjectInspectorFactory.getStandardListObjectInspector(
                        PrimitiveObjectInspectorFactory.javaStringObjectInspector));
                foi.add(ObjectInspectorFactory.getStandardMapObjectInspector(
                        PrimitiveObjectInspectorFactory.javaStringObjectInspector,
                        PrimitiveObjectInspectorFactory.javaIntObjectInspector));
                fname.add("uidSet");
                fname.add("imeiSet");
                fname.add("imeiMap");
                return ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi);
            } else {
                return PrimitiveObjectInspectorFactory.javaLongObjectInspector;
            }
        }

        // Records a flag for an imei; the value becomes 2 once two different flags are seen.
        private static void recordFlag(Map<String, Integer> imeiMap, String imei, int check) {
            Integer flag = imeiMap.get(imei);
            if (flag == null) {
                imeiMap.put(imei, check);
            } else if (flag < 2 && flag != check) {
                imeiMap.put(imei, 2);
            }
        }

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters)
                throws HiveException {
            // Rows with a NULL flag are skipped.
            if (parameters.length != 3 || parameters[0] == null) {
                return;
            }
            DivideAB dab = (DivideAB) agg;
            // Any flag other than 1 is normalized to 0, so two different
            // "not 1" values on the same imei are not mistaken for a mixed imei.
            int check = PrimitiveObjectInspectorUtils.getInt(parameters[0], flagIO) == 1 ? 1 : 0;
            String uid = parameters[1] == null ? null
                    : PrimitiveObjectInspectorUtils.getString(parameters[1], uidIO);
            String imei = parameters[2] == null ? null
                    : PrimitiveObjectInspectorUtils.getString(parameters[2], imeiIO);
            // NULL ids are ignored so they are neither counted nor passed on to merge.
            if (check == 1) { // logged-in user
                if (uid != null) {
                    dab.uidSet.add(uid);
                }
            } else { // anonymous user
                if (imei != null) {
                    dab.imeiSet.add(imei);
                }
            }
            if (imei != null) {
                recordFlag(dab.imeiMap, imei, check);
            }
        }

        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            DivideAB myagg = (DivideAB) agg;
            // Store the intermediate result
            Object[] partialResult = new Object[3];
            partialResult[0] = new ArrayList<String>(myagg.uidSet);
            partialResult[1] = new ArrayList<String>(myagg.imeiSet);
            partialResult[2] = new HashMap<String, Integer>(myagg.imeiMap);
            return partialResult;
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            if (partial == null) {
                return;
            }
            DivideAB dab = (DivideAB) agg;
            Object uidSet = map2red.getStructFieldData(partial, uidSetField);
            Object imeiSet = map2red.getStructFieldData(partial, imeiSetField);
            Object imeiMap = map2red.getStructFieldData(partial, imeiMapField);

            List<?> uidList = uidSetIO.getList(uidSet);
            if (uidList != null) {
                for (Object obj : uidList) {
                    if (obj != null) {
                        dab.uidSet.add(obj.toString());
                    }
                }
            }
            // Fixed: the original read this list through uidSetIO instead of imeiSetIO.
            List<?> imeiList = imeiSetIO.getList(imeiSet);
            if (imeiList != null) {
                for (Object obj : imeiList) {
                    if (obj != null) {
                        dab.imeiSet.add(obj.toString());
                    }
                }
            }
            Map<?, ?> partialMap = imeiMapIO.getMap(imeiMap);
            if (partialMap != null) {
                for (Entry<?, ?> ele : partialMap.entrySet()) {
                    if (ele.getKey() == null || ele.getValue() == null) {
                        continue;
                    }
                    // Going through toString() accepts either an Integer or a writable value.
                    recordFlag(dab.imeiMap, ele.getKey().toString(),
                            Integer.parseInt(ele.getValue().toString()));
                }
            }
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            DivideAB dab = (DivideAB) agg;
            int mix = 0; // imeis seen both logged in and anonymous
            for (int val : dab.imeiMap.values()) {
                if (val == 2) {
                    mix++;
                }
            }
            return Long.valueOf((long) dab.uidSet.size() + dab.imeiSet.size() - mix);
        }
    }
}
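Since partial results come from different map tasks, the merge step has to reconcile the per-imei flags as well as the two sets. The plain-Java sketch below models only that step, with no Hive classes. The class name is hypothetical and the partial states are filled in by hand to stand in for what two map tasks might produce.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of merging partial aggregation states.
class DauMergeSketch {
    final Set<String> uidSet = new HashSet<>();
    final Set<String> imeiSet = new HashSet<>();
    final Map<String, Integer> imeiMap = new HashMap<>(); // imei -> 0, 1, or 2

    void merge(DauMergeSketch partial) {
        uidSet.addAll(partial.uidSet);
        imeiSet.addAll(partial.imeiSet);
        for (Map.Entry<String, Integer> e : partial.imeiMap.entrySet()) {
            // Same flag on both sides: keep it. Different flags (or a 2): mark as mixed.
            imeiMap.merge(e.getKey(), e.getValue(),
                    (mine, theirs) -> mine.equals(theirs) ? mine : 2);
        }
    }

    long result() {
        long mixed = imeiMap.values().stream().filter(v -> v == 2).count();
        return uidSet.size() + imeiSet.size() - mixed;
    }

    public static void main(String[] args) {
        // Partial A saw one logged-in row: (1, uid1, imei1)
        DauMergeSketch a = new DauMergeSketch();
        a.uidSet.add("uid1");
        a.imeiMap.put("imei1", 1);

        // Partial B saw two anonymous rows on imei1 and imei3
        DauMergeSketch b = new DauMergeSketch();
        b.imeiSet.add("imei1");
        b.imeiSet.add("imei3");
        b.imeiMap.put("imei1", 0);
        b.imeiMap.put("imei3", 0);

        DauMergeSketch total = new DauMergeSketch();
        total.merge(a);
        total.merge(b);
        System.out.println(total.result()); // 1 + 2 - 1 = 2
    }
}
```

Neither partial knows on its own that imei1 is a mixed device; that only becomes visible when the two flags meet in the merge, which is why the map has to travel with the partial result instead of a pre-computed count.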
Work is calling again, so I'm posting the code first and will write up the analysis next time.
