Spark UDF使用详解及代码示例

前言

本文介绍如何在Spark Sql和DataFrame中使用UDF,如何利用UDF给一个表或者一个DataFrame根据需求添加几列,并给出了旧版(Spark1.x)和新版(Spark2.x)完整的代码示例。

  • 关于UDF:UDF:User Defined Function,用户自定义函数。

1、创建测试用DataFrame

下面以Spark2.x为例给出代码,关于Spark1.x创建DataFrame可在最后的完整代码里查看。

// 构造测试数据,有两个字段、名字和年龄
val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18))

//创建测试df
val userDF = spark.createDataFrame(userData).toDF("name", "age")
userDF.show
+-----+---+
| name|age|
+-----+---+
|  Leo| 16|
|Marry| 21|
| Jack| 14|
|  Tom| 18|
+-----+---+
// 注册一张user表
userDF.createOrReplaceTempView("user")

2、Spark Sql用法

2.1 通过匿名函数注册UDF

下面的UDF的功能是计算某列的长度,该列的类型为String

2.1.1 注册

  • Spark2.x: spark.udf.register("strLen", (str: String) => str.length())
  • Spark1.x: sqlContext.udf.register("strLen", (str: String) => str.length())

2.2.2 使用

仅以Spark2.x为例

spark.sql("select name,strLen(name) as name_len from user").show
+-----+--------+
| name|name_len|
+-----+--------+
|  Leo|       3|
|Marry|       5|
| Jack|       4|
|  Tom|       3|
+-----+--------+

2.2 通过实名函数注册UDF

实名函数的注册有点不同,要在后面加 _(注意前面有个空格) 定义一个实名函数

/**
 * 根据年龄大小返回是否成年 成年:true,未成年:false
*/
def isAdult(age: Int) = {
  if (age < 18) {
    false
  } else {
    true
  }

}

注册(仅以Spark2.x为例)

spark.udf.register("isAdult", isAdult _)

至于使用都是一样的

2.3 关于spark.udf和sqlContext.udf

在Spark2.x里,两者实际最终都是调用的spark.udf sqlContext.udf源码

def udf: UDFRegistration = sparkSession.udf

可以看到调用的是sparkSession的udf,即spark.udf

3、DataFrame用法

DataFrame的udf方法虽然和Spark Sql的名字一样,但是属于不同的类,它在org.apache.spark.sql.functions里,下面是它的用法

3.1注册

import org.apache.spark.sql.functions._
//注册自定义函数(通过匿名函数)
val strLen = udf((str: String) => str.length())
//注册自定义函数(通过实名函数)
val udf_isAdult = udf(isAdult _)

3.2 使用

可通过withColumn和select使用,下面的代码已经实现了给user表添加两列的功能

  • 通过看源码,下面的withColumn和select方法Spark2.0.0之后才有的,关于spark1.xDataFrame怎么使用注册好的UDF没有研究//通过withColumn添加列 userDF.withColumn("name_len", strLen(col("name"))).withColumn("isAdult", udf_isAdult(col("age"))).show //通过select添加列 userDF.select(col("*"), strLen(col("name")) as "name_len", udf_isAdult(col("age")) as "isAdult").show

结果均为

+-----+---+--------+-------+
| name|age|name_len|isAdult|
+-----+---+--------+-------+
|  Leo| 16|       3|  false|
|Marry| 21|       5|   true|
| Jack| 14|       4|  false|
|  Tom| 18|       3|   true|
+-----+---+--------+-------+

3.3 withColumn和select的区别

可通过withColumn的源码看出withColumn的功能是实现增加一列,或者替换一个已存在的列,他会先判断DataFrame里有没有这个列名,如果有的话就会替换掉原来的列,没有的话就用调用select方法增加一列,所以如果我们的需求是增加一列的话,两者实现的功能一样,且最终都是调用select方法,但是withColumn会提前做一些判断处理,所以withColumn的性能不如select好。

  • 注:select方法和sql 里的select一样,如果新增的列名在表里已经存在,那么结果里允许出现两列列名相同但数据不一样,大家可以自己试一下。/** * Returns a new Dataset by adding a column or replacing the existing column that has * the same name. * * @group untypedrel * @since 2.0.0 */ def withColumn(colName: String, col: Column): DataFrame = { val resolver = sparkSession.sessionState.analyzer.resolver val output = queryExecution.analyzed.output val shouldReplace = output.exists(f => resolver(f.name, colName)) if (shouldReplace) { val columns = output.map { field => if (resolver(field.name, colName)) { col.as(colName) } else { Column(field) } } select(columns : _*) } else { select(Column("*"), col.as(colName)) } }

4、完整代码

下面的代码的功能是使用UDF给user表添加两列:name_len、isAdult,每个输出结果都是一样的

+-----+---+--------+-------+
| name|age|name_len|isAdult|
+-----+---+--------+-------+
|  Leo| 16|       3|  false|
|Marry| 21|       5|   true|
| Jack| 14|       4|  false|
|  Tom| 18|       3|   true|
+-----+---+--------+-------+

代码:

package com.dkl.leanring.spark.sql

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * Spark Sql 用户自定义函数示例
 */
object UdfDemo {

  def main(args: Array[String]): Unit = {
    oldUdf
    newUdf
    newDfUdf
    oldDfUdf
  }

  /**
   * 根据年龄大小返回是否成年 成年:true,未成年:false
   */
  def isAdult(age: Int) = {
    if (age < 18) {
      false
    } else {
      true
    }

  }

  /**
   * 旧版本(Spark1.x)Spark Sql udf示例
   */
  def oldUdf() {

    //spark 初始化
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("oldUdf")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // 构造测试数据,有两个字段、名字和年龄
    val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18))
    //创建测试df
    val userDF = sc.parallelize(userData).toDF("name", "age")
    // 注册一张user表
    userDF.registerTempTable("user")

    // 注册自定义函数(通过匿名函数)
    sqlContext.udf.register("strLen", (str: String) => str.length())

    sqlContext.udf.register("isAdult", isAdult _)
    // 使用自定义函数
    sqlContext.sql("select *,strLen(name)as name_len,isAdult(age) as isAdult from user").show
    //关闭
    sc.stop()

  }

  /**
   * 新版本(Spark2.x)Spark Sql udf示例
   */
  def newUdf() {
    //spark初始化
    val spark = SparkSession.builder().appName("newUdf").master("local").getOrCreate()

    // 构造测试数据,有两个字段、名字和年龄
    val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18))

    //创建测试df
    val userDF = spark.createDataFrame(userData).toDF("name", "age")

    // 注册一张user表
    userDF.createOrReplaceTempView("user")

    //注册自定义函数(通过匿名函数)
    spark.udf.register("strLen", (str: String) => str.length())
    //注册自定义函数(通过实名函数)
    spark.udf.register("isAdult", isAdult _)
    spark.sql("select *,strLen(name) as name_len,isAdult(age) as isAdult from user").show

    //关闭
    spark.stop()

  }

  /**
   * 新版本(Spark2.x)DataFrame udf示例
   */
  def newDfUdf() {
    val spark = SparkSession.builder().appName("newDfUdf").master("local").getOrCreate()

    // 构造测试数据,有两个字段、名字和年龄
    val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18))

    //创建测试df
    val userDF = spark.createDataFrame(userData).toDF("name", "age")
    import org.apache.spark.sql.functions._
    //注册自定义函数(通过匿名函数)
    val strLen = udf((str: String) => str.length())
    //注册自定义函数(通过实名函数)
    val udf_isAdult = udf(isAdult _)

    //通过withColumn添加列
    userDF.withColumn("name_len", strLen(col("name"))).withColumn("isAdult", udf_isAdult(col("age"))).show
    //通过select添加列
    userDF.select(col("*"), strLen(col("name")) as "name_len", udf_isAdult(col("age")) as "isAdult").show

    //关闭
    spark.stop()
  }
  /**
   * 旧版本(Spark1.x)DataFrame udf示例
   * 注意,这里只是用的Spark1.x创建sc的和df的语法,其中注册udf在Spark1.x也是可以使用的的
   * 但是withColumn和select方法Spark2.0.0之后才有的,关于spark1.xDataFrame怎么使用注册好的UDF没有研究
   */
  def oldDfUdf() {
    //spark 初始化
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("oldDfUdf")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // 构造测试数据,有两个字段、名字和年龄
    val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18))
    //创建测试df
    val userDF = sc.parallelize(userData).toDF("name", "age")
    import org.apache.spark.sql.functions._
    //注册自定义函数(通过匿名函数)
    val strLen = udf((str: String) => str.length())
    //注册自定义函数(通过实名函数)
    val udf_isAdult = udf(isAdult _)

    //通过withColumn添加列
    userDF.withColumn("name_len", strLen(col("name"))).withColumn("isAdult", udf_isAdult(col("age"))).show
    //通过select添加列
    userDF.select(col("*"), strLen(col("name")) as "name_len", udf_isAdult(col("age")) as "isAdult").show

    //关闭
    sc.stop()
  }

}

本文由 董可伦 发表于 伦少的博客 ,采用署名-非商业性使用-禁止演绎 3.0进行许可。

非商业转载请注明作者及出处。商业转载请联系作者本人。

本文标题:Spark UDF使用详解及代码示例

本文链接:https://dongkelun.com/2018/08/02/sparkUDF/

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏一英里广度一英寸深度的学习

SparkSQL 电影评价数据分析

Dataset调用createOrReplaceTempView生成临时表,session内有效。 spark.sql执行sqll操作,可以选择创建的临时表。

1803
来自专栏xingoo, 一个梦想做发明家的程序员

Spark MLlib 之 aggregate和treeAggregate从原理到应用

由于treeAggregate是在aggregate基础上的优化版本,因此先来看看aggregate是什么.

1290
来自专栏Java 技术分享

Ajax 案例之三级联动

3196
来自专栏别先生

一脸懵逼学习Hadoop中的序列化机制——流量求和统计MapReduce的程序开发案例——流量求和统计排序

一:序列化概念 序列化(Serialization)是指把结构化对象转化为字节流。 反序列化(Deserialization)是序列化的逆过程。即把字节流转回...

25910
来自专栏码匠的流水账

聊聊rocketmq的PushConsumerImpl

io/openmessaging/rocketmq/consumer/PushConsumerImpl.java

1672
来自专栏分布式系统进阶

Influxdb中Select查询请求结果涉及到的一些数据结构

相当于c里面的链表元素,itr指向下一个元素的指针,buf表示当前元素,即FloatPoint类型的链表的迭代器.

1492
来自专栏拂晓风起

jQuery 和 json 简单例子(注意callback函数的处理!!) (servlet返回json,jquery更新,java json)

1153
来自专栏码匠的流水账

聊聊storm的CheckpointSpout

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java

2336
来自专栏用户画像

jQuery validate

681
来自专栏王磊的博客

javascript数字格式化通用类——accounting.js使用

简介 accounting.js 是一个非常小的JavaScript方法库用于对数字,金额和货币进行格式化。并提供可选的Excel风格列渲染。它没有依赖任何JS...

5446

扫码关注云+社区

领取腾讯云代金券