欢迎阅读美图数据技术团队的「Spark,从入门到精通」系列文章,本系列文章将由浅入深为大家介绍 Spark,从框架入门到底层架构的实现,相信总有一种姿势适合你。
/ 发家史 /
熟悉 Spark SQL 的都知道,Spark SQL 是从 Shark 发展而来。Shark 为了实现 Hive 兼容,在 HQL 方面重用了 Hive 中 HQL 的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从 MR 作业替换成了 Spark 作业(辅以内存列式存储等各种和 Hive 关系不大的优化);同时还依赖 Hive Metastore 和 Hive SerDe(用于兼容现有的各种 Hive 存储格式)。
Spark SQL 在 Hive 兼容层面仅依赖 HQL parser、Hive Metastore 和 Hive SerDe。也就是说,从 HQL 被解析成抽象语法树(AST)起,就全部由 Spark SQL 接管了。执行计划生成和优化都由 Catalyst 负责。借助 Scala 的模式匹配等函数式语言特性,利用 Catalyst 开发执行计划优化策略比 Hive 要简洁得多。
Spark SQL
Spark SQL 提供了多种接口:
当然,相应的,也会有各种客户端:
/ Dataframe/Dataset API 简介 /
Dataframe/Dataset 也是分布式数据集,但与 RDD 不同的是其带有 schema 信息,类似一张表。
可以用下面一张图详细对比 Dataset/dataframe 和 RDD 的区别:
Dataset 是在 spark1.6 引入的,目的是提供像 RDD 一样的强类型、使用强大的 lambda 函数,同时使用 Spark SQL 的优化执行引擎。到 spark2.0 以后,DataFrame 变成类型为 Row 的 Dataset,即为:
type DataFrame = Dataset[Row]所以,很多移植 spark1.6 及之前的代码到 spark2+的都会报错误,找不到 dataframe 类。
基本操作
val df = spark.read.json(“file:///opt/meitu/bigdata/src/main/data/people.json”)
df.show()
import spark.implicits._
df.printSchema()
df.select("name").show()
df.select($"name", $"age" + 1).show()
df.filter($"age" > 21).show()
df.groupBy("age").count().show()
spark.stop()分区分桶 排序
分桶排序保存hive表
df.write.bucketBy(42,“name”).sortBy(“age”).saveAsTable(“people_bucketed”)
分区以parquet输出到指定目录
df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
分区分桶保存到hive表
df.write .partitionBy("favorite_color").bucketBy(42,"name").saveAsTable("users_partitioned_bucketed")cube rullup pivot
cube
sales.cube("city", "year”).agg(sum("amount")as "amount”) .show()
rull up
sales.rollup("city", "year”).agg(sum("amount")as "amount”).show()
pivot 只能跟在groupby之后
sales.groupBy("year").pivot("city",Seq("Warsaw","Boston","Toronto")).agg(sum("amount")as "amount”).show()/ SQL 编程 /
Spark SQL 允许用户提交 SQL 文本,支持以下三种手段编写 SQL 文本:
1. spark 代码
2. spark-sql的shell
3. thriftserver
支持 Spark SQL 自身的语法,同时也兼容 HSQL。
1. 编码
要先声明构建 SQLContext 或者 SparkSession,这个是 SparkSQL 的编码入口。早起的版本使用的是 SQLContext 或者 HiveContext,spark2 以后,建议使用的是 SparkSession。
SQLContext
new SQLContext(SparkContext)HiveContext
new HiveContext(spark.sparkContext)SparkSession
不使用 hive 元数据:
val spark = SparkSession.builder()
.config(sparkConf) .getOrCreate()使用 hive 元数据:
val spark = SparkSession.builder()
.config(sparkConf) .enableHiveSupport().getOrCreate()使用
val df =spark.read.json("examples/src/main/resources/people.json")
df.createOrReplaceTempView("people")
spark.sql("SELECT * FROM people").show()2. spark-sql 脚本
spark-sql 启动的时候类似于 spark-submit 可以设置部署模式资源等,可以使用
bin/spark-sql –help 查看配置参数。
需要将 hive-site.xml 放到 ${SPARK_HOME}/conf/ 目录下,然后就可以测试
show tables;
select count(*) from student;3. thriftserver
thriftserver jdbc/odbc 的实现类似于 hive1.2.1 的 hiveserver2,可以使用 spark 的 beeline 命令来测试 jdbc server。
安装部署
/1 开启 hive 的 metastore
bin/hive --service metastore/2 将配置文件复制到spark/conf/目录下
/3 thriftserver
sbin/start-thriftserver.sh --masteryarn --deploy-mode client对于 yarn 只支持 client 模式。
/4 启动 bin/beeline
/5 连接到 thriftserver
!connect jdbc:hive2://localhost:10001/ 用户自定义函数 /
1. UDF
定义一个 udf 很简单,例如我们自定义一个求字符串长度的 udf:
val len = udf{(str:String) => str.length}
spark.udf.register("len",len)
val ds =spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json")
ds.createOrReplaceTempView("employees")
ds.show()
spark.sql("select len(name) from employees").show()2. UserDefinedAggregateFunction
定义一个 UDAF
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._
object MyAverageUDAF extends UserDefinedAggregateFunction {
//Data types of input arguments of this aggregate function
definputSchema:StructType = StructType(StructField("inputColumn", LongType) :: Nil)
//Data types of values in the aggregation buffer
defbufferSchema:StructType = {
StructType(StructField("sum", LongType):: StructField("count", LongType) :: Nil)
}
//The data type of the returned value
defdataType:DataType = DoubleType
//Whether this function always returns the same output on the identical input
defdeterministic: Boolean = true
//Initializes the given aggregation buffer. The buffer itself is a `Row` that inaddition to
// standard methods like retrieving avalue at an index (e.g., get(), getBoolean()), provides
// the opportunity to update itsvalues. Note that arrays and maps inside the buffer are still
// immutable.
definitialize(buffer:MutableAggregationBuffer): Unit = {
buffer(0) = 0L
buffer(1) = 0L
}
//Updates the given aggregation buffer `buffer` with new input data from `input`
defupdate(buffer:MutableAggregationBuffer, input: Row): Unit ={
if(!input.isNullAt(0)) {
buffer(0) = buffer.getLong(0)+ input.getLong(0)
buffer(1) = buffer.getLong(1)+ 1
}
}
// Mergestwo aggregation buffers and stores the updated buffer values back to `buffer1`
defmerge(buffer1:MutableAggregationBuffer, buffer2: Row): Unit ={
buffer1(0) = buffer1.getLong(0)+ buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1)+ buffer2.getLong(1)
}
//Calculates the final result
defevaluate(buffer:Row): Double =buffer.getLong(0).toDouble /buffer.getLong(1)
}使用 UDAF
val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json")
ds.createOrReplaceTempView("employees")
ds.show()
spark.udf.register("myAverage", MyAverageUDAF)
val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()3. Aggregator
定义一个 Aggregator
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)
object MyAverageAggregator extends Aggregator[Employee, Average, Double] {
// A zero value for this aggregation. Should satisfy the property that any b + zero = b
def zero: Average = Average(0L, 0L)
// Combine two values to produce a new value. For performance, the function may modify `buffer`
// and return it instead of constructing a new object
def reduce(buffer: Average, employee: Employee): Average = {
buffer.sum += employee.salary
buffer.count += 1
buffer
}
// Merge two intermediate values
def merge(b1: Average, b2: Average): Average = {
b1.sum += b2.sum
b1.count += b2.count
b1
}
// Transform the output of the reduction
def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
// Specifies the Encoder for the intermediate value type
def bufferEncoder: Encoder[Average] = Encoders.product
// Specifies the Encoder for the final output value type
def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}使用
spark.udf.register("myAverage2", MyAverageAggregator)
import spark.implicits._
val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json").as[Employee]
ds.show()
val averageSalary = MyAverageAggregator.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()/ 数据源 /
1. 通用的 laod/save 函数 可支持多种数据格式:json, parquet, jdbc, orc, libsvm, csv, text
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")默认的是 parquet,可以通过 spark.sql.sources.default,修改默认配置。
2. Parquet 文件
val parquetFileDF =spark.read.parquet("people.parquet")
peopleDF.write.parquet("people.parquet")3. ORC 文件
val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json")
ds.write.mode("append").orc("/opt/outputorc/")
spark.read.orc("/opt/outputorc/*").show(1)4. JSON
ds.write.mode("overwrite").json("/opt/outputjson/")
spark.read.json("/opt/outputjson/*").show()5. Hive 表
spark 1.6 及以前的版本使用 hive 表需要 hivecontext。Spark2 开始只需要创建 sparksession 增加 enableHiveSupport()即可。
val spark = SparkSession
.builder()
.config(sparkConf)
.enableHiveSupport()
.getOrCreate()
spark.sql("select count(*) from student").show()6. JDBC
写入 mysql
wcdf.repartition(1).write.mode("append").option("user", "root")
.option("password", "mdh2018@#").jdbc("jdbc:mysql://localhost:3306/test","alluxio",new Properties())从 mysql 里读
val fromMysql = spark.read.option("user", "root")
.option("password", "mdh2018@#").jdbc("jdbc:mysql://localhost:3306/test","alluxio",new Properties())7. 自定义数据源
自定义 source 比较简单,首先我们要看看 source 加载的方式。指定的目录下,定义一个 DefaultSource 类,在类里面实现自定义 source,就可以实现我们的目标。
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
class DefaultSource extends DataSourceV2 with ReadSupport {
def createReader(options: DataSourceOptions) = new SimpleDataSourceReader()
}import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
class SimpleDataSourceReader extends DataSourceReader {
def readSchema() = StructType(Array(StructField("value", StringType)))
def createDataReaderFactories = {
val factoryList = new java.util.ArrayList[DataReaderFactory[Row]]
factoryList.add(new SimpleDataSourceReaderFactory())
factoryList
}
}import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
class SimpleDataSourceReaderFactory extends
DataReaderFactory[Row] with DataReader[Row] {
def createDataReader = new SimpleDataSourceReaderFactory()
val values = Array("1", "2", "3", "4", "5")
var index = 0
def next = index < values.length
def get = {
val row = Row(values(index))
index = index + 1
row
}
def close() = Unit
}使用
val simpleDf = spark.read
.format("bigdata.spark.SparkSQL.DataSources")
.load()
simpleDf.show()/ 优化器及执行计划 /
1. 流程简介
总体执行流程如下:从提供的输入 API(SQL,Dataset, dataframe)开始,依次经过 unresolved 逻辑计划,解析的逻辑计划,优化的逻辑计划,物理计划,然后根据 cost based 优化,选取一条物理计划进行执行。
简单化成四个部分:
/1 analysis
Spark 2.0 以后语法树生成使用的是 antlr4,之前是 scalaparse。
/2 logical optimization
常量合并,谓词下推,列裁剪,boolean 表达式简化,和其它的规则。
/3 physical planning
eg:SortExec 。
/4 Codegen
codegen 技术是用 scala 的字符串插值特性生成源码,然后使用 Janino 编译成 java字节码,Eg: SortExec。
2. 自定义优化器
/1 实现
继承 Rule[LogicalPlan]
object MultiplyOptimizationRule extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
case Multiply(left,right) if right.isInstanceOf[Literal] &&
right.asInstanceOf[Literal].value.asInstanceOf[Double] == 1.0 =>
println("=========> optimization of one applied")
left
}
}
spark.experimental.extraOptimizations = Seq(MultiplyOptimizationRule)
val multipliedDFWithOptimization = df.selectExpr("amountPaid * 1")
println("after optimization")/2 注册
spark.experimental.extraOptimizations= Seq(MultiplyOptimizationRule)/3 使用
selectExpr("amountPaid* 1")3. 自定义执行计划
/1 物理计划
继承 SparkLan 实现 doExecute 方法。
/2 逻辑计划
继承 SparkStrategy 实现 apply。
case class FastOperator(output: Seq[Attribute],child:SparkPlan) extends SparkPlan {
override def children: Seq[SparkPlan] = Nil
override protected def doExecute(): RDD[InternalRow] = {
val row = org.apache.spark.sql.Row("hi",12L)
val unsafeRow = toUnsafeRow(row, Array(org.apache.spark.sql.types.StringType,org.apache.spark.sql.types.LongType))
sparkContext.parallelize(Seq(unsafeRow),1)
}
def toUnsafeRow(row: org.apache.spark.sql.Row, schema: Array[org.apache.spark.sql.types.DataType]): org.apache.spark.sql.catalyst.expressions.UnsafeRow = {
val converter = unsafeRowConverter(schema)
converter(row)
}
def unsafeRowConverter(schema: Array[org.apache.spark.sql.types.DataType]): org.apache.spark.sql.Row => org.apache.spark.sql.catalyst.expressions.UnsafeRow = {
val converter = org.apache.spark.sql.catalyst.expressions.UnsafeProjection.create(schema)
(row: org.apache.spark.sql.Row) => {
converter(org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToCatalyst(row).asInstanceOf[org.apache.spark.sql.catalyst.InternalRow])
}
}
}
case object NeverPlanned extends LeafNode {
override def output: Seq[Attribute] = Nil
}
object TestStrategy extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] =
plan match {
case Project(pblist, child) =>
println("mt fastOperator ------------>")
FastOperator(pblist.map(_.toAttribute),planLater(child)) :: Nil
case Union(children) =>
println("mt union ========>")
UnionExec(children.map(planLater)) :: Nil
case LocalRelation(output, data, _) =>
LocalTableScanExec(output, data):: Nil
case _ => Nil
}
}/3 注册到 Spark 执行策略
spark.experimental.extraStrategies =Seq(countStrategy)/4 使用
spark.sql("select count(*) fromtest")