前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Spark重点难点】SparkSQL YYDS(上)!

【Spark重点难点】SparkSQL YYDS(上)!

作者头像
王知无-import_bigdata
发布2021-12-15 12:55:06
8980
发布2021-12-15 12:55:06
举报

DataFrame来源

Spark 社区在 1.3 版本发布了 DataFrame。那么,相比 RDD,DataFrame 到底有何不同呢?

DataFrame被称为SchemaRDD。DataFrame使Spark具备了处理大规模结构化数据的能力。在Spark中,DataFrame是一种以RDD为基础的分布式数据集,因此DataFrame可以完成RDD的绝大多数功能,在开发使用时,也可以调用方法将RDD和DataFrame进行相互转换。

在开发API方面,RDD算子多采用高阶函数,高阶函数的优势在于表达能力强,它允许开发者灵活地设计并实现业务逻辑。而 DataFrame的表达能力却很弱,它定义了一套DSL算子(Domain Specific Language)。

注意:所谓的高阶函数指的是,指的是形参为函数的函数,或是返回类型为函数的函数。

代码语言:javascript
复制
> 例如:我们在WordCount程序中调用flatMap算子:
lineRDD.flatMap(line => line.split(" "))
flatMap的入参其实是一个函数。

这里需要大家注意:是不是DataFrame表达能力弱就意味着DataFrame比RDD弱呢?

恰恰相反,因为DataFrame的算子大多数都是计算逻辑确定的,Spark就可以根据基于启发式的规则或策略甚至动态运行时的信息优化DataFrame的计算过程。

那么负责DataFrame的算子优化是谁来做的呢?正是SparkSQL

Spark Core和Spark SQL的关系

我们可以用一句话描述这个关系: Spark SQL正是在Spark Core的执行引擎基础上针对结构化数据处理进行优化和改进。

上图揭示了Spark Core体系和Spark SQL体系的关系。在上图中,Spark Core作为整个Spark系统的底层执行引擎。负责了所有的任务调度、数据存储、Shuffle等核心能力。

而Spark SQL正是基于如此强大的Spark Core底层能力,形成一套独立的优化引擎系统。

简单的说,Spark SQL的计算任务通过SQL的形式最终转换成了RDD的计算。Spark SQL会对代码事先进行优化。

DataFrame的创建方式

Spark 本身支持种类丰富的数据源与数据格式,DataFrame的创建方式更是多种多样。

这里我们列举三类最常用的Spark DataFrame的创建方式。

createDataFrame & toDF

createDataFrame方法

在SqlContext中使用createDataFrame也可以创建DataFrame。数据可以来源于RDD或者自己创建的数组。

代码语言:javascript
复制
import org.apache.spark.sql.types._
val schema = StructType(List(
      StructField("name", StringType, nullable = false),
      StructField("age", IntegerType, nullable = false),
      StructField("birthday", DateType, nullable = false)
    ))

val rdd = spark.sparkContext.parallelize(Seq(
      Row("小明", 18, java.sql.Date.valueOf("1990-01-01")),
      Row("小芳", 20, java.sql.Date.valueOf("1999-02-01"))
    ))
val df = spark.createDataFrame(rdd, schema)
df.show()

createDataFrame 方法有两个参数,第一个参数是RDD,第二个参数就是Schema信息。createDataFrame需要的RDD的类型必须是 RDD[Row],其中的 Row 是 org.apache.spark.sql.Row,因此,对于类型为 RDD[(String, Int)]的 rdd,我们需要把它转换为RDD[Row]

df.show()函数可以将数据进行输出:

代码语言:javascript
复制
+--------------+-------------+-----------+
|name          |age          |birthday   |
+--------------+-------------+-----------+
|小明           |           18| 1990-01-01|
|小芳           |           20| 1999-02-01|
+--------------+-------------+-----------+
toDF方法

我们可以通过导入spark.implicits, 然后通过在 RDD 之上调用 toDF 就能轻松创建 DataFrame。只要这些数据的内容能指定数据类型即可。

代码语言:javascript
复制
import spark.implicits._
val df = Seq(
    ("小明", 18, java.sql.Date.valueOf("1990-01-01")),
    ("小芳", 20, java.sql.Date.valueOf("1999-02-01"))
  ).toDF("name", "age", "birthday")
  
    df.show()

打印出来的结果为:

代码语言:javascript
复制
+--------------+-------------+-----------+
|name          |age          |birthday   |
+--------------+-------------+-----------+
|小明           |           18| 1990-01-01|
|小芳           |           20| 1999-02-01|
+--------------+-------------+-----------+

同样,我们可以将一个RDD转化为df:

代码语言:javascript
复制
val rdd = spark.sparkContext.parallelize(List(1,2,3,4,5))
val df = rdd.map(x=>(x,x^2)).toDF("a","b")
df.show()

通过文件系统创建DataFrame

Spark支持非常多的文件格式,例如CSV、JSON、ORC、Parquet等。支持的文件列表你可以参考这里:

https://docs.databricks.com/data/data-sources/index.html

我们以CSV文件举例,假设我们的文件数据为:

代码语言:javascript
复制
小明,18
小芳,20
代码语言:javascript
复制
val spark = SparkSession.builder()
      .appName("csv reader")
      .master("local")
      .getOrCreate()
 
    val result = spark.read.format("csv")
      .option("delimiter", ",")
      .option("header", "true")
      .option("nullValue", "\\N")
      .option("inferSchema", "true")
      .load("path/demo.csv")
 
    result.show()
    result.printSchema()

当然,不同的文件格式有非常多的可选项,你可以参考上面给出的官网连接。

通过其他数据源创建DataFrame

我们可以通过指定连接参数例如数据库地址、用户名、密码等连接其他数据源。我们以MySQL为例:

代码语言:javascript
复制
val url = "jdbc:mysql://localhost:3306/test"
val df = spark.read
          .format("jdbc")
          .option("url", url)
          .option("dbtable", "test")
          .option("user", "admin")
          .option("password", "admin")
          .load()
df.show()
代码语言:javascript
复制
+---+----+----+
| id|user|age|
+---+----+----+
|  1| 小明| 18|
|  2| 小芳| 20|
+---+----+----+

常用语法和算子

Spark SQL支持的算子列表非常非常多。

你可以在这里看到所有的算子列表:

https://spark.apache.org/docs/3.2.0/api/sql/index.html

我们举几个最常用的语法演示给大家看。

  • 单行查询
代码语言:javascript
复制
var userDF = List((1, "张三", true, 18, 15000, 1))
 .toDF("id", "name", "sex", "age", "salary", "dept")
userDF.createTempView("t_employee")
val sql = "select * from t_employee where name = '张三'"
spark.sql(sql).show()
  • 分组查询
代码语言:javascript
复制
var userDF= List( (1,"张三",true,18,15000,1),
 (2,"李四",false,18,12000,1),
 (3,"王五",false,18,16000,2)
) .toDF("id","name","sex","age","salary","dept")

//构建视图
userDF.createTempView("t_employee")
val sql=
 """
   |select dept ,avg(salary) as avg_slalary from t_employee
   |group by dept order by avg_slalary desc
 """.stripMargin
spark.sql(sql).show()
代码语言:javascript
复制
+----+-----------+
|dept|avg_slalary|
+----+-----------+
|   2|    16000.0|
|   1|    13500.0|
+----+-----------+
  • 开窗函数
代码语言:javascript
复制
// 开窗函数
var df=List(
 (1,"zs",true,1,15000),
 (2,"ls",false,2,18000),
 (3,"ww",false,2,14000),
 (4,"zl",false,1,18000),
 (5,"win7",false,1,16000)
).toDF("id","name","sex","dept","salary")
df.createTempView("t_employee")

val sql=
 """
   |select id,name,salary,dept,
   |count(id) over(partition by dept order by salary desc) as rank,
   |(count(id) over(partition by dept order by salary desc rows between current row and unbounded following) - 1) as low_than_me,
   |avg(salary) over(partition by dept rows between unbounded preceding and unbounded following) as avg_salary,
   |avg(salary) over() as all_avg_salary 
   |from t_employee t1 order by dept desc
 """.stripMargin
 
spark.sql(sql).show()

你可以参考这篇博客,这个博客中的例子非常多:

https://mask0407.blog.csdn.net/article/details/106716575

总结

本章我们讲解了Spark SQL的来源,Spark DataFrame创建的方式以及常用的算子。下篇我们将讲解Spark SQL中的Catalyst优化器和Tungsten,以及Spark SQL的Join策略选择。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-12-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Spark Core和Spark SQL的关系
  • DataFrame的创建方式
    • createDataFrame & toDF
      • createDataFrame方法
      • toDF方法
    • 通过文件系统创建DataFrame
      • 通过其他数据源创建DataFrame
      • 常用语法和算子
      • 总结
      相关产品与服务
      云数据库 MySQL
      腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档