前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用 Spark | 手把手带你十步轻松拿下 Spark SQL 使用操作

使用 Spark | 手把手带你十步轻松拿下 Spark SQL 使用操作

作者头像
数人之道
发布2022-03-28 14:58:39
8.2K0
发布2022-03-28 14:58:39
举报
文章被收录于专栏:数据技术数据技术

文章大纲

在《20张图详解 Spark SQL 运行原理及数据抽象》的第 5 节“SparkSession”中,我们知道了 Spark SQL 就是基于 SparkSession 作为入口实现的。

在 Spark 2.0 版本之后,SparkSession 封装了 SQLContext 及 HiveContext,实现了后两者的所有功能,并可以获取到 SparkConetxt。

那 Spark SQL 具体的实现方式是怎样的?如何进行使用呢?

下面就带大家一起来认识 Spark SQL 的使用方式,并通过十步操作实战,轻松拿下 Spark SQL 的使用。

1

DataSet 及 DataFrame 的创建

在《20张图详解 Spark SQL 运行原理及数据抽象》的第 4 节“Spark SQL 数据抽象”中,我们认识了 Spark SQL 中的两种数据抽象:DataSet 及 DataFrame。

而在《带你理解 Spark 中的核心抽象概念:RDD》的 2.1 节中,我们认识了如何在 Spark 中创建 RDD,那 DataSet 及 DataFrame 在 Spark SQL 中又是如何进行创建的呢?

DataSet 及 DataFrame 的创建方式有两种:

1.1

使用 Spark 创建函数进行创建

手动定义数据集合,然后通过 Spark 的创建操作函数 createDataset()createDataFrame(), 创建 DataSet、DataFrame:

DataSet:

代码语言:javascript
复制
//DataSet
case class Person(name:String, age:Int, height:Int)
val seq1 = Seq(Person("Michael", 25, 176), Person("Jack", 15, 165))
val ds1 = spark.createDataset(seq1)
ds1.show

使用 Spark 创建操作函数创建 DataSet

DataFrame:

代码语言:javascript
复制
//DataFrame
val seq2 = Seq(("Michael", 25, 176), ("Jack", 15, 165))
val df1 = spark.createDataFrame(seq2).toDF("name", "age", "height")
df1.show

使用 Spark 创建操作函数创建 DataFrame

由于这种方式需要手动定义数据,实际操作中并不常用。

1.2

读取数据源进行创建

Spark SQL 支持的数据源包括:文件、数据库、Hive 等。

1.2.1. 读取文件数据源

Spark SQL 支持的文件类型包括:parquet、text、csv、json、orc 等。

例如读取 Spark 自带的 text 文件:

代码语言:javascript
复制
val sc = spark.sparkContext
val textRDD1 = sc.textFile("file:///opt/modules/spark/examples/src/main/resources/people.txt")
textRDD1.take(5)

Spark SQL 读取文件数据源方式一

或:

代码语言:javascript
复制
val textRDD2 = spark.read.text("file:///opt/modules/spark/examples/src/main/resources/people.txt").rdd
textRDD2.take(5)

Spark SQL 读取文件数据源方式二

两种用法的区别在于返回的数据集类型不一样

  • sc.textFile(path:String) 返回的数据集类型是:RDD[String]
  • spark.read.text(path:String) 返回的数据集类型是:DataFrame(DataSet[Row])

1.2.2. 读取数据库数据源

Spark SQL 支持通过 JDBC 读取外部数据库的数据作为数据源。

以读取 Oracle 数据库为例:

启动 Spark Shell 时,指定 Oracle 数据库的驱动:

代码语言:javascript
复制
spark-shell --master spark://hadoop101:7077 \
--jars /root/temp/ojdbc6.jar \
--driver-class-path /root/temp/ojdbc6.jar

连接数据库,以读取数据库中的数据:

代码语言:javascript
复制
val oracleDF = spark.read.format("jdbc").
option("url","jdbc:oracle:thin:@192.168.100.1:1521/orcl.example.com").
option("dbtable","scott.emp").
option("user","scott").
option("password","test").
load

1.2.3. 使用 Hive 中的数据

Spark SQL 是由 Shark 发展而来的,Shark 其实就是 Hive on Spark。Spark 1.0 版本发布后,才引入了 Spark SQL。

2014 年 7 月 1 日之后,Databricks 宣布终止对 Shark 的开发,将重点放到 Spark SQL 上。

Spark SQL 的具体发展史详见下图:

Spark SQL 发展历史

可见,Spark 原生就对 Hive 的兼容十分友好,且其还内置了 Hive 组件,Spark SQL 可以通过内置 Hive 或者外部 Hive 两种方式读取 Hive 库中的数据。

Spark SQL 具体使用和操作 Hive 数据源的方法将在后续的 Hive 专栏中进行介绍。

2

RDD、DataFrame、DataSet 的共性与转换

在 Spark 中,RDD、DataFrame、DataSet 三种类型的数据集是有一定的共同特性的,因此它们三者之间可以相互进行转换,而且需要转换的场景也较为常见。

2.1

RDD、DataFrame、DataSet 的共性

  1. RDD、DataFrame、DataSet 都是 Spark 平台下的分布式弹性数据集,为处理超大型数据提供了便利;
  2. 三者都有惰性计算机制,在进行创建、Transformation 操作时,不会立即执行,只有在遇到 Action 操作时,才会开始遍历运算(详细介绍请参见《带你理解 Spark 中的核心抽象概念:RDD》中的第 2 节“RDD 的操作”);
  3. 三者都有 Partition 的概念,可以进行 Cache(缓存)操作,也可以进行 CheckPoint(检查点)操作(详细介绍请参见《7000字+15张图解,学习 Spark 入门基础知识》中的 4.3.4 节及 2.3 节);
  4. 三者都有许多相似的操作算子,如 map、filter、groupByKey 等(详细介绍请参见《带你理解 Spark 中的核心抽象概念:RDD》中的 2.3 节“RDD API 算子”);
  5. 在对 DataFrame 和 Dataset 进行操作时,很多情况下需要 spark.implicits._ 进行支持。

2.2

RDD、DataFrame、DataSet 的转换

RDD、DataFrame、DataSet 之间的转换

2.2.1. DataFrame/DataSet 转 RDD

这个转换比较简单,直接调用 rdd 即可将 DataFrame/DataSet 转换为 RDD:

代码语言:javascript
复制
val rdd1 = testDF.rdd
val rdd2 = testDS.rdd

2.2.2. RDD 转 DataFrame

a. 通过编程的方式来设置 Schema,适用于编译器不能确定列的情况:

代码语言:javascript
复制
val peopleRDD = spark.sparkContext.textFile("file:///opt/modules/spark/examples/src/main/resources/people.txt")
val schemaString = "name age"
val filed = schemaString.split(" ").map(filename => org.apache.spark.sql.types.StructField(filename, org.apache.spark.sql.types.StringType, nullable = true))
val schema = org.apache.spark.sql.types.StructType(filed)
peopleRDD.map(_.split(",")).map(para => org.apache.spark.sql.Row(para(0).trim, para(1).trim))
val peopleDF = spark.createDataFrame(res6, schema)
peopleDF.show

b. 用元组把一行的数据写在一起,然后在 toDF() 中指定字段名:

代码语言:javascript
复制
val peopleDF2 = rdd.map(para(para(0).trim(), para(1).trim().toInt)).toDF("name", "age")
peopleDF2.show

c. 定义 case class,通过反射来设置 Schema,使用 toDF 进行转换:

代码语言:javascript
复制
case class Person(name:String, age:Int)
val peopleDF3 = spark.sparkContext.textFile("file:///opt/modules/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(para => Person(para(0).trim, para(1).trim.toInt)).toDF
peopleDF3.show

RDD 转 DataFrame(case class 方式)

2.2.3. RDD 转 DataSet

定义 case class,通过反射来设置 Schema,使用 toDS 进行转换:

代码语言:javascript
复制
case class Person(name:String, age:Int)
val peopleDS = spark.sparkContext.textFile("file:///opt/modules/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(para => Person(para(0).trim, para(1).trim.toInt)).toDS
peopleDS.show

RDD 转 DataSet

2.2.4. DataSet 转 DataFrame

直接调用 toDF,即可将 DataSet 转换为 DataFrame:

代码语言:javascript
复制
val peopleDF4 = peopleDS.toDF
peopleDF4.show

DataSet 转 DataFrame

2.2.5. DataFrame 转 DataSet

使用 as 方法,as 方法后面跟的是 case class:

代码语言:javascript
复制
val peopleDS2 = peopleDF3.as[Person]
peopleDS2.show

DataFrame 转 DataSet

DataFrame 与 DataSet 均支持 Spark SQL 的算子操作,同时也能进行 SQL 语句操作,下面的实战中会进行演示。

3

Spark SQL 查询方式

Spark SQL 支持两种查询方式:一种是DSL 风格,另外一种是SQL 风格。

3.1

DSL 风格

Spark SQL 提供了一种 DSL(Domain Specified Language,领域专用语言,在语义上与 SQL 关系查询非常相近),以方便操作结构化数据。

使用前需要引入 spark.implicits._ 这个隐式转换,以将 DataFrame 隐式转换成 RDD。

3.2

SQL 风格

Spark SQL 的一个强大之处就是我们可以将它看作是一个关系型数据表,然后可以通过在程序中使用 spark.sql() 来执行 SQL 查询,并返回结果数据集。

使用前需要将 DataFrame/DataSet 注册成一张表,注册方式分两种:

1. Local Temporary View

使用 createOrReplaceTempView()createTempView()方法可以将表注册成 Local Temporary View(局部临时视图),这种方式注册的表只对当前生命周期中的 Session 有效,不能与其它 Session 共享。

2. Global Temporary View

使用 createGlobalTempView() 方法可以将表注册成 Global Temporary View(全局临时视图),这种方式注册的表可以在不同的 Session 中共享,即跨 Session 有效,而且在 Application 的运行周期内可用。

需要注意的是,使用 SQL 语句访问该表时,要加上 global_temp 作为前缀来引用,因为全局临时视图是绑定到系统保留的数据库 global_temp 上的。

下面的实战中会有注册不同类型表区别的实例操作演示。

3.3

Spark SQL 算子

DSL 支持 Spark SQL 算子,且算子十分丰富,下面列举部分算子:

3.3.1. select 相关

a. 列的数据展示有多种表示方法:""$""'col()df(""),注意不要混合使用:

代码语言:javascript
复制
// select
df1.select($"ename", $"age", $"sal").show
df1.select("ename", "age", "sal").show
df1.select('ename, 'age, 'sal).show
df1.select(col("ename"), col("age"), col("sal")).show
df1.select(df1("ename"), df1("age"), df1("sal")).show

b. expr 表达式可以对列进行操作,注意 expr 里面只能使用引号:

代码语言:javascript
复制
// expr表达式
df1.select(expr("age + 1"), expr("sal + 100"), expr("ename")).show
df1.selectExpr("ename as name").show
df1.selectExpr("round(sal, -3) as newsal", "sal", "ename").show

3.3.2. 更改相关

a. drop 可删除一个或多个列,得到新的 DataFrame:

代码语言:javascript
复制
// drop
df1.drop("age").show
df1.drop("age", "sal").show

b. withColumn 可对列值进行更改:

代码语言:javascript
复制
// withColumn
df1.withColumn("sal", $"sal" + 100).show

c. withColumnRenamed 可对列名进行更改:

代码语言:javascript
复制
// withColumnRenamed
df1.withColumnRenamed("sal", "newsal").show

注意:以上操作后,返回的数据集的类型是 DataFrame。

3.3.3. 筛选过滤相关

筛选、过滤的操作可以使用 filterwhere 算子:

代码语言:javascript
复制
// filter
df1.filter("sal > 10000").show
df1.filter("sal > 10000 and job == 'MANAGER'").show

// where
df1.where("sal > 10000").show
df1.where("sal > 10000 and job == 'MANAGER'").show

3.3.4. 聚集统计相关

使用 groupBy 算子搭配统计方式或 agg 可进行数据统计操作:

代码语言:javascript
复制
// groupBy with sum, min, max, avg, count
df1.groupBy("age").sum("sal").show
df1.groupBy("age").min("sal").show
df1.groupBy("age").max("sal").show
df1.groupBy("age").avg("sal").show
df1.groupBy("age").count.show

// agg
df1.groupBy("age").agg("sal" -> "sum", "sal" -> "min", "sal" -> "max", "sal" -> "avg", "sal" -> "count").show
df1.groupBy("age").agg(sum("sal"), min("sal"), max("sal"), avg("sal"), count("sal")).show
df1.groupBy("age").agg(sum("sal").as("sum1"), min("sal").as("min2"), max("sal").as("max3"), avg("sal").as("avg4"), count("sal").as("count5")).show

3.3.5. 排序相关

使用 orderBysort 算子可进行排序操作:

代码语言:javascript
复制
// orderBy
df1.orderBy("sal").show
df1.orderBy($"sal").show
df1.orderBy($"sal".asc).show
// 降序
df1.orderBy($"sal".desc).show
df1.orderBy(-'sal).show
df1.orderBy(-'age, -'sal).show

// sort
df1.sort("sal").show
df1.sort($"sal").show
df1.sort($"sal".asc).show
df1.sort($"sal".desc).show
df1.sort(-'sal).show
df1.sort(-'age, -'sal).show

3.3.6. 集合(并、交、差)相关

使用 union(unionAll)、intersectexcept 算子可对数据进行并集、交集、差集操作:

代码语言:javascript
复制
// union, unionAll, intersect, except
val ds3 = ds1.select("ename")
val ds4 = ds2.select("ename")

// union(求并集,不去重)
ds3.union(ds4).show

// unionAll(求并集,去重,过期方法)
ds3.unionAll(ds4).show

// intersect(求交集)
ds3.intersect(ds4).show

// except(求差集)
ds3.except(ds4).show

3.3.7. 连接相关

与 SQL 类似,连接类型有:内连接、左(外)连接、右(外)连接、全(外)连接、半连接、反连接、笛卡尔积等:

代码语言:javascript
复制
// join
// inner join(内连接)
ds1.join(ds2, "empno").show
ds1.join(ds2, Seq("empno"), "inner").show

// left join(左连接), left outer join(左外连接)
ds1.join(ds2, Seq("empno"), "left").show
ds1.join(ds2, Seq("empno"), "left_outer").show

// right join(右连接), right outer join(右外连接)
ds1.join(ds2, Seq("empno"), "right").show
ds1.join(ds2, Seq("empno"), "right_outer").show

// outer join(外连接), full join(全连接), full outer join(全外连接)
ds1.join(ds2, Seq("empno"), "outer").show
ds1.join(ds2, Seq("empno"), "full").show
ds1.join(ds2, Seq("empno"), "full_outer").show

//semi join(半连接), anti join(反连接)
ds1.join(ds2, Seq("empno"), "left_semi").show
ds1.join(ds2, Seq("empno"), "left_anti").show

注意:跟更改相关的算子一样,连接操作后,返回的数据集的类型是 DataFrame。

4

Spark SQL 使用实战

有了上面及之前介绍的理论知识为基础,下面手把手带大家十步轻松拿下 Spark SQL 使用操作,用实战的形式实践学习到的理论知识,以加深对 Spark SQL 的印象与理解。

4.1

创建数据源文件

这里使用《如何快速获取并分析自己所在城市的房价行情?》中获取到的广州二手房 csv 格式的数据作为数据源文件。

数据源文件(广州二手房信息)

另外再创建一个户型信息相关的数据源文件,以进行连接操作使用。

数据源文件(户型信息)

注意数据文件的编码格式要采用中文编码,否则中文会显示乱码。

4.2

上传数据源文件至 HDFS

这里使用《万字+50图,详解 Hadoop HA 完全分布式部署配置及运行调试》中搭建的 Hadoop 中的 HDFS 作为数据文件的存储系统,因此需要将创建的数据源文件上传至 HDFS 中,供 Spark SQL 进行读取。

上传数据源文件至 HDFS:

代码语言:javascript
复制
hdfs dfs -put /opt/data/ershouHousePrice_lianjia_gz_hdfs.csv /input
hdfs dfs -put /opt/data/huxing_lianjia_gz_hdfs.csv /input

打开 HDFS 的 Web 页面查看:

通过 HDFS Web 页面查看上传数据文件是否成功

可以看到,两个数据源文件已经成功上传至 HDFS 中。

4.3

定义 case class(表的 schema)

打开 SparkSession,定义 case class,即表的 Schema 信息:

代码语言:javascript
复制
case class House(totalprice:Float, positioninfo:String, huxing:String, chaoxiang:String, zhuangxiu:String, louceng:String, louling:String, louxing:String, danjia:Int, mianji:Float, guanzhu:Int)

定义 case class(House)

这里按照数据文件中的字段名称及对应的数据类型,对 Schema 进行定义。

4.4

读取数据源,加载数据(RDD 转 DataFrame)

读取上传到 HDFS 中的广州二手房信息数据文件,分隔符为逗号,将数据加载到上面定义的 Schema 中,并转换为 DataFrame 数据集:

代码语言:javascript
复制
val houseDF = spark.sparkContext.textFile("hdfs://hadoop100:8020/input/ershouHousePrice_lianjia_gz_hdfs.csv").map(_.split(",")).map(para => House(para(0).trim.toFloat, para(1).trim, para(2).trim, para(3).trim, para(4).trim, para(5).trim, para(6).trim, para(7).trim, para(8).trim.toInt, para(9).trim.toFloat, para(10).trim.toInt)).toDF
houseDF.show

读取并加载数据源文件

展示加载的数据集结果

由于数据加载到 Schema 中为 RDD 数据集,需要用 toDF 转换为 DataFrame 数据集,以使用 Spark SQL 进行查询。

4.5

使用 DSL 风格查询数据

使用 Spark SQL 的 DSL 风格查询方式,对 houseDF 数据集进行查询,包括 select、筛选过滤、聚集统计:

代码语言:javascript
复制
houseDF.select("positioninfo").show
houseDF.filter($"totalprice" > 1000).show
houseDF.groupBy($"huxing").count.show

DSL 风格 - 使用 select 算子

DSL 风格 - 使用筛选过滤算子

DSL 风格 - 使用聚集统计算子

大家还可以尝试使用上面介绍的其它 Spark SQL 算子进行查询。

4.6

注册表

houseDF 数据集注册两种不同类型的表:Local Temporary View、Global Temporary View:

代码语言:javascript
复制
houseDF.createOrReplaceTempView("houseDF")
houseDF.createGlobalTempView("houseDF_gl")

下面对这两种类型的表进行查询,观察两者之间的区别。

4.7

使用 SQL 风格查询数据

使用 Spark SQL 的 SQL 风格查询方式,对上面注册的两种不同类型表进行查询:

代码语言:javascript
复制
spark.sql("select * from houseDF").show

SQL 风格 - 查询 Local Temporary View

代码语言:javascript
复制
spark.sql("select * from global_temp.houseDF_gl").show

SQL 风格 - 查询 Global Temporary View

注意查询 Global Temporary View 类型表时,需要加上 global_temp 前缀。

在不同的 Session 中,对上面注册的两种表进行查询:

代码语言:javascript
复制
spark.newSession.sql("select * from houseDF").show

在新的 Session 中查询 Local Temporary View

代码语言:javascript
复制
spark.newSession.sql("select * from global_temp.houseDF_gl").show

在新的 Session 中查询 Global Temporary View

通过操作实践,可以看到:

Local Temporary View 只对当前 Session 有效;而 Global Temporary View 可以在不同 Session 间共享,支持跨 Session 查询。

4.8

DataFrame 转 DataSet

将 DataFrame 数据集 houseDF 转换成 DataSet 数据集 houseDS

代码语言:javascript
复制
val houseDS = houseDF.as[House]
houseDS.show

DataFrame 转 DataSet 实战

使用 DSL 风格查询方式,对 houseDS 数据集进行查询操作:

代码语言:javascript
复制
houseDS.filter(_.totalprice > 1000).show
houseDS.filter(_.huxing == "3室2厅").show
houseDS.groupBy($"huxing").count.show

对 DataSet 进行 DSL 风格查询

houseDS 数据集转换成 Array 类型结构数据:

代码语言:javascript
复制
houseDS.collect

对 DataSet 转换为 Array 类型结构数据

可见,DataFrame 转换为 DataSet 后,同样支持 Spark SQL 的算子操作。

4.

RDD 转 DataSet

重新读取并加载广州二手房信息数据源文件,将其转换为 DataSet 数据集:

代码语言:javascript
复制
val houseRdd = spark.sparkContext.textFile("hdfs://hadoop100:8020/input/ershouHousePrice_lianjia_gz_hdfs.csv").map(_.split(","))
val houseDS2 = houseRdd.map(para => House(para(0).trim.toFloat,para(1).trim,para(2).trim,para(3).trim,para(4).trim,para(5).trim,para(6).trim,para(7).trim,para(8).trim.toInt,para(9).trim.toFloat,para(10).trim.toInt)).toDS
houseDS2.show

RDD 转 DataSet 实战

houseDS2 数据集注册成表,并使用 SQL 风格查询方式进行查询:

代码语言:javascript
复制
houseDS2.createOrReplaceTempView("houseDS2")
spark.sql("select * from houseDS2").show

注册表并进行 SQL 风格查询

代码语言:javascript
复制
spark.sql("select totalprice, positioninfo, huxing, danjia, mianji from houseDS2 where totalprice > 1000 and mianji < 150 order by mianji").show

对 DataSet 进行 SQL 风格查询

SQL 风格查询方式更适合进行复杂的数据查询。

4.10

使用 SQL 风格进行连接查询

读取上传到 HDFS 中的户型信息数据文件,分隔符为逗号,将数据加载到定义的 Schema 中,并转换为 DataSet 数据集:

代码语言:javascript
复制
case class Huxing(huxing:String, rooms:String)
val huxingRdd = spark.sparkContext.textFile("hdfs://hadoop100:8020/input/huxing_lianjia_gz_hdfs.csv").map(_.split(","))
val huxingDS = huxingRdd.map(para => Huxing(para(0).trim, para(1).trim)).toDS
huxingDS.show

加载户型信息数据源文件,并转换为 DataSet

huxingDS 数据集注册成表,并使用 SQL 风格查询方式进行查询:

代码语言:javascript
复制
huxingDS.createOrReplaceTempView("huxingDS")
spark.sql("select * from huxingDS").show

注册表并进行 SQL 风格查询

houseDS2huxingDS 两个 DataSet 数据集采用 SQL 风格查询方式进行连接查询,统计所有二房和三房房子的总价格:

代码语言:javascript
复制
spark.sql("select sum(totalprice) from (select houseDS2.totalprice, huxingDS.rooms from houseDS2 join huxingDS where houseDS2.huxing = huxingDS.huxing and huxingDS.rooms in ('二房','三房')) t").show

使用 SQL 风格进行连接查询

至此,Spark SQL 的使用操作实战暂告一段落,大家可以继续深入摸索研究,发掘 Spark SQL 的精髓所在!

版权信息:© Evgeny Vasenev / Aurora Photos

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-03-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 数人之道 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档