Spark SQL 是 Spark 的结构化数据处理模块,特点如下:
数据兼容:可从Hive表、外部数据库(JDBC)、RDD、Parquet 文件、JSON 文件获取数据,可通过 Scala 方法或 SQL 方式操作这些数据,并把结果转回 RDD。
组件扩展:SQL 语法解析器、分析器、优化器均可重新定义。
性能优化:内存列存储、动态字节码生成等优化技术,内存缓存数据。
多语言支持:Scala、Java、Python、R。
001
原理
组成
Catalyst 优化:优化处理查询语句的整个过程,包括解析、绑定、优化、物理计划等,主要由关系代数(relation algebra)、表达式(expression)以及查询优化(query optimization)组成。
Spark SQL 内核:处理数据的输入输出,从不同数据源(结构化数据 Parquet 文件 JSON 文件、Hive 表、外部数据库、已有 RDD)获取数据,执行查询(expression of queries),并将查询结果输出成 DataFrame。
Hive 支持:对 Hive 数据的处理,主要包括 HiveQL、MetaStore、SerDes、UDFs 等。
执行流程
SqlParser 对 SQL 语句解析,生成 Unresolved 逻辑计划(未提取 Schema 信息);
Catalyst 分析器结合数据字典(catalog)进行绑定,生成 Analyzed 逻辑计划,过程中 Schema Catalog 要提取 Schema 信息;
Catalyst 优化器对 Analyzed 逻辑计划优化,按照优化规则得到 Optimized 逻辑计划;
与 Spark Planner 交互,应用策略(strategy)到 plan,使用 Spark Planner 将逻辑计划转换成物理计划,然后调用 next 函数,生成可执行物理计划。
性能
内存列式缓存:内存列式(in-memory columnar format)缓存(再次执行时无需重复读取),仅扫描需要的列,并自动调整压缩比使内存使用率和 GC 压力最小化。
动态代码和字节码生成技术:提升重复表达式求值查询的速率。
Tungsten 优化:
由 Spark 自己管理内存而不是 JVM,避免了 JVM GC 带来的性能损失。
内存中 Java 对象被存储成 Spark 自己的二进制格式,直接在二进制格式上计算,省去序列化和反序列化时间;此格式更紧凑,节省内存空间。
002
API
应用程序模板
通用读写方法
读写文件代码(统一使用 sqlContext.read 和 dataFrame.write)模板:
003
RDD 转为 DataFrame
方法1
方法:使用反射机制推断 RDD Schema。
场景:运行前知道 Schema。
特点:代码简洁。
示例:
方法2
方法:以编程方式定义 RDD Schema。
场景:运行前不知道 Schema。
示例:
003
Parquet 文件数据源
Parquet 优点:
高效、Parquet 采用列式存储避免读入不需要的数据,具有极好的性能和 GC;
方便的压缩和解压缩,并具有极好的压缩比例;
可直接读写 Parquet 文件,比磁盘更好的缓存效果。
Spark SQL 支持根据 Parquet 文件自描述自动推断 Schema,生成 DataFrame。
编程示例:
分区发现(partition discovery)
与 Hive 分区表类似,通过分区列的值对表设置分区目录,加载 Parquet 数据源可自动发现和推断分区信息。
示例:有一个分区列为 gender 和 country 的分区表,加载路径“/path/to/table”可自动提取分区信息
创建的 DataFrame 的 Schema:
004
JSON 文件数据源
Spark SQL 支持根据 JSON 文件自描述自动推断 Schema,生成 DataFrame。
示例:
005
Hive数据源
HiveContext
操作 Hive 数据源须创建 SQLContext 的子类 HiveContext 对象。
Standalone 集群:添加 hive-site.xml 到 $SPARK_HOME/conf 目录。
YARN 集群:添加 hive-site.xml 到 $YARN_CONF_DIR 目录;添加 Hive 元数据库 JDBC 驱动 jar 文件到 $HADOOP_HOME/lib 目录。
最简单方法:通过 spark-submit 命令参数--file 和--jar 参数分别指定 hive-site.xml 和 Hive 元数据库 JDBC 驱动 jar 文件。
未找到 hive-site.xml:当前目录下自动创建 metastore_db 和 warehouse 目录。
模板:
使用 HiveQL
支持 Hive 特性
Hive 查询语句,包括 select、group by、order by、cluster by、sort by;
Hive 运算符,包括:关系运算符(=、⇔、==、、、>=、
用户自定义函数(UDF);
用户自定义聚合函数(UDAF);
用户自定义序列化格式(SerDes);
连接操作,包括 join、 outer join、left semi join、cross join;
联合操作(union);
子查询:select col from (select a + b as col from t1) t2;
抽样(Sampling);
解释(Explain);
分区表(Partitioned table);
所有 Hive DDL操作函数,包括 create table、create table as select、alter table;
大多数 Hive 数据类型 tinyint、smallint、int、bigint、boolean、float、double、string、binary、timestamp、date、array、map、struct。
006
数据库 JDBC 数据源
Spark SQL 支持加载数据库表生成 DataFrame。
模板(注意:需要相关 JDBC 驱动 jar 文件)
JDBC 参数
007
DataFrame Operation
分类:
DataFrameAction
基础 DataFrame 函数(basic DataFrame functions)
集成语言查询(language integrated queries)
输出操作
RDD Operation
DataFrame 本质是一个拥有多个分区的 RDD,支持 RDD Operation:coalesce、flatMap、foreach、foreachPartition、javaRDD、map、mapPartitions、repartition、toJSON、toJavaRDD 等。
008
性能调优
缓存数据
内存列式(in-memory columnar format)缓存:Spark SQL 仅扫描需要的列,并自动调整压缩比使内存使用率和 GC 压力最小化。
相关配置:
缓存/移除缓存代码模板:
参数调优
009
案例
数据准备
数据结构
职工基本信息(people)
部门基本信息(department)
职工考勤信息(attendance)
职工工资清单(salary)
建库、建表(spark-shell方式)
测试数据
职工基本信息(people.csv)
部门基本信息(department.csv)
职工考勤信息(attendance.csv)
职工工资清单(salary.csv)
上传数据文件至 HDFS
查询部门职工数
HiveQL 方式
Scala方式
结果
查询各部门职工工资总数,并排序
HiveQL 方式
Scala 方式
结果
查询各部门职工考勤信息
HiveQL 方式
Scala 方式
结果
本文转自博客园,作者ID「netoxi」
http://www.cnblogs.com/netoxi/p/7223413.html
领取专属 10元无门槛券
私享最新 技术干货