When developing with Spark, the built-in operators often cannot satisfy a business requirement. This is where UDFs (user-defined functions) come in. With the DataFrame (or Dataset) API in Java (or Python, Scala) it is easy to define and register a UDF, but making one available, or shared, in SQL (Spark SQL, Hive) is harder. The approach: implement the function against a fixed contract, then register it with Spark (or Hive) as a permanent function, so the same UDF is shared by Spark and Hive.
According to the official documentation, a UDF must extend or implement at least one of UDF, AbstractGenericUDAFResolver, GenericUDF, GenericUDTF, or UserDefinedAggregateFunction. The walkthrough below uses extending UDF as the example.
The implementation has two parts: the Maven project and the UDF classes themselves.
pom.xml of the Maven project
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sogo</groupId>
<artifactId>sparkudf</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spark.version>2.3.1</spark.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.16</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.4</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
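With the assembly plugin bound to the package phase, one command builds the fat jar referenced in the registration steps later (run from the project root; the output path follows Maven's defaults):

```shell
mvn clean package
# the assembly plugin emits target/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar
```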
A class implementing a single evaluate method defines one UDF.
The main method in the class is for local testing only; comment it out before packaging.
StringLengthUdf.java
package com.sogo.sparkudf.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
/**
* @Created by IntelliJ IDEA.
* @author: liuzhixuan@sogou-inc.com
* @Date: 2020/7/26
* @Time: 23:54
* @des:
*/
public class StringLengthUdf extends UDF {
// Hive calls the "evaluate" method by default
public int evaluate(String str) {
if (null == str) {
return 0;
} else {
return str.length();
}
}
// public static void main(String[] args) {
// StringLengthUdf stringLengthUdf = new StringLengthUdf();
// String str = "test";
// System.out.println("out:" + stringLengthUdf.evaluate(str));
// }
}
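Hive binds a UDF call to a method literally named evaluate, resolved by reflection against the argument types. A minimal standalone sketch of that dispatch, using a hypothetical local copy of the class (without the Hive UDF base class) so it runs with no hive-exec on the classpath:

```java
import java.lang.reflect.Method;

public class EvaluateDispatch {

    // Stand-in for StringLengthUdf: same null-safe contract, no Hive dependency
    public static class StringLengthUdf {
        public int evaluate(String str) {
            return (null == str) ? 0 : str.length();
        }
    }

    public static void main(String[] args) throws Exception {
        StringLengthUdf udf = new StringLengthUdf();
        // Hive-style dispatch: resolve a method named "evaluate" by argument types
        Method m = udf.getClass().getMethod("evaluate", String.class);
        System.out.println(m.invoke(udf, "test"));        // 4
        System.out.println(m.invoke(udf, (Object) null)); // 0
    }
}
```

Because the lookup is by name and signature, the method must stay public and named evaluate, regardless of which base class the real UDF extends.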
A UDF taking multiple input parameters: StringContainUdf.java
package com.sogo.sparkudf.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
import java.io.Serializable;
public class StringContainUdf extends UDF implements Serializable {
// Vary evaluate's parameter list to cover different UDF input arities and types
public Boolean evaluate(String s1, String s2) {
if (null == s1 || null == s2) {
return false;
} else return s1.contains(s2);
}
}
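The null-safe contract can be exercised outside Hive as well; this hypothetical stand-in mirrors StringContainUdf.evaluate without the UDF base class:

```java
public class StringContainCheck {

    // Mirrors StringContainUdf.evaluate: null inputs yield false, never an NPE
    public static Boolean evaluate(String s1, String s2) {
        if (null == s1 || null == s2) {
            return false;
        }
        return s1.contains(s2);
    }

    public static void main(String[] args) {
        System.out.println(evaluate("hello world", "world")); // true
        System.out.println(evaluate(null, "world"));          // false
    }
}
```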
The registration syntax, followed by a concrete example using the assembled jar:
CREATE [ OR REPLACE ] [ TEMPORARY ] FUNCTION [ IF NOT EXISTS ] function_name AS class_name [ resource_locations ]
CREATE OR REPLACE FUNCTION strlen_udf_int AS 'com.sogo.sparkudf.udf.StringLengthUdf' USING JAR 'file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar';
Experiments show that a UDF registered in Spark SQL only becomes visible to a Hive client after that client is restarted, whereas a UDF registered in Hive takes effect in Spark SQL immediately.
Sometimes a UDF has been registered and the client reconnected, yet the function still cannot be found; check whether you are in the same database, since a function belongs to the database it was registered in.
# List registered functions (Hive, Spark SQL)
show functions;
# List registered UDFs (Spark SQL only)
show user functions;
# Enter the Hive CLI (if no database is specified, the UDF belongs to the current database)
> hive
# Add the jar
ADD JAR file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar;
# Register a temporary UDF
CREATE TEMPORARY FUNCTION strlen_udf_int AS 'com.sogo.sparkudf.udf.StringLengthUdf';
# Register a permanent UDF
CREATE FUNCTION strlen_udf_int AS 'com.sogo.sparkudf.udf.StringLengthUdf';
# Replace an existing permanent UDF (this form is not supported in Hive)
CREATE OR REPLACE FUNCTION strlen_udf_int AS 'com.sogo.sparkudf.udf.StringLengthUdf';
# Create only if it does not already exist (no replacement)
CREATE FUNCTION IF NOT EXISTS strlen_udf_int AS 'com.sogo.sparkudf.udf.StringLengthUdf';
Typing these statements every time is tedious; the registration statements can instead go into a script that runs when the client starts. That is exactly what the Spark SQL configuration below does.
In Spark SQL, the Hive registration method also works; the following uses an initialization SQL script instead.
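For the Hive CLI, the same idea works through its -i option: put the registration statements in a file that runs at startup. A sketch, reusing the jar path above (the file name here is hypothetical):

```sql
-- hive_udf_init.sql, used as: hive -i hive_udf_init.sql
ADD JAR file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar;
CREATE TEMPORARY FUNCTION strlen_udf_int AS 'com.sogo.sparkudf.udf.StringLengthUdf';
```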
.bashrc configuration
alias spark_sql="/opt/spark/bin/spark-sql \
--master yarn \
--deploy-mode client \
--driver-memory 4G \
--executor-memory 10G \
--num-executors 80 \
--executor-cores 4 \
--name 'pyspark_cluster_lzx' \
--queue adx_online \
--database bigdata_lzx \
--conf spark.dynamicAllocation.minExecutors=40 \
--conf spark.dynamicAllocation.maxExecutors=80 \
--conf spark.default.parallelism=1200 \
--conf spark.sql.shuffle.partitions=1200 \
--conf spark.eventLog.enabled=true \
--conf spark.sql.autoBroadcastJoinThreshold=104857600 \
--hiveconf spark.hadoop.hive.cli.print.current.db=true \
--hiveconf spark.hadoop.hive.cli.print.header=true \
--hiveconf spark.hadoop.hive.resultset.use.unique.column.names=false \
--jars file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar \
-i /search/work/bigdata/liuzhixuan/sparkudf/spark_udf.sql"
Note: the --jars option ships the jar containing the UDF implementation to the cluster;
the -i option names a SQL script executed before the session starts.
spark_udf.sql
CREATE OR REPLACE FUNCTION strlen_udf_int AS 'com.sogo.sparkudf.udf.StringLengthUdf' USING JAR 'file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar';
Run
> spark_sql
> show user functions;
Result
spark-sql (default)> show user functions;
function
bigdata_lzx.strlen_udf_int
Time taken: 0.549 seconds, Fetched 1 row(s)
spark-sql (default)> select strlen_udf_int("liu");
ADD JAR file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar
Added [file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar] to class path
Added resources: [file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar]
bigdata_lzx.strlen_udf_int(liu)
3
Original-content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization; reproduction without permission is prohibited.
For infringement concerns, contact cloudcommunity@tencent.com for removal.