导读
昨日推文PySpark环境搭建和简介,今天开始介绍PySpark中的第一个重要组件SQL/DataFrame,实际上从名字便可看出这是关系型数据库SQL和pandas.DataFrame的结合体,功能也几乎恰是这样,所以如果具有良好的SQL基本功和熟练的pandas运用技巧,学习PySpark SQL会感到非常熟悉和舒适。
惯例开局一张图
01 PySpark SQL简介
前文提到,Spark是大数据生态圈中的一个快速分布式计算引擎,支持多种应用场景。例如Spark core中的RDD是最为核心的数据抽象,定位是替代传统的MapReduce计算框架;SQL是基于RDD的一个新的组件,集成了关系型数据库和数仓的主要功能,基本数据抽象是DataFrame,与pandas.DataFrame极为相近,适用于体量中等的数据查询和处理。
那么,在已经有了RDD的基础上,Spark为什么还要推出SQL呢?为此,Spark团队还专门为此发表论文做以介绍,原文可查找《Spark SQL: Relational Data Processing in Spark》一文。这里只节选其中的关键一段:
核心有两层意思,一是为了解决用户从多种数据源(包括结构化、半结构化和非结构化数据)执行数据ETL的需要;二是满足更为高级的数据分析需求,例如机器学习、图处理等。而为了实现这一目的,Spark团队推出SQL组件,一方面满足了多种数据源的处理问题,另一方面也为机器学习提供了全新的数据结构DataFrame(对应ml子模块)。
了解了Spark SQL的起源,那么其功能定位自然也十分清晰:基于DataFrame这一核心数据结构,提供类似数据库和数仓的核心功能,贯穿大部分数据处理流程:从ETL到数据处理到数据挖掘(机器学习)。
注:由于Spark是基于scala语言实现,所以PySpark在变量和函数命名中也普遍采用驼峰命名法(首单词小写,后面单次首字母大写,例如someFunction),而非Python中的蛇形命名(各单词均小写,由下划线连接,例如some_funciton)
02 几个重要的类
为了支撑上述功能需求和定位,PySpark中核心的类主要包括以下几个:
from pyspark import SparkContext
from pyspark.sql import SparkSession
sc = SparkContext()
spark = SparkSession(sc)
注:这里的Window为单独的类,用于建立窗口函数over中的对象;functions子模块中还有window函数,其主要用于对时间类型数据完成重采样操作。
03 DataFrame
DataFrame是PySpark中核心的数据抽象和定义,理解DataFrame的最佳方式是从以下2个方面:
换言之,记忆PySpark中的DataFrame只需对比SQL+pd.DataFrame即可。下面对DataFrame对象的主要功能进行介绍:
2)数据写入。与spark.read属性类似,.write则可用于将DataFrame对象写入相应文件,包括写入csv文件、写入数据库等
3)数据类型转换。DataFrame既然可以通过其他类型数据结构创建,那么自然也可转换为相应类型,常用的转换其实主要还是DataFrame=>rdd和DataFrame=>pd.DataFrame,前者通过属性可直接访问,后者则需相应接口:
df.rdd # PySpark SQL DataFrame => RDD
df.toPandas() # PySpark SQL DataFrame => pd.DataFrame
df = spark.createDataFrame([("John", 17), ("Tom", 18)], schema=["name", "age"])
df.select('name') # DataFrame[name: string]
df['name'] # Column<b'name'>
df.name # Column<b'name'>
除了提取单列外,select还支持类似SQL中"*"提取所有列,以及对单列进行简单的运算和变换,具体应用场景可参考pd.DataFrame中赋值新列的用法,例如下述例子中首先通过"*"关键字提取现有的所有列,而后通过df.age+1构造了名字为(age+1)的新列。
df = spark.createDataFrame([("John", 17), ("Tom", 18)], schema=["name", "age"])
df.select('*', df.age+1).show()
"""
+----+---+---------+
|name|age|(age + 1)|
+----+---+---------+
|John| 17| 18|
| Tom| 18| 19|
+----+---+---------+
"""
df.select('*', (df.age+1).alias('age1')).show()
"""
+----+---+----+
|name|age|age1|
+----+---+----+
|John| 17| 18|
| Tom| 18| 19|
+----+---+----+
"""
df.where(df.age==18).show()
df.filter(df.age==18).show()
df.where('age=18').show()
df.filter('age=18').show()
"""
+----+---+
|name|age|
+----+---+
| Tom| 18|
+----+---+
"""
值得指出的是在pandas.DataFrame中类似的用法是query函数,不同的是query()中表达相等的条件符号是"==",而这里filter或where的相等条件判断则是更符合SQL语法中的单等号"="。
# 原始DataFrame
df.show()
"""
+----+---+-------------------+
|name|age| time|
+----+---+-------------------+
|John| 17|2020-09-06 15:11:00|
| Tom| 17|2020-09-06 15:12:00|
| Joy| 17|2020-09-06 15:13:00|
| Tim| 18|2020-09-06 15:16:00|
+----+---+-------------------+
"""
# gorupby+pivot实现数据透视表
df.groupby(fn.substring('name', 1, 1).alias('firstName')).pivot('age').count().show()
"""
+---------+---+----+
|firstName| 17| 18|
+---------+---+----+
| T| 1| 1|
| J| 2|null|
+---------+---+----+
"""
# window函数实现时间重采样
df.groupby(fn.window('time', '5 minutes')).count().show()
"""
+--------------------+-----+
| window|count|
+--------------------+-----+
|[2020-09-06 15:10...| 3|
|[2020-09-06 15:15...| 1|
+--------------------+-----+
"""
# 多列排序,默认升序
df.sort('name', 'age').show()
"""
+----+---+-------------------+
|name|age| time|
+----+---+-------------------+
|John| 17|2020-09-06 15:11:00|
| Joy| 17|2020-09-06 15:13:00|
| Tim| 18|2020-09-06 15:16:00|
| Tom| 17|2020-09-06 15:12:00|
+----+---+-------------------+
"""
# 多列排序,并制定不同排序规则
df.sort(['age', 'name'], ascending=[True, False]).show()
"""
+----+---+-------------------+
|name|age| time|
+----+---+-------------------+
| Tom| 17|2020-09-06 15:12:00|
| Joy| 17|2020-09-06 15:13:00|
|John| 17|2020-09-06 15:11:00|
| Tim| 18|2020-09-06 15:16:00|
+----+---+-------------------+
"""
另外,类似于SQL中count和distinct关键字,DataFrame中也有相同的用法。
以上主要是类比SQL中的关键字用法介绍了DataFrame部分主要操作,而学习DataFrame的另一个主要参照物就是pandas.DataFrame,例如以下操作:
最后,再介绍DataFrame的几个通用的常规方法:
# 根据age列创建一个名为ageNew的新列
df.withColumn('ageNew', df.age+100).show()
"""
+----+---+-------------------+------+
|name|age| time|ageNew|
+----+---+-------------------+------+
|John| 17|2020-09-06 15:11:00| 117|
| Tom| 17|2020-09-06 15:12:00| 117|
| Joy| 17|2020-09-06 15:13:00| 117|
| Tim| 18|2020-09-06 15:16:00| 118|
+----+---+-------------------+------+
"""
注意到,withColumn实现的功能完全可以由select等价实现,二者的区别和联系是:withColumn是在现有DataFrame基础上增加或修改一列,并返回新的DataFrame(包括原有其他列),适用于仅创建或修改单列;而select准确的讲是筛选新列,仅仅是在筛选过程中可以通过添加运算或表达式实现创建多个新列,返回一个筛选新列的DataFrame,而且是筛选多少列就返回多少列,适用于同时创建多列的情况(官方文档建议出于性能考虑和防止内存溢出,在创建多列时首选select)
实际上show是spark中的action算子,即会真正执行计算并返回结果;而前面的很多操作则属于transform,仅加入到DAG中完成逻辑添加,并不实际执行计算
另外,DataFrame还有一个重要操作:在session中注册为虚拟表,而后即可真正像执行SQL查询一样完成相应SQL操作。
df.createOrReplaceTempView('person') # 将df注册为表名叫person的临时表
spark.sql('select * from person').show() # 通过sql接口在person临时表中执行SQL操作
"""
+----+---+-------------------+
|name|age| time|
+----+---+-------------------+
|John| 17|2020-09-06 15:11:00|
| Tom| 17|2020-09-06 15:12:00|
| Joy| 17|2020-09-06 15:13:00|
| Tim| 18|2020-09-06 15:16:00|
+----+---+-------------------+
"""
04 sql.functions核心API
基于DataFrame可以实现SQL中大部分功能,同时为了进一步实现SQL中的运算操作,spark.sql还提供了几乎所有的SQL中的函数,确实可以实现SQL中的全部功能。按照功能,functions子模块中的功能可以主要分为以下几类:
这些函数数量较多,且与SQL中相应函数用法和语法几乎一致,无需全部记忆,仅在需要时查找使用即可。
05 总结
本文较为系统全面的介绍了PySpark中的SQL组件以及其核心数据抽象DataFrame,总体而言:该组件是PySpark中的一个重要且常用的子模块,功能丰富,既继承了Spark core中RDD的基本特点(算子和延迟执行特性),也是Spark.ml机器学习子模块的基础数据结构,其作用自然不言而喻。
与此同时,DataFrame学习成本并不高,大致相当于关系型数据库SQL+pandas.DataFrame的结合体,很多接口和功能都可以触类旁通。