前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark UDF实现demo

Spark UDF实现demo

原创
作者头像
mikeLiu
修改2020-08-28 15:25:58
3.4K0
修改2020-08-28 15:25:58
举报
文章被收录于专栏:技术学习技术学习

Spark UDF实现demo

1 前言

使用Spark开发代码过程时,很多时候当前库中的算子不能满足业务需求。此时,UDFs(user defined functions) 派上非常大的作用。基于DataFrame(或者DataSet) 的Java(或Python、Scale) 可以轻松的定义注册UDF,但是想在SQL(SparkSQL、Hive) 中自定义或者想共用就遇到困难。这时,可以先按照一定规约自定义函数,再向Spark(或Hive)注册为永久函数,实现在Spark和Hive共享UDF的目的。

2 具体实现

根据官网1,可以知道,要实现UDF,至少需要继承UDAF、AbstractGenericUDAFResolver、GenericUDF、 GenericUDTF、UserDefinedAggregateFunction中的一个。如下已继承UDF为列进行说明:</br>

整体的实现包括两部:

  1. 继承父类开发UDF
  2. 注册UDF

2.1 继承父类开发UDF

2.1.1 基于java实现2

maven工程的pom.xml

代码语言:txt
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sogo</groupId>
    <artifactId>sparkudf</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <spark.version>2.3.1</spark.version>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.16</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>1.2.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>

    </build>

</project>

一个类实现一个evaluate方法,定义一个UDF</br>

类中的main仅用于测试,打包前请先注解掉</br>

StringLengthUdf.java

代码语言:txt
复制
package com.sogo.sparkudf.udf;

import org.apache.hadoop.hive.ql.exec.UDF;


/**
 * @Created by IntelliJ IDEA.
 * @author: liuzhixuan@sogou-inc.com
 * @Date: 2020/7/26
 * @Time: 23:54
 * @des:
 */
public class StringLengthUdf extends UDF {
    // 默认调用 "evaluate" 方法
    public int evaluate(String str) {
        if (null == str) {
            return 0;
        } else {
            return str.length();
        }
    }

//    public static void main(String[] args) {
//        StringLengthUdf stringLengthUdf = new StringLengthUdf();
//        String str = "test";
//        System.out.println("out:" + stringLengthUdf.evaluate(str));
//    }
}

UDF输入多个参数 StringContainUdf.java

代码语言:txt
复制
package com.sogo.sparkudf.udf;

import org.apache.hadoop.hive.ql.exec.UDF;
import java.io.Serializable;

public class StringContainUdf extends UDF implements Serializable {
    // 修改evaluate的形参,满足UDF不同输入参数及类型的场景
    public Boolean evaluate(String s1, String s2) {
        if (null == s1 || null == s2) {
            return false;
        } else return s1.contains(s2);
    }
}

2. 注册UDF

2.1 语法
2.1.1 通用语法
代码语言:txt
复制
CREATE [ OR REPLACE ] [ TEMPORARY ] FUNCTION [ IF NOT EXISTS ] function_name AS class_name [ resource_locations ]
2.1.2 基于jar的语法
代码语言:txt
复制
CREATE OR REPLACE FUNCTION strlen_udf_int  AS 'com.sogo.sparkudf.udf.StringLengthUdf'  USING JAR 'file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar';
2.2 注册

实验发现,在SparkSQL中注册的UDF需要在Hive客户端再次启动时生效;而在Hive中注册的UDF立即在SparkSQL中生效。

有时明明注册了UDF,客户端也重新连接了,但依然找不到UDF,可能是不在同一数据库,这点也需要重点关注下。</br>

2.2.1 查看已注册的functions
代码语言:txt
复制
# 查看已注册的function(hive、SparkSQL)
show functions;
## 查看已注册的UDF(SparkSQL)
show user functions;
2.2.2 在Hive中注册
代码语言:txt
复制
# 进入hive环境(若没有指定数据库,UDF将归当前数据库所有)
> hive
# 添加jar包
ADD JAR file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar
# 注册为临时UDF
CREATE TEMPORARY FUNCTION strlen_udf_int  AS 'com.sogo.sparkudf.udf.StringLengthUdf';
# 注册为永久UDF
CREATE FUNCTION strlen_udf_int  AS 'com.sogo.sparkudf.udf.StringLengthUdf';
# 更新永久UDF(这种方法在hive中不可用)
CREATE OR REPLACE FUNCTION strlen_udf_int  AS 'com.sogo.sparkudf.udf.StringLengthUdf';
# 不更新,类似追加的方式
CREATE FUNCTION IF NOT EXISTS strlen_udf_int  AS 'com.sogo.sparkudf.udf.StringLengthUdf';

每次添加显得麻烦,我们可以把将注册语句写入脚本,在进入hive前让它初始化。我们在配置SparkSQL时将这样做。

2.2.3 在SparkSQL中注册

在SparkSQL中,可以采用在Hive中注册的方法。下面采用初始化配置sql脚本的方式说明。</br>

.bashrc配置

代码语言:txt
复制
alias spark_sql="/opt/spark/bin/spark-sql \
    --master yarn \
    --deploy-mode client \
    --driver-memory 4G \
    --executor-memory 10G \
    --num-executors 80 \
    --executor-cores 4 \
    --name 'pyspark_cluster_lzx' \
    --queue adx_online \
    --database bigdata_lzx \
    --conf spark.dynamicAllocation.minExecutors=40 \
    --conf spark.dynamicAllocation.maxExecutors=80 \
    --conf spark.default.parallelism=1200 \
    --conf spark.sql.shuffle.partitions=1200 \
    --conf spark.eventLog.enabled=true \
    --conf spark.sql.autoBroadcastJoinThreshold=104857600\
    --hiveconf spark.hadoop.hive.cli.print.current.db=true \
    --hiveconf spark.hadoop.hive.cli.print.header=true \
    --hiveconf spark.hadoop.hive.resultset.use.unique.column.names=false\
    --jars file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar\
    -i /search/work/bigdata/liuzhixuan/sparkudf/spark_udf.sql"

注:--jars参数添加UDF的java实现到集群</br>

-i参数为预执行的代码</br>

spark_udf.sql

代码语言:txt
复制
CREATE OR REPLACE FUNCTION strlen_udf_int  AS 'com.sogo.sparkudf.udf.StringLengthUdf'  USING JAR 'file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar';

执行

代码语言:txt
复制
> spark_sql
> show user functions;

结果

代码语言:txt
复制
spark-sql (default)> show user functions;
function
bigdata_lzx.strlen_udf_int
Time taken: 0.549 seconds, Fetched 1 row(s)
spark-sql (default)> select strlen_udf_int("liu");
ADD JAR file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar
Added [file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar] to class path
Added resources: [file:///search/work/bigdata/liuzhixuan/sparkudf/jars/sparkudf-1.0-SNAPSHOT-jar-with-dependencies.jar]
bigdata_lzx.strlen_udf_int(liu)
3

参考文献:

1. CREATE FUNCTION   https://spark.apache.org/docs/3.0.0-preview/sql-ref-syntax-ddl-create-function.html </br>

2. Hive UDF函数开发使用样例  https://sjq597.github.io/2015/11/25/Hive-UDF%E5%87%BD%E6%95%B0%E5%BC%80%E5%8F%91%E4%BD%BF%E7%94%A8%E6%A0%B7%E4%BE%8B/

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Spark UDF实现demo
    • 1 前言
      • 2 具体实现
        • 2.1 继承父类开发UDF
        • 2. 注册UDF
      • 参考文献:
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档