Spark SQL 技术点汇总

Spark SQL 是 Spark 的结构化数据处理模块,特点如下:

数据兼容:可从Hive表、外部数据库(JDBC)、RDD、Parquet 文件、JSON 文件获取数据,可通过 Scala 方法或 SQL 方式操作这些数据,并把结果转回 RDD。

组件扩展:SQL 语法解析器、分析器、优化器均可重新定义。

性能优化:内存列存储、动态字节码生成等优化技术,内存缓存数据。

多语言支持:Scala、Java、Python、R。

001

原理

组成

Catalyst 优化:优化处理查询语句的整个过程,包括解析、绑定、优化、物理计划等,主要由关系代数(relation algebra)、表达式(expression)以及查询优化(query optimization)组成。

Spark SQL 内核:处理数据的输入输出,从不同数据源(结构化数据 Parquet 文件 JSON 文件、Hive 表、外部数据库、已有 RDD)获取数据,执行查询(expression of queries),并将查询结果输出成 DataFrame。

Hive 支持:对 Hive 数据的处理,主要包括 HiveQL、MetaStore、SerDes、UDFs 等。

执行流程

SqlParser 对 SQL 语句解析,生成 Unresolved 逻辑计划(未提取 Schema 信息);

Catalyst 分析器结合数据字典(catalog)进行绑定,生成 Analyzed 逻辑计划,过程中 Schema Catalog 要提取 Schema 信息;

Catalyst 优化器对 Analyzed 逻辑计划优化,按照优化规则得到 Optimized 逻辑计划;

与 Spark Planner 交互,应用策略(strategy)到 plan,使用 Spark Planner 将逻辑计划转换成物理计划,然后调用 next 函数,生成可执行物理计划。

性能

内存列式缓存:内存列式(in-memory columnar format)缓存(再次执行时无需重复读取),仅扫描需要的列,并自动调整压缩比使内存使用率和 GC 压力最小化。

动态代码和字节码生成技术:提升重复表达式求值查询的速率。

Tungsten 优化:

由 Spark 自己管理内存而不是 JVM,避免了 JVM GC 带来的性能损失。

内存中 Java 对象被存储成 Spark 自己的二进制格式,直接在二进制格式上计算,省去序列化和反序列化时间;此格式更紧凑,节省内存空间。

002

API

应用程序模板

通用读写方法

读写文件代码(统一使用 sqlContext.read 和 dataFrame.write)模板:

003

RDD 转为 DataFrame

方法1

方法:使用反射机制推断 RDD Schema。

场景:运行前知道 Schema。

特点:代码简洁。

示例:

方法2

方法:以编程方式定义 RDD Schema。

场景:运行前不知道 Schema。

示例:

003

Parquet 文件数据源

Parquet 优点:

高效、Parquet 采用列式存储避免读入不需要的数据,具有极好的性能和 GC;

方便的压缩和解压缩,并具有极好的压缩比例;

可直接读写 Parquet 文件,比磁盘更好的缓存效果。

Spark SQL 支持根据 Parquet 文件自描述自动推断 Schema,生成 DataFrame。

编程示例:

分区发现(partition discovery)

与 Hive 分区表类似,通过分区列的值对表设置分区目录,加载 Parquet 数据源可自动发现和推断分区信息。

示例:有一个分区列为 gender 和 country 的分区表,加载路径“/path/to/table”可自动提取分区信息

创建的 DataFrame 的 Schema:

004

JSON 文件数据源

Spark SQL 支持根据 JSON 文件自描述自动推断 Schema,生成 DataFrame。

示例:

005

Hive数据源

HiveContext

操作 Hive 数据源须创建 SQLContext 的子类 HiveContext 对象。

Standalone 集群:添加 hive-site.xml 到 $SPARK_HOME/conf 目录。

YARN 集群:添加 hive-site.xml 到 $YARN_CONF_DIR 目录;添加 Hive 元数据库 JDBC 驱动 jar 文件到 $HADOOP_HOME/lib 目录。

最简单方法:通过 spark-submit 命令参数--file 和--jar 参数分别指定 hive-site.xml 和 Hive 元数据库 JDBC 驱动 jar 文件。

未找到 hive-site.xml:当前目录下自动创建 metastore_db 和 warehouse 目录。

模板:

使用 HiveQL

支持 Hive 特性

Hive 查询语句,包括 select、group by、order by、cluster by、sort by;

Hive 运算符,包括:关系运算符(=、⇔、==、、、>=、

用户自定义函数(UDF);

用户自定义聚合函数(UDAF);

用户自定义序列化格式(SerDes);

连接操作,包括 join、 outer join、left semi join、cross join;

联合操作(union);

子查询:select col from (select a + b as col from t1) t2;

抽样(Sampling);

解释(Explain);

分区表(Partitioned table);

所有 Hive DDL操作函数,包括 create table、create table as select、alter table;

大多数 Hive 数据类型 tinyint、smallint、int、bigint、boolean、float、double、string、binary、timestamp、date、array、map、struct。

006

数据库 JDBC 数据源

Spark SQL 支持加载数据库表生成 DataFrame。

模板(注意:需要相关 JDBC 驱动 jar 文件)

JDBC 参数

007

DataFrame Operation

分类:

DataFrameAction

基础 DataFrame 函数(basic DataFrame functions)

集成语言查询(language integrated queries)

输出操作

RDD Operation

DataFrame 本质是一个拥有多个分区的 RDD,支持 RDD Operation:coalesce、flatMap、foreach、foreachPartition、javaRDD、map、mapPartitions、repartition、toJSON、toJavaRDD 等。

008

性能调优

缓存数据

内存列式(in-memory columnar format)缓存:Spark SQL 仅扫描需要的列,并自动调整压缩比使内存使用率和 GC 压力最小化。

相关配置:

缓存/移除缓存代码模板:

参数调优

009

案例

数据准备

数据结构

职工基本信息(people)

部门基本信息(department)

职工考勤信息(attendance)

职工工资清单(salary)

建库、建表(spark-shell方式)

测试数据

职工基本信息(people.csv)

部门基本信息(department.csv)

职工考勤信息(attendance.csv)

职工工资清单(salary.csv)

上传数据文件至 HDFS

查询部门职工数

HiveQL 方式

Scala方式

结果

查询各部门职工工资总数,并排序

HiveQL 方式

Scala 方式

结果

查询各部门职工考勤信息

HiveQL 方式

Scala 方式

结果

本文转自博客园,作者ID「netoxi」

http://www.cnblogs.com/netoxi/p/7223413.html

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20180925B1CO9B00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 yunjia_community@tencent.com 删除。

扫码关注云+社区

领取腾讯云代金券