Spark实战--学习UDF

在开始正式数据处理之前,我觉得有必要去学习理解下UDF。

UDF

UDF全称User-Defined Functions,用户自定义函数,是Spark SQL的一项功能,用于定义新的基于列的函数,这些函数扩展了Spark SQL的DSL用于转换数据集的词汇表。

我在databricks上找到一个比较简单理解的入门栗子:

Register the function as a UDF

1val squared = (s: Int) => {
2  s * s
3}
4spark.udf.register("square", squared)

Call the UDF in Spark SQL

1spark.range(1, 20).registerTempTable("test")
2%sql select id, square(id) as id_squared from test

我理解就是先定义一个函数squared,返回输入数字的平方,然后register,并绑定square方法名为square,然后就在Spark SQL中直接使用square方法。

实例一:温度转化

 1import org.apache.spark.sql.SparkSession
 2import org.apache.spark.SparkConf
 3
 4object ScalaUDFExample {
 5  def main(args: Array[String]) {
 6    val conf       = new SparkConf().setAppName("Scala UDF Example")
 7    val spark      = SparkSession.builder().enableHiveSupport().config(conf).getOrCreate() 
 8
 9    val ds = spark.read.json("temperatures.json")
10    ds.createOrReplaceTempView("citytemps")
11
12    // Register the UDF with our SparkSession 
13    spark.udf.register("CTOF", (degreesCelcius: Double) => ((degreesCelcius * 9.0 / 5.0) + 32.0))
14
15    spark.sql("SELECT city, CTOF(avgLow) AS avgLowF, CTOF(avgHigh) AS avgHighF FROM citytemps").show()
16  }
17}

我们将定义一个 UDF 来将以下 JSON 数据中的温度从摄氏度(degrees Celsius)转换为华氏度(degrees Fahrenheit):

1{"city":"St. John's","avgHigh":8.7,"avgLow":0.6}
2{"city":"Charlottetown","avgHigh":9.7,"avgLow":0.9}
3{"city":"Halifax","avgHigh":11.0,"avgLow":1.6}
4{"city":"Fredericton","avgHigh":11.2,"avgLow":-0.5}
5{"city":"Quebec","avgHigh":9.0,"avgLow":-1.0}
6{"city":"Montreal","avgHigh":11.1,"avgLow":1.4}
7...

实例二:时间转化

 1case class Purchase(customer_id: Int, purchase_id: Int, date: String, time: String, tz: String, amount:Double)
 2
 3val x = sc.parallelize(Array(
 4  Purchase(123, 234, "2007-12-12", "20:50", "UTC", 500.99),
 5  Purchase(123, 247, "2007-12-12", "15:30", "PST", 300.22),
 6  Purchase(189, 254, "2007-12-13", "00:50", "EST", 122.19),
 7  Purchase(187, 299, "2007-12-12", "07:30", "UTC", 524.37)
 8))
 9
10val df = sqlContext.createDataFrame(x)
11df.registerTempTable("df")

自定义函数

1def makeDT(date: String, time: String, tz: String) = s"$date $time $tz"
2sqlContext.udf.register("makeDt", makeDT(_:String,_:String,_:String))
3
4// Now we can use our function directly in SparkSQL.
5sqlContext.sql("SELECT amount, makeDt(date, time, tz) from df").take(2)
6// but not outside
7df.select($"customer_id", makeDt($"date", $"time", $"tz"), $"amount").take(2) // fails

如果想要在SQL外面使用,必须通过spark.sql.function.udf来创建UDF

1import org.apache.spark.sql.functions.udf
2val makeDt = udf(makeDT(_:String,_:String,_:String))
3// now this works
4df.select($"customer_id", makeDt($"date", $"time", $"tz"), $"amount").take(2)

实践操作

写一个UDF来将一些Int数字分类

 1val formatDistribution = (view: Int) => {
 2  if (view < 10) {
 3    "<10"
 4  } else if (view <= 100) {
 5    "10~100"
 6  } else if (view <= 1000) {
 7    "100~1K"
 8  } else if (view <= 10000) {
 9    "1K~10K"
10  } else if (view <= 100000) {
11    "10K~100K"
12  } else {
13    ">100K"
14  }
15}

注册:

1session.udf.register("formatDistribution", UDF.formatDistribution)

SQL:

1session.sql("select user_id, formatDistribution(variance_digg_count) as variance from video")

写到这里,再回顾UDF,我感觉这就像是去为了方便做一个分类转化等操作,和Python里面的函数一样,只不过这里的UDF一般特指Spark SQL里面使用的函数。然后发现这里和SQL中的自定义函数挺像的:

 1CREATE FUNCTION [函数所有者.]<函数名称> 
 2(   
 3    -- 添加函数所需的参数,可以没有参数
 4    [<@param1> <参数类型>]
 5    [,<@param1> <参数类型>]…
 6)
 7RETURNS TABLE 
 8AS
 9RETURN 
10(
11    -- 查询返回的SQL语句
12    SELECT查询语句
13)
 1/*
 2* 创建内联表值函数,查询交易总额大于1W的开户人个人信息
 3*/
 4create function getCustInfo()
 5returns @CustInfo table  --返回table类型
 6(
 7    --账户ID
 8    CustID int,
 9    --帐户名称
10    CustName varchar(20) not null,
11    --身份证号
12    IDCard varchar(18),
13    --电话
14    TelePhone varchar(13) not null,
15    --地址
16    Address varchar(50) default('地址不详')
17)
18as
19begin
20    --为table表赋值
21    insert into @CustInfo
22    select CustID,CustName,IDCard,TelePhone,Address from AccountInfo 
23    where CustID in (select CustID from CardInfo 
24    where CardID in (select CardID from TransInfo group by CardID,transID,TransType,TransMoney,TransDate having sum(TransMoney)>10000))
25    return
26end
27go
28-- 调用内联表值函数
29select * from getCustInfo()
30go

好像有异曲同工之妙~

原文发布于微信公众号 - Python爬虫与算法进阶(zhangslob)

原文发表时间:2018-10-29

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏乐沙弥的世界

PL/SQL 集合的方法

    PL/SQL中提供了常用的三种集合联合数组、嵌套表、变长数组,而对于这几个集合类型中元素的操作,PL/SQL提供了相应的函数或过程来操 纵数组中的元素...

8630
来自专栏跟着阿笨一起玩NET

sql server 获取每一个类别中值最大的一条数据

SELECT  * FROM    (           SELECT    * ,                     ROW_NUMBER() OVE...

39910
来自专栏机器学习入门

LWC 56:718. Maximum Length of Repeated Subarray

LWC 56:718. Maximum Length of Repeated Subarray 传送门:718. Maximum Length of Repea...

21960
来自专栏北京马哥教育

SQL函数汇总【精选篇】

1.绝对值 SQL:select abs(-1) value O:select abs(-1) value from dual 2.取整(大) ...

32890
来自专栏芋道源码1024

数据库中间件 Sharding-JDBC 源码分析 —— SQL 解析(一)之语法解析

1. 概述 2. Lexer 词法解析器 3. Token 词法标记 3.2.1 Literals.IDENTIFIER 词法关键词 3.2.2 Litera...

45980
来自专栏chenssy

【死磕Sharding-jdbc】---SQL解析-词法分析

sharding-jdbc对SQL解析的源码主要在下图所示parsing模块中,由下图可知SQL解析主要分为两部分:lexer和parser。lexer就是本文...

17220
来自专栏idba

order by 原理以及优化

一 简介 偏向于业务的(MySQL)DBA或者业务的开发者来说,order by 排序是一个常见的业务功能,将结果根据指定的字段排序,满足前端展示的需求。然而...

13730
来自专栏desperate633

第14课 组合查询创建组合查询union的使用规则

组合查询很容易理解就是讲多个查询的结果放在一起显示 使用UNION关键字进行查询的组合

8320
来自专栏JetpropelledSnake

Django学习笔记之Models与ORM操作

12160
来自专栏Java帮帮-微信公众号-技术文章全总结

Oracle应用实战六——函数+集合

函数 字符函数 接收字符输入返回字符或者数值,dual是伪表 1. 把小写的字符转换成大小的字符 upper('smith') ? 2. 把大写字符变成小写字符...

31050

扫码关注云+社区

领取腾讯云代金券