前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Hive UDF/UDAF 总结

Hive UDF/UDAF 总结

原创
作者头像
windism
修改2020-10-15 10:17:36
2.5K0
修改2020-10-15 10:17:36
举报
文章被收录于专栏:风扬风扬

概述

在Hive中,用户可以自定义一些函数,用于扩展HiveQL的功能,这类函数分为三大类:

  • UDF(User-Defined-Function)
    • 特点:一进一出;
    • 继承UDF类(org.apache.hadoop.hive.ql.exec.UDF)
  • UDAF(User-Defined Aggregation Function)
    • 特点:多进一出
    • 继承UDAF类(org.apache.hadoop.hive.ql.exec.UDAF)
  • UDTF(User-Defined Table-Generating Functions)
    • 特点:一进多出
    • 继承UDTF类( org.apache.hadoop.hive.ql.udf.generic.GenericUDTF)

UDF(User-Defined-Function)

内置的UDF,一般分为两类,UDF、 GenericUDF.

相比于UDF,GenericUDF有两个优势

  1. 可以接受复杂的参数类型,返回复杂类型
  2. 可以接受变长参数个数(参数数组)

extends UDF

UDF类型的编写相对比较简单,父类源码github位置,简易示例如下

代码语言:txt
复制
import org.apache.hadoop.hive.ql.exec.UDF;
public class CustomUDF extends UDF{
    public String evaluate(String s){
        if (s == null) return null;
        return s.toString().toLowerCase();
    }
}

可以看出UDF子类只需要实现 evaluate 方法

从官方注释可以看出, 支持但不限于如下类型,

  • public int evaluate();
  • public int evaluate(int a);
  • public double evaluate(int a, double b);
  • public String evaluate(String a, int b, Text c);
  • public Text evaluate(String a);
  • public String evaluate(List<Integer> a);

从官方注释可以看出主要是要满足 evaluate 方法的要求

  1. 输入为JAVA 原语(Hive Array 会被转为 List, 如 ARRAY<int> 转为 List<Integer>})
  2. 输出为JAVA 原语或 org.apache.hadoop.io.Writable Writable

虽然简单,但是仔细分析一下源码,如何使用 evaluate 方法,从UDF父类中可以看到主要操作了 UDFMethodResolver.

UDFMethodResolver 如果没有指明则由 DefaultUDFMethodResolver 来生成,其源码如下,可以看出通过 getMethodInternal 获取到 evaluate 方法.

代码语言:txt
复制
public class DefaultUDFMethodResolver implements UDFMethodResolver {
  private final Class<? extends UDF> udfClass;

  public DefaultUDFMethodResolver(Class<? extends UDF> udfClass) {
    this.udfClass = udfClass;
  }

  public Method getEvalMethod(List<TypeInfo> argClasses) throws UDFArgumentException {
    return FunctionRegistry.getMethodInternal(this.udfClass, "evaluate", false, argClasses);
  }
}

extends GenericUDF

GenericUDF 相对于 UDF 写法上更加复杂,需要自己定义三个函数,虽然有上述的两个优点,但是 Hive 官方并不推荐使用该方法,如果能够使用 UDF 实现尽量不使用 GenericUDF.父类源码github位置

代码语言:txt
复制
public class ScoreUDF extends GenericUDF {
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
    }

    @Override
    public String getDisplayString(String[] children) {
    }
}

从上述代码可以看出 evaluate 主要操作 DeferredObject 类型,该类型其实就是一个接口,该类内部实现一个类继承该接口 DeferredJavaObject. 该类仅仅只是封装了一个 JAVA 的 Object 对象.

initialize 方法则是用以检测输入的数据是否合法.

代码语言:txt
复制
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
    if (arguments.length != 2) {
      throw new UDFArgumentLengthException("arrayContainsExample only takes 2 arguments: List<T>, T");
    }
    // 1. Check we received the right object types.
    ObjectInspector arg0 = arguments[0];
    ObjectInspector arg1 = arguments[1];
    if (!(arg0 instanceof ListObjectInspector) || !(arg1 instanceof StringObjectInspector)) {
      throw new UDFArgumentException("first argument must be a list / array, second argument must be a string");
    }
    this.listOI = (ListObjectInspector) arg0;
    this.elementOI = (StringObjectInspector) arg1;

    // 2. Check that the list contains strings
    if(!(listOI.getListElementObjectInspector() instanceof StringObjectInspector)) {
      throw new UDFArgumentException("first argument must be a list of strings");
    }

    // the return type of our function is a boolean, so we provide the correct object inspector
    return PrimitiveObjectInspectorFactory.javaBooleanObjectInspector;
}

上述代码很长,整体流程为

  1. 检查传入的参数个数与每个参数的数据类型是正确的;
  2. 保存 converters (ObjectInspector) 用以供 evaluate() 使用;
  3. 返回 ListObjectInspector,让 Hive 能够读取该函数的返回结果;

关注 ObjectInspector 是个什么类型, 源码见ObjectInspector.java,简述如下,主要用以解耦数据类型.

代码语言:txt
复制
public interface ObjectInspector extends Cloneable {
    public static enum Category {
        PRIMITIVE, LIST, MAP, STRUCT, UNION
    };
    String getTypeName();
    Category getCategory();
}

如上的 ObjectInspector.Category.PRIMITIVE 支持如下类型,源码见rimitiveObjectInspector.java

代码语言:txt
复制
enum PrimitiveCategory {
    VOID, BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, STRING,
    DATE, TIMESTAMP, TIMESTAMPLOCALTZ, BINARY, DECIMAL, VARCHAR, CHAR,
    INTERVAL_YEAR_MONTH, INTERVAL_DAY_TIME, UNKNOWN
}

getDisplayString 用于当实现的GenericUDF出错的时候,打印出提示信息.

UDAF(User-Defined Aggregation Function)

UDAF 是 Hive 中用户自定义的聚合函数,内置的 UDAF 有 max() 等.

UDAF 是需要 hive sql 语句和 group by 联合使用的. 聚合函数常常需要对大量数组进行操作,所以在编写程序时,一定要注意内存溢出问题.

  • Simple: 即继承org.apache.hadoop.hive.ql.exec.UDAF类,并在派生类中以静态内部类的方式实现org.apache.hadoop.hive.ql.exec.UDAFEvaluator接口.
    • 这种方式简单直接,但是在使用过程中需要依赖JAVA反射机制,因此性能相对较低.
    • 在Hive源码包org.apache.hadoop.hive.contrib.udaf.example中包含几个示例, 但是这些接口已经被注解为Deprecated,建议不要使用这种方式开发新的UDAF函数.
  • Generic: 这是Hive社区推荐的新的写法,以抽象类代替原有的接口.新的抽象类org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver替代老的UDAF接口,新的抽象类org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator替代老的UDAFEvaluator接口.

简单 UDAF

代码语言:txt
复制
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

@Description(name = "example_avg",
value = "_FUNC_(col) - Example UDAF to compute average")
public final class UDAFExampleAvg extends UDAF {

  /**
   * The internal state of an aggregation for average.
   *
   * Note that this is only needed if the internal state cannot be represented
   * by a primitive.
   *
   * The internal state can also contains fields with types like
   * ArrayList&lt;String&gt; and HashMap&lt;String,Double&gt; if needed.
   */
  public static class UDAFAvgState {
    private long mCount;
    private double mSum;
  }

  /**
   * The actual class for doing the aggregation. Hive will automatically look
   * for all internal classes of the UDAF that implements UDAFEvaluator.
   */
  public static class UDAFExampleAvgEvaluator implements UDAFEvaluator {

    UDAFAvgState state;

    public UDAFExampleAvgEvaluator() {
      super();
      state = new UDAFAvgState();
      init();
    }

    /**
     * Reset the state of the aggregation.
     */
    public void init() {
      state.mSum = 0;
      state.mCount = 0;
    }

    /**
     * Iterate through one row of original data.
     *
     * The number and type of arguments need to the same as we call this UDAF
     * from Hive command line.
     *
     * This function should always return true.
     */
    public boolean iterate(Double o) {
      if (o != null) {
        state.mSum += o;
        state.mCount++;
      }
      return true;
    }

    /**
     * Terminate a partial aggregation and return the state. If the state is a
     * primitive, just return primitive Java classes like Integer or String.
     */
    public UDAFAvgState terminatePartial() {
      // This is SQL standard - average of zero items should be null.
      return state.mCount == 0 ? null : state;
    }

    /**
     * Merge with a partial aggregation.
     *
     * This function should always have a single argument which has the same
     * type as the return value of terminatePartial().
     */
    public boolean merge(UDAFAvgState o) {
      if (o != null) {
        state.mSum += o.mSum;
        state.mCount += o.mCount;
      }
      return true;
    }

    /**
     * Terminates the aggregation and return the final result.
     */
    public Double terminate() {
      // This is SQL standard - average of zero items should be null.
      return state.mCount == 0 ? null : Double.valueOf(state.mSum
          / state.mCount);
    }
  }

}

总结:

  • UDAF要继承于UDAF父类 org.apache.hadoop.hive.ql.exec.UDAF.
  • 内部类要实现 org.apache.hadoop.hive.ql.exec.UDAFEvaluator 接口.
  • UDAFExampleAvgEvaluator 类里需要实现 init、iterate、terminatePartial、merge、terminate 这几个函数,是必不可少的
  • init() 方法用来进行全局初始化的.
  • iterate() 中实现累加逻辑.
  • terminatePartial 是Hive部分聚集时调用的,类似于MapReduce里的Combiner,这里 能保证能得到各个部分的状态累加.
  • merge 是多个部分合并时调用的,得到了参与合并的最大值.
  • terminate 是最终Reduce合并时调用的,得到最大值.

通用UDAF

通用UDAF的编写主要如下两步:

  1. 编写resolver类,resolver负责类型检查,操作符重载.类继承org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver,AbstractGenericUDAFResolver实现了org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2,方便统一接口.
  2. 编写evaluator类.evaluator真正实现UDAF的逻辑.通常来说,实现org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator,包括几个必须实现的抽象方法,这几个方法负责完成UDAF所需要处理的逻辑.
UDAF的运行流程简介

抽象类GenericUDAFEvaluator中,包含一个静态内部枚举类,和一系列抽象方法.这个枚举类的注释中,解释了各个枚举值的运行阶段和运行内容.按照时间先后顺序,分别有:

  1. PARTIAL1:原始数据到部分聚合,调用iterate和terminatePartial –> map阶段
  2. PARTIAL2: 部分聚合到部分聚合,调用merge和terminatePartial –> combine阶段
  3. FINAL: 部分聚合到完全聚合,调用merge和terminate –> reduce阶段
  4. COMPLETE: 从原始数据直接到完全聚合 –> map阶段,并且没有reduce
UDAF方法
  • init(Mode m, ObjectInspector[] parameters): 这个是非必须的,但是一般是需要的.实例化Evaluator类的时候调用的,在不同的阶段需要返回不同的OI.需要注意的是,在不同的模式下parameters的含义是不同的,比如m为 PARTIAL1 和 COMPLETE 时,parameters为原始数据;m为 PARTIAL2 和 FINAL 时,parameters仅为部分聚合数据(只有一个元素).在 PARTIAL1PARTIAL2 模式下,ObjectInspector 用于terminatePartial方法的返回值,在FINAL和COMPLETE模式下ObjectInspector 用于terminate方法的返回值. 其入参和返回值,以及Mode阶段的关系如下表:

mode

入参

返回值的使用者

PARTIAL1

原始数据

terminatePartial

PARTIAL2

部分聚合数据

terminatePartial

FINAL

部分聚合数据

terminate

COMPLETE

原始数据

terminate

  • getNewAggregationBuffer(): 返回存储临时聚合结果的AggregationBuffer对象
  • reset(AggregationBuffer agg): 重置聚合结果对象,以支持mapper和reducer的重用.
  • iterate(AggregationBuffer agg, Object[] parameters):迭代处理原始数据parameters并保存到agg中
  • terminatePartial(AggregationBuffer agg):返回部分聚合数据的持久化对象.因为调用这个方法时,说明已经是map或者combine的结束了,必须将数据持久化以后交给reduce进行处理.只支持JAVA原始数据类型及其封装类型、HADOOP Writable类型、List、Map,不能返回自定义的类,即使实现了Serializable也不行,否则会出现问题或者错误的结果.
  • merge(AggregationBuffer agg, Object partial):将terminatePartial返回的部分聚合数据进行合并,需要使用到对应的OI.
  • terminate(AggregationBuffer agg):返回最终结果.

参考资料

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 概述
  • UDF(User-Defined-Function)
    • extends UDF
      • extends GenericUDF
      • UDAF(User-Defined Aggregation Function)
        • 简单 UDAF
          • 通用UDAF
            • UDAF的运行流程简介
            • UDAF方法
        • 参考资料
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档