Hive 系列概览
(1)hive系列之简介,安装,beeline和hiveserver2
(2)hive系列之基本操作
(3)hive系列之udf,udtf,udaf
(4)hive系列之二级分区和动态分区
(5)hive系列之分桶表
(6)hive系列之常用函数
(7)hive系列之系统讲解开窗函数
(8)hive系列之存储格式及常用压缩格式
(9)hive系列之数据仓库建模理论
(10)hive系列之数据仓库建模-维度表和事实表
(11)hive系列之数据仓库建模-退化维度和缓慢变化维
(12)hive系列之常用企业性能优化1
(13)hive系列之常用企业性能优化2
(14)hive系列之常用企业性能优化3
今天是第三讲,Hive 的 UDF,UDAF,UDTF
1
为什么需要 udf
udf,(User Defined Function)用户自定义函数
Hive 的 类 sql 给 开发者和分析者带来了极大的便利,使用 sql 就可以完成海量数据的处理,但是有时候,hive 自带的一些函数可能无法满足需求,这个时候,就需要我们自己定义一些函数,像插件一样在MapReduce过程中生效。
Hive中有3种UDF:
UDF:操作单个数据行,产生单个数据行; UDAF:操作多个数据行,产生一个数据行。 UDTF:操作一个数据行,产生多个数据行一个表作为输出。
2
如何实现一个udf
下面,实现一个udf,功能是:如果一个字符串大于2个字符,则只显示两个字符,后面的字符显示成...
首先,需要建立一个 maven 工程
贴出部分依赖如下:
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<hive.version>1.1.0</hive.version>
<hadoop.version>2.6.0</hadoop.version>
</properties>
<dependencies>
<!--hadoop client-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!--hive client-->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
</dependency>
</dependencies>
然后是我们的代码
写一个类,继承 org.apache.hadoop.hive.ql.exec.UDF 这个类
public class StringUdf extends UDF {
public Text evaluate(Text text1) {
if (text1 == null || text1.toString().isEmpty()) {
return null;
}
if (text1.toString().length() > 2) {
return new Text(text1.toString().substring(0, 2) + "...");
}
return text1;
}
}
输入的是 Text,这个类是 hadoop 对 String 的包装
输出也是 Text,中间把 Text 转化成 字符串,并且截取了前两个字符,拼接上了...
然后,在 idea 中打包
把 jar 包上传到 Hiveserver2 所在的机器上,如果是 cdh 安装的话,需要上传到:
这个目录下
重启 HiveServer2
然后就可以使用了:
create temporary function string_udf as 'com.dsj361.hive.udf.StringUdf';
select string_udf("hello,world")
结果是:
he...
3
如何实现一个udtf
udtf,User-defined Table Generating Function
udtf,可以使一行变成多行
例如:有个这样的字符串,key:value;key:value
我们希望输出是 两行,每行key一个字段,value一个字段
代码如下:
需要继承 org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
在initialize 方法中定义好输出字段名,和输出格式
在process方法中 ,定义每一行如何处理,forward 中传入数组,数组的每个元素就是一个字段
public class MyUdtf extends GenericUDTF {
@Override
public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
ArrayList<String> fieldNames = new ArrayList<>();
ArrayList<ObjectInspector> fieldOIs = new ArrayList<>();
fieldNames.add("col1");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldNames.add("col2");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
}
@Override
public void process(Object[] args) throws HiveException {
String input = args[0].toString();
String[] strs = input.split(";");
for (int i = 0; i < strs.length; i++) {
try {
String[] result = strs[i].split(":");
forward(result);
} catch (Exception e) {
continue;
}
}
}
@Override
public void close() throws HiveException {
}
}
使用:
create temporary function my_udtf as 'com.dsj361.hive.udf.MyUdtf';
select my_udtf("key:value;key:value")
结果:
4
如何实现一个udaf
udaf
User-defined Aggregation Function,用户自定义聚合函数
通俗点说,就是你可能需要做一些特殊的甚至是非常扭曲的逻辑聚合,但是Hive自带的聚合函数不够玩,同时也还找不到高效的等价玩法,那么,这时候就该自己写一个UDAF了。
udaf 是比较难理解的一中自定义函数,需要了解 MapReduce 各个过程,并且在 map,combine,reduce 的不同过程中,编写不同的业务逻辑,最终实现效果
public class CountUdaf extends AbstractGenericUDAFResolver {
@Override
public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
return new TotalNumOfLettersEvaluator();
}
public static class TotalNumOfLettersEvaluator extends GenericUDAFEvaluator {
PrimitiveObjectInspector inputOI;
ObjectInspector outputOI;
PrimitiveObjectInspector integerOI;
int total = 0;
@Override
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
assert (parameters.length == 1);
super.init(m, parameters);
// init input object inspectors
if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
inputOI = (PrimitiveObjectInspector) parameters[0];
} else {
integerOI = (PrimitiveObjectInspector) parameters[0];
}
// init output object inspectors
outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,
ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
return outputOI;
}
static class SumBuffer implements AggregationBuffer {
int sum = 0;
void add(int num) {
sum += num;
}
}
@Override
public AggregationBuffer getNewAggregationBuffer() throws HiveException {
SumBuffer result = new SumBuffer();
return result;
}
@Override
public void reset(AggregationBuffer agg) throws HiveException {
SumBuffer myagg = new SumBuffer();
}
@Override
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
assert (parameters.length == 1);
if (parameters[0] != null) {
SumBuffer myagg = (SumBuffer) agg;
Object p1 = inputOI.getPrimitiveJavaObject(parameters[0]);
myagg.add(Integer.parseInt(String.valueOf(p1)));
}
}
@Override
public Object terminatePartial(AggregationBuffer agg) throws HiveException {
SumBuffer myagg = (SumBuffer) agg;
total += myagg.sum;
return total;
}
@Override
public void merge(AggregationBuffer agg, Object partial) throws HiveException {
if (partial != null) {
SumBuffer myagg1 = (SumBuffer) agg;
Integer partialSum = (Integer) integerOI.getPrimitiveJavaObject(partial);
myagg1.add(partialSum);
}
}
@Override
public Object terminate(AggregationBuffer agg) throws HiveException {
SumBuffer myagg = (SumBuffer) agg;
return myagg.sum;
}
}
}
上面的这段代码是 实现了 Hive 中的 count 函数的功能
udaf 需要继承
org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
主要的逻辑实现都在 GenericUDAFEvaluator 这个类中
那么什么是
帮助数据在 Map,reduce 的各个过程中,实现数据流转
这张图,是 sql 被解析器解析成 各个不同的 operator,不同的 operator 的数据传输都是 通过 ObjectInspector 来流转的
还会有一些跨节点的操作
另外就是 Mode 这个类
决定了在Map阶段和Reduce阶段 在涉及到对列进行UDF函数计算的时候,会调用UDF类中的哪些方法
并不是所有的方法都会调用,只会调用有限的几个。再由上面的提到的ObjectInspector进行数据的流转。
public static enum Mode {
/**
* PARTIAL1: 这个是mapreduce的map阶段:从原始数据到部分数据聚合
* 将会调用iterate()和terminatePartial()
*/
PARTIAL1,
/**
* PARTIAL2: 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据::从部分数据聚合到部分数据聚合:
* 将会调用merge() 和 terminatePartial()
*/
PARTIAL2,
/**
* FINAL: mapreduce的reduce阶段:从部分数据的聚合到完全聚合
* 将会调用merge()和terminate()
*/
FINAL,
/**
* COMPLETE: 如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了:从原始数据直接到完全聚合
* 将会调用 iterate()和terminate()
*/
COMPLETE
};
下面这张是最精华的图,需要根据这个图,结合代码反复理解