作者简介
淳敏,物流架构师同时也是一位team leader,工作认真负责,曾在休假期间“面向大海编程”,不明觉厉
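在Hive中,用户可以自定义一些函数,用于扩展HiveQL的功能。Hive 自定义函数主要包含以下三种:UDF(User-Defined Function,普通函数,一行输入对应一行输出)、UDAF(User-Defined Aggregation Function,聚合函数,多行输入对应一行输出)、UDTF(User-Defined Table-Generating Function,表生成函数,一行输入对应多行输出)。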
 Hive的UDF机制是需要用户实现: Resolver 和 Evaluator ,其中 Resolver 就用来处理输入,调用 Evaluator , Evaluator 就是具体功能的实现。 
 Hive提供了一个基础类 org.apache.hadoop.hive.ql.exec.UDF ,在这个类中含有了一个 UDFMethodResolver 的接口实现类 DefaultUDFMethodResolver 的对象。 
/**
 * Excerpt of Hive's base class for simple UDFs; members other than the
 * resolver are elided in this listing.
 */
public class UDF {
  // Resolver used to locate the method to invoke for this UDF.
  private UDFMethodResolver rslv;
  // Binds a default resolver to the runtime class of this instance, i.e. the
  // concrete subclass that extends UDF.
  public UDF() {
    this.rslv = new DefaultUDFMethodResolver(this.getClass());
  }
	......
}
复制代码 
  在 DefaultUDFMethodResolver 中,提供了一个 getEvalMethod 的方法,它按方法名 "evaluate" 和参数类型查找 UDF 子类中对应的 evaluate 方法 
/**
 * Default {@link UDFMethodResolver}: resolves the method named "evaluate" on
 * the UDF class it was constructed with.
 */
public class DefaultUDFMethodResolver implements UDFMethodResolver {
  // The UDF class whose "evaluate" method is looked up.
  private final Class<? extends UDF> udfClass;
  /**
   * @param udfClass the UDF class to resolve methods on
   */
  public DefaultUDFMethodResolver(Class<? extends UDF> udfClass) {
    this.udfClass = udfClass;
  }
  /**
   * Looks up the "evaluate" method of the UDF class for the given argument types.
   *
   * @param argClasses type information of the call arguments
   * @return the method returned by FunctionRegistry.getMethodInternal
   * @throws UDFArgumentException declared by the lookup; presumably raised when
   *     no suitable overload exists (confirm against FunctionRegistry)
   */
  public Method getEvalMethod(List<TypeInfo> argClasses) throws UDFArgumentException {
    return FunctionRegistry.getMethodInternal(this.udfClass, "evaluate", false, argClasses);
  }
}
复制代码 
  自定义UDF的实现上以继承 org.apache.hadoop.hive.ql.exec.UDF 为基础,然后实现一个 evaluate 方法,Hive 会通过 DefaultUDFMethodResolver 对象找到该方法并调用它。 
/**
 * Hive UDF that tests whether a point lies inside a polygon supplied as GeoJSON.
 *
 * <p>Evaluation is best-effort: input that cannot be evaluated (a null
 * argument, GeoJSON that fails to parse, a geometry exception) yields
 * {@code false} instead of failing the query.
 */
public class DAIsContainPoint extends UDF {
  // Reused across rows; building a factory per call is unnecessary work.
  private static final GeometryFactory GEOMETRY_FACTORY = new GeometryFactory();

  /**
   * Checks whether the point (longitude, latitude) is contained in the polygon.
   *
   * @param longitude x coordinate of the point
   * @param latitude y coordinate of the point
   * @param geojson polygon text, parsed with JTSHelper.parse
   * @return {@code true} if the polygon contains the point; {@code false}
   *     otherwise, including when any argument is null or cannot be processed
   */
  public Boolean evaluate(Double longitude, Double latitude, String geojson) {
    // Handle SQL NULLs explicitly instead of relying on a caught
    // NullPointerException from auto-unboxing.
    if (longitude == null || latitude == null || geojson == null) {
      return false;
    }
    try {
      Polygon polygon = JTSHelper.parse(geojson);
      if (polygon == null) {
        return false;
      }
      Point point = GEOMETRY_FACTORY.createPoint(new Coordinate(longitude, latitude));
      return polygon.contains(point);
    } catch (Exception ignored) {
      // Deliberate best-effort: a malformed row must not abort the whole query.
      // Only Exception is caught (not Throwable) and there is no return inside
      // a finally block, so Errors such as NoClassDefFoundError from an
      // incompletely shaded jar propagate instead of silently becoming false.
      return false;
    }
  }
}
复制代码 
  完成了代码定义之后需要对其进行打包,编译成一个 jar ,注意: 最终的 jar 中需要包含所有依赖的 jar , maven 编译上推荐使用 maven-shade-plugin 
<!-- Uses maven-shade-plugin so the packaged jar also contains the project's dependencies. -->
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>2.2</version>
      <executions>
        <execution>
          <!-- Bind the shade goal to the package phase. -->
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <filters>
              <filter>
                <!-- The filter applies to every artifact merged into the jar. -->
                <artifact>*:*</artifact>
                <excludes>
                  <!-- Exclude META-INF signature files (*.SF, *.DSA, *.RSA). -->
                  <!-- NOTE(review): presumably because signatures of signed dependencies no longer match the merged jar; confirm. -->
                  <exclude>META-INF/*.SF</exclude>
                  <exclude>META-INF/*.DSA</exclude>
                  <exclude>META-INF/*.RSA</exclude>
                </excludes>
              </filter>
            </filters>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
复制代码 
  最后产生的 jar 文件需要在HIVE SQL中被引用 
-- Register the jar and bind DAIsContainPoint to the name is_in_polygon.
-- Argument order must match evaluate(longitude, latitude, geojson): longitude comes first.
add jar hdfs://xxx/udf/ff8bd59f-d0a5-4b13-888b-5af239270869/udf.jar;
create temporary function is_in_polygon as 'me.ele.breat.hive.udf.DAIsContainPoint';
select lat, lng, geojson, is_in_polygon(lng, lat, geojson) as is_in from example;
复制代码
 在Hive的聚合计算中,采用MapReduce的方式来加快聚合的速度,而UDAF就是用来撰写聚合类自定义方法的扩展方式。关于MapReduce需要补充知识的请看这里,为了更好的说明白UDAF我们需要知道一下 MapReduce 的流程 
 
 
  回到Hive中来,在UDAF的实现中,首先需要继承 org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver ,并实现 org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2 接口。然后构造 GenericUDAFEvaluator 类,实现MapReduce的计算过程,其中有3个关键的方法 
iterate:在map阶段逐行处理输入; merge:合并部分聚合结果; terminate:输出最终的聚合结果
 然后再实现一个继承 AbstractGenericUDAFResolver 的类,重写其 getEvaluator 的方法,返回一个 GenericUDAFEvaluator 的实例 
/**
 * Hive UDAF that unions the polygons of a group and returns the convex hull of
 * that union as a string.
 *
 * <p>Each input value is a string parsed with JTSHelper.parse; the result is
 * produced with JTSHelper.convert2String.
 */
public class DAJoinV2 extends AbstractGenericUDAFResolver implements GenericUDAFResolver2 {

  /**
   * Resolves the evaluator for a call site.
   *
   * <p>Delegates to {@link #getEvaluator(TypeInfo[])} so that the argument
   * count and type checks are actually applied; returning the evaluator
   * directly would leave that validation unreachable.
   *
   * @param genericUDAFParameterInfo description of the call-site parameters
   * @return the evaluator for a single string argument
   * @throws SemanticException if the arguments are not exactly one string
   */
  @Override
  public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo genericUDAFParameterInfo)
      throws SemanticException {
    return getEvaluator(genericUDAFParameterInfo.getParameters());
  }

  /**
   * Validates the argument types and returns the matching evaluator.
   *
   * @param typeInfos types of the call arguments
   * @return the evaluator for a single string argument
   * @throws SemanticException if there is not exactly one argument, or it is
   *     not a primitive of category STRING
   */
  @Override
  public GenericUDAFEvaluator getEvaluator(TypeInfo[] typeInfos) throws SemanticException {
    if (typeInfos.length != 1) {
      throw new UDFArgumentTypeException(typeInfos.length - 1,
          "Exactly one argument is expected.");
    }
    if (typeInfos[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
      throw new UDFArgumentTypeException(0,
          "Only primitive type arguments are accepted but "
              + typeInfos[0].getTypeName() + " is passed.");
    }
    switch (((PrimitiveTypeInfo) typeInfos[0]).getPrimitiveCategory()) {
      case STRING:
        return new DAJoinStringEvaluator();
      default:
        // Only STRING is supported, so the message must not mention numeric types.
        throw new UDFArgumentTypeException(0,
            "Only string type arguments are accepted but "
                + typeInfos[0].getTypeName() + " is passed.");
    }
  }

  /** Evaluator that accumulates the union of all polygons seen so far. */
  public static class DAJoinStringEvaluator extends GenericUDAFEvaluator {
    // Inspector used to read the string argument (raw row value or partial result).
    private PrimitiveObjectInspector mInput;
    // Reused holder for the value returned by terminate().
    private Text mResult;

    /** Aggregation buffer holding the running union geometry. */
    static class PolygonAgg implements AggregationBuffer {
      Geometry geometry;
    }

    /**
     * Stores the input inspector and declares the return type: both partial
     * and final results are writable strings (Text).
     */
    @Override
    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
      assert (parameters.length == 1);
      super.init(m, parameters);
      mResult = new Text();
      mInput = (PrimitiveObjectInspector) parameters[0];
      return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
    }

    /** Creates a buffer initialised to an empty polygon. */
    @Override
    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
      PolygonAgg polygonAgg = new PolygonAgg();
      reset(polygonAgg);
      return polygonAgg;
    }

    /** Resets the buffer to an empty polygon. */
    @Override
    public void reset(AggregationBuffer aggregationBuffer) throws HiveException {
      PolygonAgg polygonAgg = (PolygonAgg) aggregationBuffer;
      GeometryFactory factory = new GeometryFactory();
      polygonAgg.geometry = factory.createPolygon(new Coordinate[]{});
    }

    /**
     * Processes one input row. Raw rows and partial results are both strings
     * read through the same inspector, so this simply delegates to merge().
     */
    @Override
    public void iterate(AggregationBuffer aggregationBuffer, Object[] objects)
        throws HiveException {
      assert (objects.length == 1);
      merge(aggregationBuffer, objects[0]);
    }

    /**
     * Emits the partial result. It reuses terminate(), so a partial is already
     * reduced to a convex hull; the final result is unchanged by this because
     * the hull of a union of hulls equals the hull of the union.
     */
    @Override
    public Object terminatePartial(AggregationBuffer aggregationBuffer) throws HiveException {
      return terminate(aggregationBuffer);
    }

    /** Unions the polygon parsed from {@code partial} into the buffer; null is ignored. */
    @Override
    public void merge(AggregationBuffer aggregationBuffer, Object partial) throws HiveException {
      if (partial != null) {
        try {
          PolygonAgg polygonAgg = (PolygonAgg) aggregationBuffer;
          String geoJson = PrimitiveObjectInspectorUtils.getString(partial, mInput);
          Polygon polygon = JTSHelper.parse(geoJson);
          polygonAgg.geometry = polygonAgg.geometry.union(polygon);
        } catch (Exception ignored) {
          // Deliberate best-effort: a value that cannot be parsed or unioned is
          // skipped so a single bad row does not fail the whole aggregation.
        }
      }
    }

    /**
     * Produces the result: the convex hull of the accumulated union, as text.
     *
     * @return a Text holding the serialized hull, or an empty Text on failure
     */
    @Override
    public Object terminate(AggregationBuffer aggregationBuffer) throws HiveException {
      try {
        PolygonAgg polygonAgg = (PolygonAgg) aggregationBuffer;
        Geometry buffer = polygonAgg.geometry.buffer(0);
        mResult.set(JTSHelper.convert2String(buffer.convexHull()));
        return mResult;
      } catch (Exception e) {
        // init() declared writableStringObjectInspector, so the value must be a
        // Text; returning a java.lang.String here would not match that inspector.
        mResult.set("");
        return mResult;
      }
    }
  }
}
复制代码 
 打包之后将其用在HIVE SQL中执行
-- Register the jar, bind DAJoinV2 to the name da_join, then aggregate da_range per id into udaf_example.
add jar hdfs://xxx/udf/ff8bd59f-d0a5-4b13-888b-5af239270869/udf.jar; create temporary function da_join as 'me.ele.breat.hive.udf.DAJoinV2'; create table udaf_example as select id, da_join(da_range) as da_union_polygon from example group by id 复制代码
 在UDTF的实现中,首先需要继承 org.apache.hadoop.hive.ql.udf.generic.GenericUDTF ,实现 process , initialize 和 close 方法 
initialize:返回 StructObjectInspector 对象,决定最后输出的 column 的名称和类型
process:对每一个输入 record 进行处理,产生出一个新数组,传递到 forward 方法中进行处理
close:关闭整个调用的回调处,清理内存
public class S2SimpleRegionCoverV2 extends GenericUDTF {
  // S2 cell level passed to S2RegionCoverer.getSimpleCovering.
  private final static  int LEVEL = 16;
  /**
   * Declares the output schema: one column named "s2cellid" holding a Java Long.
   *
   * @param argOIs inspectors of the call arguments; exactly one is expected
   * @return struct inspector describing the single output column
   * @throws UDFArgumentException if the function is not called with exactly one argument
   */
  @Override
  public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
    // process() reads objects[0] unconditionally, so reject a wrong argument
    // count here with a clear message instead of failing per row.
    if (argOIs == null || argOIs.length != 1) {
      throw new UDFArgumentException(
          "S2SimpleRegionCoverV2 expects exactly one argument (a GeoJSON string).");
    }
    List<String> structFieldNames = Lists.newArrayList("s2cellid");
    List<ObjectInspector> structFieldObjectInspectors = Lists.<ObjectInspector>newArrayList(
        PrimitiveObjectInspectorFactory.javaLongObjectInspector);
    return ObjectInspectorFactory
        .getStandardStructObjectInspector(structFieldNames, structFieldObjectInspectors);
  }
  /**
   * Converts the GeoJSON in the first argument to S2 cell ids and forwards one
   * output row per cell id. A NULL input produces no rows.
   *
   * @param objects the call arguments; only the first one is read
   * @throws HiveException if forwarding a row fails
   */
  @Override
  public void process(Object[] objects) throws HiveException {
    // String.valueOf(Object) turns null into the literal "null", which would
    // then be handed to the GeoJSON parser; skip such rows instead.
    if (objects == null || objects.length == 0 || objects[0] == null) {
      return;
    }
    String json = String.valueOf(objects[0]);
    for (Long s2cellid : toS2CellIds(json)) {
      forward(new Long[]{s2cellid});
    }
  }
  /**
   * Computes the ids of the S2 cells at level {@code LEVEL} that cover the
   * geometry described by the given GeoJSON.
   *
   * @param json GeoJSON text; may be null
   * @return the covering cell ids; an empty list when the input is null,
   *     fails to parse, or has no coordinates
   */
  public static List<Long> toS2CellIds(String json) {
    List<Long> output = new ArrayList<Long>();
    if (json == null) {
      return output;
    }

    Geometry geometry;
    try {
      geometry = new GeoJsonReader().read(json);
    } catch (ParseException e) {
      // Unparsable input yields an empty covering.
      return output;
    }
    if (geometry == null) {
      return output;
    }

    // GeoJSON coordinates are (x = longitude, y = latitude); S2LatLng takes latitude first.
    List<S2Point> vertices = new ArrayList<S2Point>();
    for (Coordinate coordinate : geometry.getCoordinates()) {
      vertices.add(S2LatLng.fromDegrees(coordinate.y, coordinate.x).toPoint());
    }

    // A GeoJSON ring repeats its first coordinate at the end, whereas S2Loop
    // closes the loop implicitly and does not allow duplicate vertices, so the
    // closing vertex is dropped.
    int last = vertices.size() - 1;
    if (last > 0 && vertices.get(0).equals(vertices.get(last))) {
      vertices.remove(last);
    }
    if (vertices.isEmpty()) {
      return output;
    }

    // NOTE(review): all coordinates are fed into one loop, so polygons with
    // holes or multi-part geometries are presumably not handled; ring
    // orientation is not normalized either. Confirm the expected inputs.
    ArrayList<S2CellId> covering = new ArrayList<S2CellId>();
    S2RegionCoverer
        .getSimpleCovering(new S2Polygon(new S2Loop(vertices)), vertices.get(0), LEVEL, covering);
    for (S2CellId cellId : covering) {
      output.add(cellId.id());
    }
    return output;
  }
  /** No-op: this UDTF does nothing on close. */
  @Override
  public void close() throws HiveException {
  }
}
复制代码 
  在使用的时候和 lateral view 连在一起用 
-- Register the jar, bind S2SimpleRegionCoverV2 to the name da_cover, and use LATERAL VIEW to emit one row per (s2cellid, shop_id).
add jar hdfs://bipcluster/data/upload/udf/ff8bd59f-d0a5-4b13-888b-5af239270869/google_s2_udf.jar; create temporary function da_cover as 'me.ele.breat.hive.udf.S2SimpleRegionCoverV2'; drop table if exists temp.cm_s2_id_cover_list; create table temp.cm_s2_id_cover_list as select tb_s2cellid.s2cellid, source.shop_id from ( select geometry, shop_id from example) source lateral view da_cover(geometry) tb_s2cellid as s2cellid; 复制代码