前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Hive 系列 之 UDF,UDTF,UDAF

Hive 系列 之 UDF,UDTF,UDAF

作者头像
kk大数据
发布2019-08-14 15:33:26
4.9K0
发布2019-08-14 15:33:26
举报
文章被收录于专栏:kk大数据

Hive 系列概览

(1)hive系列之简介,安装,beeline和hiveserver2

(2)hive系列之基本操作

(3)hive系列之udf,udtf,udaf

(4)hive系列之二级分区和动态分区

(5)hive系列之分桶表

(6)hive系列之常用函数

(7)hive系列之系统讲解开窗函数

(8)hive系列之存储格式及常用压缩格式

(9)hive系列之数据仓库建模理论

(10)hive系列之数据仓库建模-维度表和事实表

(11)hive系列之数据仓库建模-退化维度和缓慢变化维

(12)hive系列之常用企业性能优化1

(13)hive系列之常用企业性能优化2

(14)hive系列之常用企业性能优化3

今天是第三讲,Hive 的 UDF,UDAF,UDTF

1

为什么需要 udf

udf,(User Defined Function)用户自定义函数

Hive 的 类 sql 给 开发者和分析者带来了极大的便利,使用 sql 就可以完成海量数据的处理,但是有时候,hive 自带的一些函数可能无法满足需求,这个时候,就需要我们自己定义一些函数,像插件一样在MapReduce过程中生效。

Hive中有3种UDF:

UDF:操作单个数据行,产生单个数据行; UDAF:操作多个数据行,产生一个数据行。 UDTF:操作一个数据行,产生多个数据行一个表作为输出。

2

如何实现一个udf

下面,实现一个udf,功能是:如果一个字符串大于2个字符,则只显示两个字符,后面的字符显示成...

首先,需要建立一个 maven 工程

贴出部分依赖如下:

代码语言:javascript
复制
 <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
    <hive.version>1.1.0</hive.version>
    <hadoop.version>2.6.0</hadoop.version>
  </properties>


  <dependencies>
    <!--hadoop client-->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>


    <!--hive client-->
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>${hive.version}</version>
    </dependency>


    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>${hive.version}</version>
    </dependency>
  </dependencies>

然后是我们的代码

写一个类,继承 org.apache.hadoop.hive.ql.exec.UDF 这个类

代码语言:javascript
复制
public class StringUdf extends UDF {


    public Text evaluate(Text text1) {
        if (text1 == null || text1.toString().isEmpty()) {
            return null;
        }
        if (text1.toString().length() > 2) {
            return new Text(text1.toString().substring(0, 2) + "...");
        }
        return text1;
    }


}

输入的是 Text,这个类是 hadoop 对 String 的包装

输出也是 Text,中间把 Text 转化成 字符串,并且截取了前两个字符,拼接上了...

然后,在 idea 中打包

把 jar 包上传到 Hiveserver2 所在的机器上,如果是 cdh 安装的话,需要上传到:

这个目录下

重启 HiveServer2

然后就可以使用了:

代码语言:javascript
复制
create temporary function string_udf as 'com.dsj361.hive.udf.StringUdf';
select string_udf("hello,world")

结果是:

he...

3

如何实现一个udtf

udtf,User-defined Table Generating Function

udtf,可以使一行变成多行

例如:有个这样的字符串,key:value;key:value

我们希望输出是 两行,每行key一个字段,value一个字段

代码如下:

需要继承 org.apache.hadoop.hive.ql.udf.generic.GenericUDTF

在initialize 方法中定义好输出字段名,和输出格式

在process方法中 ,定义每一行如何处理,forward 中传入数组,数组的每个元素就是一个字段

代码语言:javascript
复制
public class MyUdtf extends GenericUDTF {


    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        ArrayList<String> fieldNames = new ArrayList<>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<>();
        fieldNames.add("col1");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("col2");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }


    @Override
    public void process(Object[] args) throws HiveException {
        String input = args[0].toString();
        String[] strs = input.split(";");
        for (int i = 0; i < strs.length; i++) {
            try {
                String[] result = strs[i].split(":");
                forward(result);
            } catch (Exception e) {
                continue;
            }
        }
    }


    @Override
    public void close() throws HiveException {
    }
}

使用:

代码语言:javascript
复制
create temporary function my_udtf as 'com.dsj361.hive.udf.MyUdtf';
select my_udtf("key:value;key:value") 

结果:

4

如何实现一个udaf

udaf

User-defined Aggregation Function,用户自定义聚合函数

通俗点说,就是你可能需要做一些特殊的甚至是非常扭曲的逻辑聚合,但是Hive自带的聚合函数不够玩,同时也还找不到高效的等价玩法,那么,这时候就该自己写一个UDAF了。

udaf 是比较难理解的一中自定义函数,需要了解 MapReduce 各个过程,并且在 map,combine,reduce 的不同过程中,编写不同的业务逻辑,最终实现效果

代码语言:javascript
复制
public class CountUdaf extends AbstractGenericUDAFResolver {

    @Override
    public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
        return new TotalNumOfLettersEvaluator();
    }
    
    public static class TotalNumOfLettersEvaluator extends GenericUDAFEvaluator {

        PrimitiveObjectInspector inputOI;
        ObjectInspector outputOI;
        PrimitiveObjectInspector integerOI;

        int total = 0;

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            assert (parameters.length == 1);
            super.init(m, parameters);
            // init input object inspectors
            if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
                inputOI = (PrimitiveObjectInspector) parameters[0];
            } else {
                integerOI = (PrimitiveObjectInspector) parameters[0];
            }

            // init output object inspectors
            outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,
                    ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
            return outputOI;
        }


        static class SumBuffer implements AggregationBuffer {
            int sum = 0;
            void add(int num) {
                sum += num;
            }
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            SumBuffer result = new SumBuffer();
            return result;
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            SumBuffer myagg = new SumBuffer();
        }

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            assert (parameters.length == 1);
            if (parameters[0] != null) {
                SumBuffer myagg = (SumBuffer) agg;
                Object p1 = inputOI.getPrimitiveJavaObject(parameters[0]);
                myagg.add(Integer.parseInt(String.valueOf(p1)));
            }
        }

        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            SumBuffer myagg = (SumBuffer) agg;
            total += myagg.sum;
            return total;
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            if (partial != null) {
                SumBuffer myagg1 = (SumBuffer) agg;
                Integer partialSum = (Integer) integerOI.getPrimitiveJavaObject(partial);
                myagg1.add(partialSum);
            }
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            SumBuffer myagg = (SumBuffer) agg;
            return myagg.sum;
        }
    }
}

上面的这段代码是 实现了 Hive 中的 count 函数的功能

udaf 需要继承

org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver

主要的逻辑实现都在 GenericUDAFEvaluator 这个类中

那么什么是

ObjectInspector

帮助数据在 Map,reduce 的各个过程中,实现数据流转

这张图,是 sql 被解析器解析成 各个不同的 operator,不同的 operator 的数据传输都是 通过 ObjectInspector 来流转的

还会有一些跨节点的操作

另外就是 Mode 这个类

决定了在Map阶段和Reduce阶段 在涉及到对列进行UDF函数计算的时候,会调用UDF类中的哪些方法

并不是所有的方法都会调用,只会调用有限的几个。再由上面的提到的ObjectInspector进行数据的流转。

代码语言:javascript
复制
public static enum Mode {
    /**
     * PARTIAL1: 这个是mapreduce的map阶段:从原始数据到部分数据聚合
     * 将会调用iterate()和terminatePartial()
     */
    PARTIAL1,
        /**
     * PARTIAL2: 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据::从部分数据聚合到部分数据聚合:
     * 将会调用merge() 和 terminatePartial() 
     */
    PARTIAL2,
        /**
     * FINAL: mapreduce的reduce阶段:从部分数据的聚合到完全聚合 
     * 将会调用merge()和terminate()
     */
    FINAL,
        /**
     * COMPLETE: 如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了:从原始数据直接到完全聚合
      * 将会调用 iterate()和terminate()
     */
    COMPLETE
  };

下面这张是最精华的图,需要根据这个图,结合代码反复理解

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-08-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 KK架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ObjectInspector
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档