Hive 系列 之 UDF,UDTF,UDAF

Hive 系列概览

(1)hive系列之简介,安装,beeline和hiveserver2

(2)hive系列之基本操作

(3)hive系列之udf,udtf,udaf

(4)hive系列之二级分区和动态分区

(5)hive系列之分桶表

(6)hive系列之常用函数

(7)hive系列之系统讲解开窗函数

(8)hive系列之存储格式及常用压缩格式

(9)hive系列之数据仓库建模理论

(10)hive系列之数据仓库建模-维度表和事实表

(11)hive系列之数据仓库建模-退化维度和缓慢变化维

(12)hive系列之常用企业性能优化1

(13)hive系列之常用企业性能优化2

(14)hive系列之常用企业性能优化3

今天是第三讲,Hive 的 UDF,UDAF,UDTF

1

为什么需要 udf

udf,(User Defined Function)用户自定义函数

Hive 的 类 sql 给 开发者和分析者带来了极大的便利,使用 sql 就可以完成海量数据的处理,但是有时候,hive 自带的一些函数可能无法满足需求,这个时候,就需要我们自己定义一些函数,像插件一样在MapReduce过程中生效。

Hive中有3种UDF:

UDF:操作单个数据行,产生单个数据行; UDAF:操作多个数据行,产生一个数据行。 UDTF:操作一个数据行,产生多个数据行一个表作为输出。

2

如何实现一个udf

下面,实现一个udf,功能是:如果一个字符串大于2个字符,则只显示两个字符,后面的字符显示成...

首先,需要建立一个 maven 工程

贴出部分依赖如下:

 <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
    <hive.version>1.1.0</hive.version>
    <hadoop.version>2.6.0</hadoop.version>
  </properties>


  <dependencies>
    <!--hadoop client-->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>


    <!--hive client-->
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>${hive.version}</version>
    </dependency>


    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>${hive.version}</version>
    </dependency>
  </dependencies>

然后是我们的代码

写一个类,继承 org.apache.hadoop.hive.ql.exec.UDF 这个类

public class StringUdf extends UDF {


    public Text evaluate(Text text1) {
        if (text1 == null || text1.toString().isEmpty()) {
            return null;
        }
        if (text1.toString().length() > 2) {
            return new Text(text1.toString().substring(0, 2) + "...");
        }
        return text1;
    }


}

输入的是 Text,这个类是 hadoop 对 String 的包装

输出也是 Text,中间把 Text 转化成 字符串,并且截取了前两个字符,拼接上了...

然后,在 idea 中打包

把 jar 包上传到 Hiveserver2 所在的机器上,如果是 cdh 安装的话,需要上传到:

这个目录下

重启 HiveServer2

然后就可以使用了:

create temporary function string_udf as 'com.dsj361.hive.udf.StringUdf';
select string_udf("hello,world")

结果是:

he...

3

如何实现一个udtf

udtf,User-defined Table Generating Function

udtf,可以使一行变成多行

例如:有个这样的字符串,key:value;key:value

我们希望输出是 两行,每行key一个字段,value一个字段

代码如下:

需要继承 org.apache.hadoop.hive.ql.udf.generic.GenericUDTF

在initialize 方法中定义好输出字段名,和输出格式

在process方法中 ,定义每一行如何处理,forward 中传入数组,数组的每个元素就是一个字段

public class MyUdtf extends GenericUDTF {


    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        ArrayList<String> fieldNames = new ArrayList<>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<>();
        fieldNames.add("col1");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("col2");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }


    @Override
    public void process(Object[] args) throws HiveException {
        String input = args[0].toString();
        String[] strs = input.split(";");
        for (int i = 0; i < strs.length; i++) {
            try {
                String[] result = strs[i].split(":");
                forward(result);
            } catch (Exception e) {
                continue;
            }
        }
    }


    @Override
    public void close() throws HiveException {
    }
}

使用:

create temporary function my_udtf as 'com.dsj361.hive.udf.MyUdtf';
select my_udtf("key:value;key:value") 

结果:

4

如何实现一个udaf

udaf

User-defined Aggregation Function,用户自定义聚合函数

通俗点说,就是你可能需要做一些特殊的甚至是非常扭曲的逻辑聚合,但是Hive自带的聚合函数不够玩,同时也还找不到高效的等价玩法,那么,这时候就该自己写一个UDAF了。

udaf 是比较难理解的一中自定义函数,需要了解 MapReduce 各个过程,并且在 map,combine,reduce 的不同过程中,编写不同的业务逻辑,最终实现效果

public class CountUdaf extends AbstractGenericUDAFResolver {

    @Override
    public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
        return new TotalNumOfLettersEvaluator();
    }
    
    public static class TotalNumOfLettersEvaluator extends GenericUDAFEvaluator {

        PrimitiveObjectInspector inputOI;
        ObjectInspector outputOI;
        PrimitiveObjectInspector integerOI;

        int total = 0;

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            assert (parameters.length == 1);
            super.init(m, parameters);
            // init input object inspectors
            if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
                inputOI = (PrimitiveObjectInspector) parameters[0];
            } else {
                integerOI = (PrimitiveObjectInspector) parameters[0];
            }

            // init output object inspectors
            outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,
                    ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
            return outputOI;
        }


        static class SumBuffer implements AggregationBuffer {
            int sum = 0;
            void add(int num) {
                sum += num;
            }
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            SumBuffer result = new SumBuffer();
            return result;
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            SumBuffer myagg = new SumBuffer();
        }

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            assert (parameters.length == 1);
            if (parameters[0] != null) {
                SumBuffer myagg = (SumBuffer) agg;
                Object p1 = inputOI.getPrimitiveJavaObject(parameters[0]);
                myagg.add(Integer.parseInt(String.valueOf(p1)));
            }
        }

        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            SumBuffer myagg = (SumBuffer) agg;
            total += myagg.sum;
            return total;
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            if (partial != null) {
                SumBuffer myagg1 = (SumBuffer) agg;
                Integer partialSum = (Integer) integerOI.getPrimitiveJavaObject(partial);
                myagg1.add(partialSum);
            }
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            SumBuffer myagg = (SumBuffer) agg;
            return myagg.sum;
        }
    }
}

上面的这段代码是 实现了 Hive 中的 count 函数的功能

udaf 需要继承

org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver

主要的逻辑实现都在 GenericUDAFEvaluator 这个类中

那么什么是

ObjectInspector

帮助数据在 Map,reduce 的各个过程中,实现数据流转

这张图,是 sql 被解析器解析成 各个不同的 operator,不同的 operator 的数据传输都是 通过 ObjectInspector 来流转的

还会有一些跨节点的操作

另外就是 Mode 这个类

决定了在Map阶段和Reduce阶段 在涉及到对列进行UDF函数计算的时候,会调用UDF类中的哪些方法

并不是所有的方法都会调用,只会调用有限的几个。再由上面的提到的ObjectInspector进行数据的流转。

public static enum Mode {
    /**
     * PARTIAL1: 这个是mapreduce的map阶段:从原始数据到部分数据聚合
     * 将会调用iterate()和terminatePartial()
     */
    PARTIAL1,
        /**
     * PARTIAL2: 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据::从部分数据聚合到部分数据聚合:
     * 将会调用merge() 和 terminatePartial() 
     */
    PARTIAL2,
        /**
     * FINAL: mapreduce的reduce阶段:从部分数据的聚合到完全聚合 
     * 将会调用merge()和terminate()
     */
    FINAL,
        /**
     * COMPLETE: 如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了:从原始数据直接到完全聚合
      * 将会调用 iterate()和terminate()
     */
    COMPLETE
  };

下面这张是最精华的图,需要根据这个图,结合代码反复理解

本文分享自微信公众号 - kk大数据(kkbigdata)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-08-11

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券