About the author
Chunmin is a logistics architect and team leader. He takes his work seriously and once spent a vacation "programming while facing the sea". Impressive, even if we are not entirely sure what that involved.
In Hive, users can define their own functions to extend what HiveQL can do. Hive custom functions come in three main kinds:
- UDF: one row in, one value out (for example string or geometry helpers)
- UDAF: user-defined aggregate function, many rows in, one value out, used with GROUP BY
- UDTF: user-defined table-generating function, one row in, multiple rows out, typically used with LATERAL VIEW
Hive's UDF mechanism asks the user to supply a Resolver and an Evaluator: the Resolver inspects the input types and decides which Evaluator to call, and the Evaluator implements the actual logic.
Hive ships a base class, org.apache.hadoop.hive.ql.exec.UDF, which holds an instance of DefaultUDFMethodResolver, the default implementation of the UDFMethodResolver interface.
public class UDF {
    private UDFMethodResolver rslv;

    public UDF() {
        this.rslv = new DefaultUDFMethodResolver(this.getClass());
    }
    ......
}
DefaultUDFMethodResolver exposes a getEvalMethod method, which uses FunctionRegistry to reflectively locate the UDF's evaluate method; Hive then invokes that method at query time.
public class DefaultUDFMethodResolver implements UDFMethodResolver {
    private final Class<? extends UDF> udfClass;

    public DefaultUDFMethodResolver(Class<? extends UDF> udfClass) {
        this.udfClass = udfClass;
    }

    public Method getEvalMethod(List<TypeInfo> argClasses) throws UDFArgumentException {
        return FunctionRegistry.getMethodInternal(this.udfClass, "evaluate", false, argClasses);
    }
}
To write a custom UDF, extend org.apache.hadoop.hive.ql.exec.UDF and implement an evaluate method; DefaultUDFMethodResolver locates that method and Hive calls it for every input row.
// Assuming the JTS geometry library (org.locationtech.jts); older JTS versions ship the
// same classes under com.vividsolutions.jts. JTSHelper is the author's own GeoJSON helper.
import org.apache.hadoop.hive.ql.exec.UDF;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.GeometryFactory;
import org.locationtech.jts.geom.Point;
import org.locationtech.jts.geom.Polygon;

public class DAIsContainPoint extends UDF {
    // Returns true when the point (longitude, latitude) lies inside the GeoJSON polygon.
    public Boolean evaluate(Double longitude, Double latitude, String geojson) {
        try {
            Polygon polygon = JTSHelper.parse(geojson);
            Point point = new GeometryFactory().createPoint(new Coordinate(longitude, latitude));
            return polygon.contains(point);
        } catch (Throwable e) {
            // Malformed GeoJSON or null input: treat as "not contained" instead of failing the query
            return false;
        }
    }
}
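Because Hive only resolves evaluate reflectively at query time, nothing stops us from exercising it directly before packaging. A minimal, hypothetical sanity check follows; the class name and the GeoJSON string are placeholders, and it still relies on the author's JTSHelper accepting GeoJSON input.

// Hypothetical local check, not part of the original project.
public class DAIsContainPointCheck {
    public static void main(String[] args) {
        // A 1x1 degree square around the origin as a GeoJSON polygon (placeholder data)
        String squareGeoJson = "{\"type\":\"Polygon\",\"coordinates\":"
            + "[[[0,0],[0,1],[1,1],[1,0],[0,0]]]}";
        DAIsContainPoint udf = new DAIsContainPoint();
        System.out.println(udf.evaluate(0.5, 0.5, squareGeoJson)); // expected: true
        System.out.println(udf.evaluate(2.0, 2.0, squareGeoJson)); // expected: false
    }
}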
Once the code is written it has to be packaged into a jar. Note: the final jar must bundle all of its dependencies; for Maven builds the maven-shade-plugin is the recommended way to do this:
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.2</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Finally, the resulting jar is registered and used in Hive SQL:
add jar hdfs://xxx/udf/ff8bd59f-d0a5-4b13-888b-5af239270869/udf.jar;
create temporary function is_in_polygon as 'me.ele.breat.hive.udf.DAIsContainPoint';
select lat, lng, geojson, is_in_polygon(lat, lng, geojson) as is_in from example;
Hive runs its aggregations as MapReduce jobs to speed them up, and UDAF is the extension point for writing custom aggregate functions. To explain UDAF properly we first need the shape of a MapReduce job: mappers process the input rows and emit partial results, optional combiners pre-merge those partials on the map side, and a reducer merges everything into the final output.
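To make that flow concrete, here is a self-contained sketch in plain Java (made-up names, no Hadoop dependency): each mapper folds its rows into a partial aggregate and the reducer merges the partials into the final answer, which is exactly the shape a UDAF has to fit.

// Illustrative only: a word-count-style walk through map -> combine -> reduce.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MapReduceFlowSketch {
    public static void main(String[] args) {
        // The input, split across two "mappers"
        List<List<String>> splits = Arrays.asList(
            Arrays.asList("a", "b", "a"),
            Arrays.asList("c", "b", "a"));

        // Map + combine: each mapper builds a partial aggregate for its split
        List<Map<String, Integer>> partials = new ArrayList<>();
        for (List<String> split : splits) {
            Map<String, Integer> partial = new HashMap<>();
            for (String record : split) {
                partial.merge(record, 1, Integer::sum); // analogous to iterate()
            }
            partials.add(partial);                      // analogous to terminatePartial()
        }

        // Reduce: merge all partial aggregates into the final result
        Map<String, Integer> result = new HashMap<>();
        for (Map<String, Integer> partial : partials) {
            partial.forEach((k, v) -> result.merge(k, v, Integer::sum)); // analogous to merge()
        }
        System.out.println(result); // {a=3, b=2, c=1}, analogous to terminate()
    }
}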
Back to Hive: a UDAF implementation starts from org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver, which already implements the org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2 interface. The actual computation lives in a GenericUDAFEvaluator subclass, which expresses the MapReduce steps through three key methods:
- iterate: map side, folds one input row into the aggregation buffer
- merge: combine/reduce side, folds a partial aggregate into the buffer
- terminate: produces the final result from the buffer
(terminatePartial, which ships the buffer between stages, sits alongside these; the sketch below shows how they line up with the MapReduce phases)
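For orientation, here is a rough sketch (not part of the author's code; the class name is made up) of how Hive's GenericUDAFEvaluator.Mode values tie these methods to the MapReduce stages:

import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;

// Sketch only: which evaluator methods Hive drives in each Mode / MapReduce stage.
public class UdafModeCheatSheet {
    public static String describe(Mode m) {
        switch (m) {
            case PARTIAL1: return "map phase:      iterate()  then terminatePartial()";
            case PARTIAL2: return "combine phase:  merge()    then terminatePartial()";
            case FINAL:    return "reduce phase:   merge()    then terminate()";
            case COMPLETE: return "map-only case:  iterate()  then terminate()";
            default:       return "unexpected mode";
        }
    }
}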
A class extending AbstractGenericUDAFResolver then overrides getEvaluator and returns an instance of that GenericUDAFEvaluator:
// Assuming the JTS geometry library (org.locationtech.jts); JTSHelper is the author's own GeoJSON helper.
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.Text;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.geom.GeometryFactory;
import org.locationtech.jts.geom.Polygon;

public class DAJoinV2 extends AbstractGenericUDAFResolver implements GenericUDAFResolver2 {

    // Entry point Hive uses for a GenericUDAFResolver2; argument checking is done in the
    // TypeInfo[] overload below.
    @Override
    public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo genericUDAFParameterInfo)
            throws SemanticException {
        return new DAJoinStringEvaluator();
    }

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] typeInfos) throws SemanticException {
        if (typeInfos.length != 1) {
            throw new UDFArgumentTypeException(typeInfos.length - 1,
                "Exactly one argument is expected.");
        }
        if (typeInfos[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentTypeException(0,
                "Only primitive type arguments are accepted but "
                    + typeInfos[0].getTypeName() + " is passed.");
        }
        switch (((PrimitiveTypeInfo) typeInfos[0]).getPrimitiveCategory()) {
            case STRING:
                return new DAJoinStringEvaluator();
            default:
                throw new UDFArgumentTypeException(0,
                    "Only string type arguments are accepted but "
                        + typeInfos[0].getTypeName() + " is passed.");
        }
    }

    public static class DAJoinStringEvaluator extends GenericUDAFEvaluator {

        private PrimitiveObjectInspector mInput;
        private Text mResult;

        // Aggregation buffer holding the geometry union built up so far
        static class PolygonAgg implements AggregationBuffer {
            Geometry geometry;
        }

        // Declares the UDAF's return type: DAJoin produces a Text (writable string)
        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            assert (parameters.length == 1);
            super.init(m, parameters);
            mResult = new Text();
            mInput = (PrimitiveObjectInspector) parameters[0];
            return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
        }

        // Creates the in-memory buffer that accumulates the running union in
        // mapper, combiner and reducer
        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            PolygonAgg polygonAgg = new PolygonAgg();
            reset(polygonAgg);
            return polygonAgg;
        }

        @Override
        public void reset(AggregationBuffer aggregationBuffer) throws HiveException {
            PolygonAgg polygonAgg = (PolygonAgg) aggregationBuffer;
            GeometryFactory factory = new GeometryFactory();
            polygonAgg.geometry = factory.createPolygon(new Coordinate[]{});
        }

        // Map phase: fold each input row into the buffer via merge
        @Override
        public void iterate(AggregationBuffer aggregationBuffer, Object[] objects)
                throws HiveException {
            assert (objects.length == 1);
            merge(aggregationBuffer, objects[0]);
        }

        // Hands the partial aggregate from mapper to combiner/reducer
        @Override
        public Object terminatePartial(AggregationBuffer aggregationBuffer) throws HiveException {
            return terminate(aggregationBuffer);
        }

        // Combine/reduce phase: union a partial result (a GeoJSON string) into the buffer
        @Override
        public void merge(AggregationBuffer aggregationBuffer, Object partial) throws HiveException {
            if (partial != null) {
                try {
                    PolygonAgg polygonAgg = (PolygonAgg) aggregationBuffer;
                    String geoJson = PrimitiveObjectInspectorUtils.getString(partial, mInput);
                    Polygon polygon = JTSHelper.parse(geoJson);
                    polygonAgg.geometry = polygonAgg.geometry.union(polygon);
                } catch (Exception e) {
                    // Ignore rows whose GeoJSON cannot be parsed
                }
            }
        }

        // Reduce phase: produce the final result from the merged buffer
        @Override
        public Object terminate(AggregationBuffer aggregationBuffer) throws HiveException {
            try {
                PolygonAgg polygonAgg = (PolygonAgg) aggregationBuffer;
                Geometry buffer = polygonAgg.geometry.buffer(0);
                mResult.set(JTSHelper.convert2String(buffer.convexHull()));
            } catch (Exception e) {
                mResult.set("");
            }
            return mResult;
        }
    }
}
After packaging, the UDAF is used in Hive SQL like this:
add jar hdfs://xxx/udf/ff8bd59f-d0a5-4b13-888b-5af239270869/udf.jar;
create temporary function da_join as 'me.ele.breat.hive.udf.DAJoinV2';
create table udaf_example as
select id, da_join(da_range) as da_union_polygon
from example
group by id;
A UDTF implementation extends org.apache.hadoop.hive.ql.udf.generic.GenericUDTF and implements the initialize, process and close methods:
- initialize returns a StructObjectInspector, which fixes the names and types of the output columns
- process is called for each input record; it builds one or more output rows and passes each as an array to forward
- close is the end-of-input callback, used to emit anything still pending and release resources

// Assuming the Google S2 geometry library (com.google.common.geometry), Guava's Lists,
// and JTS with its GeoJSON reader (org.locationtech.jts).
import java.util.ArrayList;
import java.util.List;

import com.google.common.collect.Lists;
import com.google.common.geometry.S2CellId;
import com.google.common.geometry.S2LatLng;
import com.google.common.geometry.S2Loop;
import com.google.common.geometry.S2Point;
import com.google.common.geometry.S2Polygon;
import com.google.common.geometry.S2RegionCoverer;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.geom.GeometryFactory;
import org.locationtech.jts.io.ParseException;
import org.locationtech.jts.io.geojson.GeoJsonReader;

public class S2SimpleRegionCoverV2 extends GenericUDTF {
    private final static int LEVEL = 16;

    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
        // One output column named "s2cellid" of type bigint
        List<String> structFieldNames = Lists.newArrayList("s2cellid");
        List<ObjectInspector> structFieldObjectInspectors = Lists.<ObjectInspector>newArrayList(
            PrimitiveObjectInspectorFactory.javaLongObjectInspector);
        return ObjectInspectorFactory
            .getStandardStructObjectInspector(structFieldNames, structFieldObjectInspectors);
    }

    @Override
    public void process(Object[] objects) throws HiveException {
        // Emit one output row per S2 cell id covering the input GeoJSON geometry
        String json = String.valueOf(objects[0]);
        List<Long> s2cellids = toS2CellIds(json);
        for (Long s2cellid : s2cellids) {
            forward(new Long[]{s2cellid});
        }
    }

    public static List<Long> toS2CellIds(String json) {
        GeometryFactory factory = new GeometryFactory();
        GeoJsonReader reader = new GeoJsonReader();
        Geometry geometry;
        try {
            geometry = reader.read(json);
        } catch (ParseException e) {
            geometry = factory.createPolygon(new Coordinate[]{});
        }
        // Convert the polygon's vertices to S2 points
        List<S2Point> points = new ArrayList<S2Point>();
        for (Coordinate coordinate : geometry.getCoordinates()) {
            S2LatLng s2LatLng = S2LatLng.fromDegrees(coordinate.y, coordinate.x);
            points.add(s2LatLng.toPoint());
        }
        if (points.size() == 0) {
            return Lists.newArrayList();
        }
        // Cover the polygon with cells at the fixed LEVEL
        ArrayList<S2CellId> result = new ArrayList<S2CellId>();
        S2RegionCoverer
            .getSimpleCovering(new S2Polygon(new S2Loop(points)), points.get(0), LEVEL, result);
        List<Long> output = new ArrayList<Long>();
        for (S2CellId s2CellId : result) {
            output.add(s2CellId.id());
        }
        return output;
    }

    @Override
    public void close() throws HiveException {
        // Nothing is buffered here, so there is nothing to flush
    }
}
When calling it, combine the UDTF with lateral view:
add jar hdfs://bipcluster/data/upload/udf/ff8bd59f-d0a5-4b13-888b-5af239270869/google_s2_udf.jar;
create temporary function da_cover as 'me.ele.breat.hive.udf.S2SimpleRegionCoverV2';
drop table if exists temp.cm_s2_id_cover_list;
create table temp.cm_s2_id_cover_list as
select tb_s2cellid.s2cellid, source.shop_id
from (
  select geometry, shop_id from example) source
lateral view da_cover(geometry) tb_s2cellid as s2cellid;