前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >hive学习笔记之十:用户自定义聚合函数(UDAF)

hive学习笔记之十:用户自定义聚合函数(UDAF)

原创
作者头像
程序员欣宸
修改2021-07-14 15:41:24
2.5K0
修改2021-07-14 15:41:24
举报
文章被收录于专栏:实战docker实战docker

欢迎访问我的GitHub

这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos

《hive学习笔记》系列导航

  1. 基本数据类型
  2. 复杂数据类型
  3. 内部表和外部表
  4. 分区表
  5. 分桶
  6. HiveQL基础
  7. 内置函数
  8. Sqoop
  9. 基础UDF
  10. 用户自定义聚合函数(UDAF)
  11. UDTF

本篇概览

  • 本文是《hive学习笔记》的第十篇,前文实践过UDF的开发、部署、使用,那个UDF适用于一进一出的场景,例如将每条记录的指定字段转为大写;
  • 除了一进一出,在使用group by的SQL中,多进一出也是常见场景,例如hive自带的avg、sum都是多进一出,这个场景的自定义函数叫做用户自定义聚合函数(User Defiend Aggregate Function,UDAF),UDAF的开发比一进一出要复杂一些,本篇文章就一起来实战UDAF开发;
  • 本文开发的UDAF名为udf_fieldlength ,用于group by的时候,统计指定字段在每个分组中的总长度;

准备工作

  • 在一些旧版的教程和文档中,都会提到UDAF开发的关键是继承UDAF.java;
  • 打开hive-exec的1.2.2版本源码,却发现UDAF类已被注解为Deprecated
  • UDAF类被废弃后,推荐的替代品有两种:实现GenericUDAFResolver2接口,或者继承AbstractGenericUDAFResolver类;
  • 现在新问题来了:上述两种替代品,咱们在做UDAF的时候该用哪一种呢?
  • 打开AbstractGenericUDAFResolver类的源码瞅一眼,如下所示,是否有种恍然大悟的感觉,这个类自身就是GenericUDAFResolver2接口的实现类:
代码语言:txt
复制
public abstract class AbstractGenericUDAFResolver
    implements GenericUDAFResolver2
{

  @SuppressWarnings("deprecation")
  @Override
  public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info)
    throws SemanticException {

    if (info.isAllColumns()) {
      throw new SemanticException(
          "The specified syntax for UDAF invocation is invalid.");
    }

    return getEvaluator(info.getParameters());
  }

  @Override
  public GenericUDAFEvaluator getEvaluator(TypeInfo[] info) 
    throws SemanticException {
    throw new SemanticException(
          "This UDAF does not support the deprecated getEvaluator() method.");
  }
}
  • 既然源码都看了,也就没啥好纠结的了,继承父类还是实现接口都可以,您自己看着选吧,我这里选的是继承AbstractGenericUDAFResolver类;

关于UDAF的四个阶段

  • 在编码前,要先了解UDAF的四个阶段,定义在GenericUDAFEvaluator的Mode枚举中:
  • COMPLETE:如果mapreduce只有map而没有reduce,就会进入这个阶段;
  • PARTIAL1:正常mapreduce的map阶段;
  • PARTIAL2:正常mapreduce的combiner阶段;
  • FINAL:正常mapreduce的reduce阶段;

每个阶段被调用的方法

  • 开发UDAF时,要继承抽象类GenericUDAFEvaluator,里面有多个抽象方法,在不同的阶段,会调用到这些方法中的一个或多个;
  • 下图对每个阶段调用了哪些方法说得很清楚:
在这里插入图片描述
在这里插入图片描述
  • 下图对顺序执行的三个阶段和涉及方法做了详细说明:
在这里插入图片描述
在这里插入图片描述

源码下载

  • 如果您不想编码,可以在GitHub下载所有源码,地址和链接信息如下表所示:

名称

链接

备注

项目主页

该项目在GitHub上的主页

git仓库地址(https)

该项目源码的仓库地址,https协议

git仓库地址(ssh)

git@github.com:zq2599/blog_demos.git

该项目源码的仓库地址,ssh协议

  • 这个git项目中有多个文件夹,本章的应用在hiveudf文件夹下,如下图红框所示:
    在这里插入图片描述
    在这里插入图片描述

UDAF开发步骤简述

开发UDAF分为以下几步:

  • 新建类FieldLengthAggregationBuffer,用于保存中间结果,该类需继承AbstractAggregationBuffer;
  • 新建类FieldLengthUDAFEvaluator,用于实现四个阶段中会被调用的方法,该类需继承GenericUDAFEvaluator;
  • 新建类FieldLength,用于在hive中注册UDAF,里面会实例化FieldLengthUDAFEvaluator,该类需继承AbstractGenericUDAFResolver;
  • 编译构建,得到jar;
  • 在hive添加jar;
  • 在hive注册函数;

接下来就按照上述步骤开始操作;

开发

  • 打开前文新建的hiveudf工程,新建FieldLengthAggregationBuffer.java,这个类的作用是缓存中间计算结果,每次计算的结果都放入这里面,被传递给下个阶段,其成员变量value用来保存累加数据:
代码语言:txt
复制
package com.bolingcavalry.hiveudf.udaf;

import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.util.JavaDataModel;

public class FieldLengthAggregationBuffer extends GenericUDAFEvaluator.AbstractAggregationBuffer {

    private Integer value = 0;

    public Integer getValue() {
        return value;
    }

    public void setValue(Integer value) {
        this.value = value;
    }

    public void add(int addValue) {
        synchronized (value) {
            value += addValue;
        }
    }

    /**
     * 合并值缓冲区大小,这里是用来保存字符串长度,因此设为4byte
     * @return
     */
    @Override
    public int estimate() {
        return JavaDataModel.PRIMITIVES1;
    }
}
  • 新建FieldLengthUDAFEvaluator.java,里面是整个UDAF逻辑实现,关键代码已经添加了注释,请结合前面的图片来理解,核心思路是iterate将当前分组的字段处理完毕,merger把分散的数据合并起来,再由terminate决定当前分组计算结果:
代码语言:txt
复制
package com.bolingcavalry.hiveudf.udaf;

import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;

/**
 * @Description: 这里是UDAF的实际处理类
 * @author: willzhao E-mail: zq2599@gmail.com
 * @date: 2020/11/4 9:57
 */
public class FieldLengthUDAFEvaluator extends GenericUDAFEvaluator {

    PrimitiveObjectInspector inputOI;

    ObjectInspector outputOI;

    PrimitiveObjectInspector integerOI;

    /**
     * 每个阶段都会被执行的方法,
     * 这里面主要是把每个阶段要用到的输入输出inspector好,其他方法被调用时就能直接使用了
     * @param m
     * @param parameters
     * @return
     * @throws HiveException
     */
    @Override
    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
        super.init(m, parameters);

        // COMPLETE或者PARTIAL1,输入的都是数据库的原始数据
        if(Mode.PARTIAL1.equals(m) || Mode.COMPLETE.equals(m)) {
            inputOI = (PrimitiveObjectInspector) parameters[0];
        } else {
            // PARTIAL2和FINAL阶段,都是基于前一个阶段init返回值作为parameters入参
            integerOI = (PrimitiveObjectInspector) parameters[0];
        }

        outputOI = ObjectInspectorFactory.getReflectionObjectInspector(
                Integer.class,
                ObjectInspectorFactory.ObjectInspectorOptions.JAVA
        );

        // 给下一个阶段用的,即告诉下一个阶段,自己输出数据的类型
        return outputOI;
    }

    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
        return new FieldLengthAggregationBuffer();
    }

    /**
     * 重置,将总数清理掉
     * @param agg
     * @throws HiveException
     */
    public void reset(AggregationBuffer agg) throws HiveException {
        ((FieldLengthAggregationBuffer)agg).setValue(0);
    }

    /**
     * 不断被调用执行的方法,最终数据都保存在agg中
     * @param agg
     * @param parameters
     * @throws HiveException
     */
    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
        if(null==parameters || parameters.length<1) {
            return;
        }

        Object javaObj = inputOI.getPrimitiveJavaObject(parameters[0]);

        ((FieldLengthAggregationBuffer)agg).add(String.valueOf(javaObj).length());
    }

    /**
     * group by的时候返回当前分组的最终结果
     * @param agg
     * @return
     * @throws HiveException
     */
    public Object terminate(AggregationBuffer agg) throws HiveException {
        return ((FieldLengthAggregationBuffer)agg).getValue();
    }

    /**
     * 当前阶段结束时执行的方法,返回的是部分聚合的结果(map、combiner)
     * @param agg
     * @return
     * @throws HiveException
     */
    public Object terminatePartial(AggregationBuffer agg) throws HiveException {
        return terminate(agg);
    }

    /**
     * 合并数据,将总长度加入到缓存对象中(combiner或reduce)
     * @param agg
     * @param partial
     * @throws HiveException
     */
    public void merge(AggregationBuffer agg, Object partial) throws HiveException {

        ((FieldLengthAggregationBuffer) agg).add((Integer)integerOI.getPrimitiveJavaObject(partial));
    }
}
  • 最后是FieldLength.java,该类注册UDAF到hive时用到的,负责实例化FieldLengthUDAFEvaluator,给hive使用:
代码语言:txt
复制
package com.bolingcavalry.hiveudf.udaf;

import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

public class FieldLength extends AbstractGenericUDAFResolver {
    @Override
    public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
        return new FieldLengthUDAFEvaluator();
    }

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] info) throws SemanticException {
        return new FieldLengthUDAFEvaluator();
    }
}

至此,编码完成,接下来是部署和体验;

部署和体验

本次部署的注册方式是临时函数,如果您想注册为永久函数,请参考前文;

  • 在pom.xml所在目录执行mvn clean package -U,即可编译构建;
  • 在target目录得到文件hiveudf-1.0-SNAPSHOT.jar
  • 上传到hive服务器,我这里是放在/home/hadoop/udf目录;
  • 进入hive会话,执行以下命令添加jar:
代码语言:txt
复制
add jar /home/hadoop/udf/hiveudf-1.0-SNAPSHOT.jar;
  • 执行以下命令注册:
代码语言:txt
复制
create temporary function udf_fieldlength as 'com.bolingcavalry.hiveudf.udaf.FieldLength';
  • 找一个适合执行group by的表试试,我这里是前面的文章中创建的address表,完整数据如下:
代码语言:txt
复制
hive> select * from address;
OK
1	guangdong	guangzhou
2	guangdong	shenzhen
3	shanxi	xian
4	shanxi	hanzhong
6	jiangshu	nanjing
  • 执行下面的SQL:
代码语言:txt
复制
select province, count(city), udf_fieldlength(city) from address group by province;

执行结果如下,可见guangdong的guangzhou和shenzhen总长度为17,jiangsu的nanjing为7,shanxi的xian和hanzhong总长度12,符合预期:

代码语言:txt
复制
Total MapReduce CPU Time Spent: 2 seconds 730 msec
OK
guangdong	2	17
jiangshu	1	7
shanxi	2	12
Time taken: 28.484 seconds, Fetched: 3 row(s)

至此,UDAF的学习和实践就完成了,咱们掌握了多进一出的函数开发,由于涉及到多个阶段和外部调用的逻辑,使得UDAF的开发难度略大,接下来的文章是一进多出的开发,会简单一些。

关于容器和镜像的环境

如果您不想自己搭建kubernetes环境,推荐使用腾讯云容器服务TKE:无需自建,即可在腾讯云上使用稳定, 安全,高效,灵活扩展的 Kubernetes 容器平台;

如果您希望自己的镜像可以通过外网上传和下载,推荐腾讯云容器镜像服务TCR:像数据加密存储,大镜像多节点快速分发,跨地域镜像同步

你不孤单,欣宸原创一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 数据库+中间件系列
  6. DevOps系列

欢迎关注公众号:程序员欣宸

微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界...

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 欢迎访问我的GitHub
  • 《hive学习笔记》系列导航
  • 本篇概览
  • 准备工作
  • 关于UDAF的四个阶段
  • 每个阶段被调用的方法
  • 源码下载
  • UDAF开发步骤简述
  • 开发
  • 部署和体验
  • 关于容器和镜像的环境
  • 你不孤单,欣宸原创一路相伴
  • 欢迎关注公众号:程序员欣宸
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档