Spark之SparkSQL

1、SparkSQL概述

SparkSQL是Spark用来处理结构化数据的一个模块,它提供了非常强大的API,让分析人员使用一次就为之倾倒,为之着迷。在内部,SparkSQL使用额外的结构信息来执行额外的优化,在外部,使用SQL和DataSet的API与之交互。官网给SparkSQL的定义,如下图所示。

在Hadoop生态中已经有了Hive、Pig等分析工具,为什么还会出现SparkSQL呢?在Hadoop发展过程中,为了给熟悉RDBMS但又不理解MapReduce的技术人员提供快速上手的工具,Hive应用而生,它是当时唯一运行在Hadoop上的SQL-on-Hadoop工具。但是,MapReduce在计算过程中大量的中间磁盘落地过程消耗了大量的磁盘I/O,严重地降低了运行效率。为了提高SQL-on-Hadoop的运行效率,大量的SQL-on-Hadoop工具开始产生,其中表现突出的有一个叫做Shark的工具,Shark运行在Spark引擎上,从而使得SQL的查询速度得到了10-100倍的提升。

但是,随着Spark的发展,对于野心勃勃的Spark团队来说,Shark对于Hive的太多依赖(如采用Hive的语法解析器、查询优化器等),与Spark的“One Stack to Rule Them All”的既定方针不相匹配,制约了Spark各个组件的相互集成,于是就产生了SparkSQL。简单总结就是,SparkSQL的开发目的是为用户提供关系查询和复杂过程算法(如机器学习算法)混合应用的灵活性,让此类应用在内存中执行分析,在几秒或几分钟内生成结果。

DataFrame

DataFrame是由列组成的数据集合,它在概念上等同于关系型数据库中的表或R/Python中的data frame,但在查询引擎上进行了丰富的优化。DataFrame可以由各种各样的数据源构建,如结构化数据文件、hive中的表、外部数据库或现有的RDD等。

从上图可以看出,DataFrame多了数据的结构信息,即schema。RDD是分布式的Java对象的集合,而DataFrame是分布式的Row对象的集合。DataFrame除了提供比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化。

DataSet

DataSet是分布式的数据集合,是Spark 1.6中添加的一个新接口,是特定域对象中的强类型集合,是DataFrame之上更高一级的抽象。它提供了RDD的优点(强类型化,使用强大的lambda函数的能力)以及SparkSQL优化后执行的优点,它可以使用函数或者相关操作并行地进行转换操作,数据集可以由JVM对象构造,然后使用函数转换(如map、flatmap、filter等)进行操作。DataSet API支持Scala和Java,但不支持Python。

2、SparkSQL的特点

从上文了解了SparkSQL的产生及基本概念,那么,SparkSQL又具有哪些特点呢?下面将一一列举。

A、容易集成

SparkSQL将SQL查询与Spark程序无缝对接,它允许用户使用SQL或熟悉的DataFrame API查询Spark程序内的结构化数据,可应用于Java、Scala、Python和R。

B、统一的数据访问方式

可使用同样的方式连接任何数据源,DataFrame和SQL提供了访问各种数据源的常用方式,包括Hive、Avro、Parquet、ORC、JSON和JDBC,甚至可以通过这些数据源直接加载数据。

C、Hive集成

能够在现有数据仓库上运行SQL或HiveSQL查询,SparkSQL支持HiveQL语法以及HiveSerDes(序列化和反序列化工具)和UDF(用户自定义函数),允许用户访问现有的Hive仓库。

D、标准的数据连接

通过JDBC或ODBC进行数据库连接,服务器模式为商业智能工具提供行业标准的JDBC和ODBC数据连接。

3、SparkSQL的组件

A、DataFrame API

DataFrame或SchemaRDD不是一个新概念,它早在R语言中或Python的Pandas中就已经实现。Spark扩展并利用了相同的概念,其所实现的DataFrame用于存储具有相同模式的分布式行集合。它是SparkSQL API中的主要抽象,相当于关系型数据库中的表,可以像对Spark Core中分布式数据集合(RDD)一样,以类似的方式进行操作,DataFrame跟踪其模式并支持产生更优化执行的各种关系操作。

B、DataFrame和RDD

DataFrame像是行对象的RDD一般,其允许用户调用如map、reduce等过程性的Spark API函数,它们可以通过各种各样的方式来构建,既能直接从关系型数据存储中的表或JSON或Cassandra来构建,或者通过Scala/Java/Python代码创建的现有RDD中来构建。一旦构造好DataFrame,就可以使用诸如GroupBy、OrderBy等各种关系运算符来查询或操作它们,这些运算符接受领域特定语言中的表达式。

需要注意的是,像RDD一样,DataFrame也是延迟执行的,除非用户调用诸如DataFrame上的print()或count()之类的输出操作,否则不会有物理执行发生。如RDD一样的DataFrame还将数据缓存在内存中,但它们以列的形式存储,这样有助于减少内存占用。它们还应用诸如字典编码和运行长度编码之类的列式压缩方案,这使得它们更优于RDD中原生Java/Scala对象的缓存。

C、用户自定义函数

DataFrame还支持用户自定义函数(UDF),它们可以应用为内联函数从而无须复杂的打包和注册过程。一旦完成注册,它们也可以通过JDBC和ODBC接口由商业智能工具使用。DataFrame还提供了在整个表上定义UDF的灵活性,然后将它们作为高级分析函数公开给SQL用户使用。

D、DataFrame和SQL

在高级别上,DataFrame提供了与SQL相类似的功能。使用SparkSQL和DataFrame,同关系查询(SQL)相比,执行分析要容易得多。DataFrame为用户提供的一站式解决方案中,不仅可以编写SQL查询,还可以开发和利用Scala、Java或Python函数,并在它们之间传递DataFrame来构建一个逻辑计划,并且到最终执行时能从整个计划的优化中受益,而且开发人员还可以使用如if语句和循环的控制结构来开展其工作。DataFrame API在相当早时就分析了逻辑计划,以便在开发人员编码时识别诸如缺少列名之类的错误,但是实际查询中仍然仅在调用输出操作时执行。

4、创建DataFrame

在正式介绍创建DataFrame的几种方式之前,简单介绍下这次使用的测试数据集,其实在之前已经使用过这两个数据集,分别是员工表(employee)以及部门表(department),其内容如下图所示(注意,需要将这两个数据表上传到HDFS对应的目录下,这里是/data目录)。

A、通过case class创建DataFrame

(1)定义自定义case class类,代表表的结构schema

case class employee(empno:Int,ename:String, job:String, manager:String, hiredate:String, salary:Int,bonus:String, deptno:Int)

(2)读取employee文件中的数据,并进行分词操作

val empdata= sc.textFile("hdfs://192.168.12.221:9000/data/employee").map(_.split(","))

(3)生成表DataFrame

val allemps = empdata.map(x =>employee(x(0).toInt, x(1), x(2), x(3), x(4), x(5).toInt, x(6), x(7).toInt)),然后,直接由allemps生成DataFrame表:valempdf = allemps.toDF

(4)使用DSL语句,操作empdf表,查看表数据及结构信息

empdf.show ——>对应SQL语句select * from employee

empdf.printSchema ——>对应SQL语句desc employee

运行结果如下图所示:

B、使用SparkSession对象创建DataFrame

Apache Spark 2.0引入了SparkSession对象,它为用户提供了一个统一的切入点来使用Spark的各项功能,并且允许用户通过它调用DataFrame和DataSet相关API来编写Spark程序。最重要的是,它减少了用户需要了解的概念,使得很容易就能与Spark交互。

在Apache Spark 2.0版本之前,与Spark交互之前必须先创建SparkConf和SparkContext。然而在Spark 2.0中,可以通过SparkSession来实现同样的功能,而不需要显式地创建SparkConf, SparkContext 以及 SQLContext,因为这些对象都已经封装在SparkSession中。从启动Spark客户端(即spark-shell)的log中可以看到相关的信息,如下图所示:

(1)创建StructType,用于定义表的结构schema信息

注意,在创建StructType前,需要导入相关的包,如:

val employeeschema =StructType(List(StructField("empno", DataTypes.IntegerType),StructField("ename",DataTypes.StringType),StructField("job",DataTypes.StringType),StructField("manager",DataTypes.StringType),StructField("hiredate",DataTypes.StringType),StructField("salary",DataTypes.IntegerType),StructField("bonus",DataTypes.StringType),StructField("deptno", DataTypes.IntegerType)))

(2)将读入的每一行数据映射成一个Row

val rowRDD = empdata.map(x => Row(x(0).toInt,x(1), x(2), x(3), x(4), x(5).toInt, x(6), x(7).toInt))

(3)使用SparkSession.createDataFrame创建表

val empdf =spark.createDataFrame(rowRDD, employeeschema)

(4)使用DSL语句,操作empdf表,查看表数据及结构信息

empdf.show

empdf.printSchema

运行结果如下图所示:

C、通过读取一个带格式的文件(如json文件),直接创建DataFrame

这里没法使用上述的测试数据文件来创建DataFrame,因为它不具有相应的格式。在Spark的安装目录下,提供了一些带格式的测试数据文件,如下图所示(这里使用people.json文件进行测试)。

(1)使用SparkSession对象直接读取Json文件

val peopledf = spark.read.json("/root/training/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.json")

(2)使用DSL语句,操作peopledf表,查看表数据及结构信息

peopledf.show

peopledf.printSchema

运行结果如下图所示:

5、操作DataFrame的两种方式

通常来说,对SparkSQL中的DataFrame进行查询等操作有两种方式,其一是使用DSL语句,简单解释就是SparkSQL针对DataFrame相关操作进行了封装,提供一套函数调用接口;其二是直接使用SQL语句,这跟在关系型数据库中进行的操作基本一致。下面分别针对这两种方式进行举例介绍。

A、使用DSL语句对DataFrame进行操作

这种方式使用得并不是特别多,毕竟多数人更习惯使用SQL语句,下面简单举几个使用DSL语句操作DataFrame的实例。从下图可以看到,SparkSQL针对DataFrame表提供了非常多操作函数。

(1)查询所有员工的信息:empdf.show

(2)查询所有员工的姓名并进行显示:empdf.select(“ename”).show或empdf.select($“ename”).show

(3)查询所有员工的姓名和薪水,并给薪水增加100

(4)查询薪水大于等于2400的员工:empdf.filter($”salary” >=2400).show

(5)统计各个部门的员工人数:empdf.groupBy($"deptno").count.show

B、使用SQL语句对DataFrame进行操作

SQL语句相信大家再熟悉不过了,SparkSQL的设计初衷也是为了提供对SQL语句的支持,不过需要注意的是,虽然可以使用SparkSQL提供的sql函数直接执行SQL语句,但在执行前需要将对应的DataFrame注册成一个临时Table或者View。

(1)将empdf注册成一个临时表Table,然后查询所有员工信息

(2)将empdf注册成一个临时View,然后查询所有员工信息

(3)查询10号部门所有的员工

(4)求每个部门的薪水总和

6、DataSet详解

DataFrame的引入,可以让Spark更好的操作结构化数据,但存在一个很关键的问题:缺乏编译时类型安全。为了解决这个问题,Spark采用新的DataSet API (DataFrame API的类型扩展)。在Spark DataSet源码中有一段解释,如下图所示:

简单解释就是:DataSet是一个强类型的特定域对象的集合,可以使用函数或关系操作并行转换,每个数据集还对应有一个无类型的视图,称为DataFrame,这是一个行(Row)的数据集合。

在DataSet上的操作,分为transformations和actions, transformations会产生新的数据集(仍旧是DataSet),而actions则会触发计算产生结果。transformations包括map、filter、select和aggregate(同groupBy)等操作,而actions包括count、show或把数据写入文件系统中等操作。

DataSet是懒惰(lazy)的,也就是说当一个action被调用时才会触发一个计算,在内部实现中,数据集表示的是一个逻辑计划,它描述了生成数据所需的计算。当action被调用时,Spark的查询优化器会优化这个逻辑计划,并生成一个物理计划,该物理计划可以通过并行和分布式的方式来执行。可以使用explain解释函数,来进行逻辑计划的探索和物理计划的优化。

为了有效地支持特定域对象,Encoder(编码器)是必须的,如给出一个Person的类,有两个字段:name(string)和age(int),通过一个encoder来告诉Spark在运行的时候产生代码把Person对象转换成一个二进制结构,这种二进制结构通常有更低的内存占用,以及优化的数据处理效率。若想要了解数据的内部二进制表示,可以使用schema(表结构)函数。

简单总结,DataSet是一个分布式的数据收集器,这是在Spark1.6之后新增加的一个接口,兼顾了RDD的优点(强类型,可以使用功能强大的lambda函数)以及SparkSQL执行器高效的优点,所以可以把DataFrame看成是一种特殊的DataSet,即DataSet(Row)。那么,如何创建DataSet呢?下面介绍几种创建DataSet的方式。

A、使用序列创建DataSet

(1)定义自定义case class:case class MyCaseClass(x:Int,y:String)

(2)生成序列,并创建DataSet:val myDataSet = Seq(MyCaseClass(1,"Special"), MyCaseClass(1, "Iris")).toDS

(3)查看结果:myDataSet.show

运行结果如下图所示:

B、使用JSON数据创建DataSet

(1)定义自定义case class:case class Person(name:String, gender:String)

(3)将DataFrame转换成DataSet:myDF.as[Person].show或myDF.as[Person].collect

运行结果如下图所示:

C、使用HDFS上的数据文件创建DataSet

(1)读取HDFS文件系统上的数据文件,并创建生成DataSet:val myData = spark.read.text("hdfs://192.168.12.221:9000/input/data.txt").as[String]

(2)对DataSet进行分词操作,然后查询长度大于2的单词:val words =myData.flatMap(_.split(" ")).filter(_.length > 2)

(3)执行单词计数操作(即钱满介绍的WordCount程序):val result =myData.flatMap(_.split(" ")).map((_, 1)).groupByKey(x =>x._1).count

(4)对单词计数结果进行排序并显示:result.orderBy($"value").show

运行结果如下图所示:

7、使用SparkSQL数据源

在SparkSQL中,Parquet文件是其默认的数据源,在正式介绍使用SparkSQL中的各种数据源之前,先简单介绍下Parquet文件。

A、Parquet文件

Parquet文件是列式存储格式且能够应用于多种数据处理系统的一种文件类型,SparkSQL提供对于Parquet文件读写的支持,也就是自动保存原始数据的结构schema。当写Parquet文件时,所有的列被自动转化为nullable,这是由于兼容性的缘故。Parquet文件具有以下几个主要特点:

B、通用的Load/Save函数

在SparkSQL中,主要是操作表DataFrame,而DataFrame本身提供了save和load操作:

Load:用于创建DataFrame,可以使用具体的格式来指明要读取的文件类型;

Save:用于保存DataFrame中的数据,可以使用具体的格式来指明输出的文件类型。

举例如下:

(1)读取Parquet文件,read函数返回DataFrameReader,用于读取数据:

val userDF =spark.read.load("/root/training/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/users.parquet")

(2)查询userDF的结构schema及数据信息:userDF.printSchema;userDF.show

(3)查询用户的name以及喜欢的颜色,并将查询结果保存为Parquet文件:userDF.select($"name",$"favorite_color").write.save("/root/data/result.parquet")

运行结果如下图所示:

当加载json格式的文件时,需要显示指定文件格式为json,否则加载会失败,如直接加载json格式的文件people.json:val peopleDF =spark.read.load("/root/training/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.json"),会报错,如下图所示:

正确的写法应该为:val peopleDF =spark.read.format("json").load("/root/training/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.json")

C、存储模式(Save Mode)

可以采用Save Mode执行存储操作,Save Mode定义了对数据的处理模式。需要注意的是,这些保存模式不使用任何锁定,不是原子操作。此外,当使用Overwrite方式执行时,在输出新数据之前原数据就已经被删除。Save Mode详细介绍如下表:

举例如下:

从userDF查询所有用户的姓名,将结果保存为username.parquet:userDF.select($"name").write.save("/root/data/username.parquet")

然后再次执行userDF.select($"name").write.save("/root/data/username.parquet"),会报错,因为/root/data/username.parquet文件已经存在,如下图所示:

解决此问题的方法是,设置存储模式SaveMode为overwrite,即userDF.select($"name").write.mode("overwrite").save("/root/data/username.parquet")

除了将结果保存为Parquet文件之外,还可以保存为表(该表并不会保存到文件系统中,而是保存在内存中),如:userDF.select($"name").write.saveAsTable("userTable"),然后查询表userTable所有数据并显示:spark.sql("select *from userTable").show。

运行结果如下图所示:

D、多种多样的数据源

数据源之Parquet

在SparkSQL中可以直接读取Parquet格式的文件,可以将结果保存为Parquet格式的文件,甚至还可以读入json格式的数据,然后转换为Parquet格式,并创建相应的表来使用SQL语句进行查询。比如,现有employee.json文件,其内容如下图所示:

(1)读取employee.json文件中的数据:val employeeJsonDF =spark.read.json("/root/data/employee.json")

(2)将数据保存为Parquet格式:employeeJsonDF.write.mode("overwrite").parquet("/root/data/employee.parquet")

(3)重新从Parquet文件中读入数据:val employeeParquet =spark.read.parquet("/root/data/employee.parquet")

(4)创建临时视图:employeeParquet.createOrReplaceTempView("empTable")

(5)查询empTable表中部门号为20且薪水大于等于1600的员工,并将结果进行显示:spark.sql("select * from empTable where deptno=10 andsal>=1600").show

运行结果如下图所示:

有意思的是,Parquet文件还支持Schema Evolution(schema演变,即:合并)。用户可以先定义一个简单的schema,然后逐渐地向该schema中增加列描述。通过这种方式,用户可以获得多个有不同schema但相互兼容的Parquet文件。举例如下:

val df1 = sc.makeRDD(1 to 5).map(i=> (i, i * 2)).toDF("single", "double")

df1.write.parquet("/root/data/myresult/test_table/key=1")

val df2 = sc.makeRDD(6 to 10).map(i=> (i, i * 3)).toDF("single", "triple")

df2.write.parquet("/root/data/myresult/test_table/key=2")

val df3 = spark.read.option("mergeSchema","true").parquet("/root/data/myresult/test_table/")

df3.printSchema()

运行结果如下图所示:

数据源之JSON

(1)使用Spark自带测试数据文件people.json,读取json文件生成DataFrame:val peopleDF =spark.read.json("/root/training/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources/people.json")

(2)打印peopleDF的schema结构信息:peopleDF.printSchema

(3)为peopleDF创建临时视图:peopleDF.createOrReplaceTempView("people")

(4)执行查询并显示结果:spark.sql("select name from people where age=19").show

运行结果如下图所示:

这里介绍了SparkSQL中的两种数据源,除了上面提到的Parquet文件和JSON文件,还有JDBC(用于读取关系型数据库表中的数据)以及Hive Table(读取Hive表中的数据)等,本文先跳过这些内容,后面专门补发一篇文章来对这些相关内容做介绍。

8、SparkSQL编程实践

这里的编程实践,其本质跟前面的程序举例是一样的,只是这里的程序更完整,更接合实际应用。这里暂举两个使用Scala语言编写的SparkSQL程序实例,第一个实例使用StructType来指定表DataFrame的结构schema,第二个实例使用自定义case class来指定表DataFrame的结构schema。

A、使用StructType指定schema

测试数据如下图所示:

程序代码如下图所示:

运行结果如下图所示:

B、使用case class指定schema

这里的测试数据,跟上一个实例使用的数据一致,程序代码如下图所示:

运行结果如下图所示:

本文到此结束,还有一些比较重要的内容未做介绍,后面会再专门发一篇文章以作补充,欢迎关注!!

参考文献:

——《CSDN博客》

——《实时大数据分析 基于Storm、Spark技术的实时应用》

——《潭州大数据课程课件》

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20180908G0R6IB00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。

同媒体快讯

扫码关注云+社区

领取腾讯云代金券