首页
学习
活动
专区
圈层
工具
发布
50 篇文章
1
2021年大数据Spark(一):框架概述
2
2021年大数据Spark(二):四大特点
3
2021年大数据Spark(三):框架模块初步了解
4
2021年大数据Spark(四):三种常见的运行模式
5
2021年大数据Spark(五):大环境搭建本地模式 Local
6
2021年大数据Spark(六):环境搭建集群模式 Standalone
7
2021年大数据Spark(七):应用架构基本了解
8
2021年大数据Spark(八):环境搭建集群模式 Standalone HA
9
2021年大数据Spark(九):Spark On Yarn两种模式总结
10
2021年大数据Spark(十):环境搭建集群模式 Spark on YARN
11
2021年大数据Spark(十一):应用开发基于IDEA集成环境
12
2021年大数据Spark(十二):Spark Core的RDD详解
13
2021年大数据Spark(十三):Spark Core的RDD创建
14
2021年大数据Spark(十四):Spark Core的RDD操作
15
2021年大数据Spark(十五):Spark Core的RDD常用算子
16
2021年大数据Spark(十六):Spark Core的RDD算子练习
17
2021年大数据Spark(十七):Spark Core的RDD持久化
18
2021年大数据Spark(十八):Spark Core的RDD Checkpoint
19
2021年大数据Spark(十九):Spark Core的​​​​​​​共享变量
20
2021年大数据Spark(二十):Spark Core外部数据源引入
21
2021年大数据Spark(二十一):Spark Core案例-SogouQ日志分析
22
2021年大数据Spark(二十二):内核原理
23
2021年大数据Spark(二十三):SparkSQL 概述
24
2021年大数据Spark(二十四):SparkSQL数据抽象
25
2021年大数据Spark(二十五):SparkSQL的RDD、DF、DS相关操作
26
2021年大数据Spark(二十六):SparkSQL数据处理分析
27
2021年大数据Spark(二十七):SparkSQL案例一花式查询和案例二WordCount
28
2021年大数据Spark(二十八):SparkSQL案例三电影评分数据分析
29
2021年大数据Spark(二十九):SparkSQL案例四开窗函数
30
2021年大数据Spark(三十):SparkSQL自定义UDF函数
31
2021年大数据Spark(三十一):Spark On Hive
32
2021年大数据Spark(三十二):SparkSQL的External DataSource
33
2021年大数据Spark(三十三):SparkSQL分布式SQL引擎
34
2021年大数据Spark(三十四):Spark Streaming概述
35
2021年大数据Spark(三十五):SparkStreaming数据抽象 DStream
36
2021年大数据Spark(三十六):SparkStreaming实战案例一 WordCount
37
2021年大数据Spark(三十七):SparkStreaming实战案例二 UpdateStateByKey
38
2021年大数据Spark(三十八):SparkStreaming实战案例三 状态恢复 扩展
39
2021年大数据Spark(三十九):SparkStreaming实战案例四 窗口函数
40
2021年大数据Spark(四十):SparkStreaming实战案例五 TopN-transform
41
2021年大数据Spark(四十一):SparkStreaming实战案例六 自定义输出 foreachRDD
42
2021年大数据Spark(四十二):SparkStreaming的Kafka快速回顾与整合说明
43
2021年大数据Spark(四十三):SparkStreaming整合Kafka 0.10 开发使用
44
2021年大数据Spark(四十四):Structured Streaming概述
45
2021年大数据Spark(四十五):Structured Streaming Sources 输入源
46
2021年大数据Spark(四十六):Structured Streaming Operations 操作
47
2021年大数据Spark(四十七):Structured Streaming Sink 输出
48
2021年大数据Spark(四十八):Structured Streaming 输出终端/位置
49
2021年大数据Spark(四十九):Structured Streaming 整合 Kafka
50
2021年大数据Spark(五十):Structured Streaming 案例一实时数据ETL架构

2021年大数据Spark(三十):SparkSQL自定义UDF函数


自定义UDF函数

     无论Hive还是SparkSQL分析处理数据时,往往需要使用函数,SparkSQL模块本身自带很多实现公共功能的函数,在org.apache.spark.sql.functions中。SparkSQL与Hive一样支持定义函数:UDF和UDAF,尤其是UDF函数在实际项目中使用最为广泛。

回顾Hive中自定义函数有三种类型:

第一种:UDF(User-Defined-Function) 函数

一对一的关系,输入一个值经过函数以后输出一个值;

在Hive中继承UDF类,方法名称为evaluate,返回值不能为void,其实就是实现一个方法;

第二种:UDAF(User-Defined Aggregation Function) 聚合函数

多对一的关系,输入多个值输出一个值,通常与groupBy联合使用;

第三种:UDTF(User-Defined Table-Generating Functions) 函数

一对多的关系,输入一个值输出多个值(一行变为多行);

用户自定义生成函数,有点像flatMap;

注意

目前来说Spark 框架各个版本及各种语言对自定义函数的支持:

在SparkSQL中,目前仅仅支持UDF函数和UDAF函数:

UDF函数:一对一关系;

UDAF函数:聚合函数,通常与group by 分组函数连用,多对一关系;

由于SparkSQL数据分析有两种方式:DSL编程和SQL编程,所以定义UDF函数也有两种方式,不同方式可以在不同分析中使用。

SQL方式

     使用SparkSession中udf方法定义和注册函数,在SQL中使用,使用如下方式定义:

DSL方式

    使用org.apache.sql.functions.udf函数定义和注册函数,在DSL中使用,如下方式:

代码演示

代码语言:javascript
复制
package cn.itcast.sql

import org.apache.spark.SparkContext
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Author itcast
 * Desc
 * 将udf.txt中的单词使用SparkSQL自定义函数转为大写
 * hello
 * haha
 * hehe
 * xixi
 */
object UDFDemo {
  def main(args: Array[String]): Unit = {
    //1.准备环境
    val spark: SparkSession = SparkSession.builder().appName("UDFDemo").master("local[*]").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._

    //2.加载数据
    val df: DataFrame = spark.read.text("data/input/udf.txt")
    df.show()
    /*
    +-----+
    |value|
    +-----+
    |hello|
    | haha|
    | hehe|
    | xixi|
    +-----+
     */

    //3.使用自定义函数将单词转为大写

    //SQL风格-自定义函数
    //spark.udf.register("函数名",函数实现)
    spark.udf.register("small2big", (value: String) => value.toUpperCase())

    df.createOrReplaceTempView("t_words")
    val sql: String =
      """
        |select value,small2big(value) as small_word
        |from t_words
        |""".stripMargin
    spark.sql(sql).show() //如果没有自定义该函数,那么会报错:Undefined function: 'small2big'.

    //DSL风格-自定义函数
    //val small2big2: UserDefinedFunction = functions.udf((value: String) => value.toUpperCase)
    import org.apache.spark.sql.functions._
    val small2big2: UserDefinedFunction = udf((value: String) => value.toUpperCase)
    df.select($"value", small2big2($"value").as("small_word")).show()
  }
}
下一篇
举报
领券