欢迎阅读美图数据技术团队的「Spark,从入门到精通」系列文章,本系列文章将由浅入深为大家介绍 Spark,从框架入门到底层架构的实现,相信总有一种姿势适合你,欢迎大家持续关注:)
往期直通车:Hello Spark!
注:本文节选自「酷玩 Spark」开源项目,原文地址:https://github.com/lw-lin/CoolplaySpark
Spark Streaming 是批处理的流式实时计算框架,支持从多种数据源获取数据,如 Kafka、TCP sockets、文件系统等。它可以使用诸如 map、reduce、join 等高级函数进行复杂算法的处理,最后还可以将处理结果存储到文件系统,数据库等。
Spark Streaming 有三个特点:
/ 运行原理 /
图 1
如图 1 所示是 Spark 的整体架构图,它主要分为四个模块:
DAG 静态定义
DAG 静态定义是将整个计算逻辑描述为一个 RDD DAG 的「模版」,在后面 Job 动态生成的时候,针对每个 batch,Spark Streaming 都将根据这个「模板」生成一个 RDD DAG 的实例。
图 2
接下来我们了解下 RDD 和 DStream 的关系。DStream 维护了对每个产出的 RDD 实例的引用,如图 2 所示,DStream 在 3 个 batch 里分别实例化了 3 个 RDD, a[1]、a[2]、a[3],然后 DStream 就保留了 batch 所产出的 RDD 的哈希表。
我们在考虑的时候,可以认为 RDD 加上 batch 维度就是 DStream,DStream 去掉 batch 维度就是 RDD。Spark 定义静态的计算逻辑后,通过动态的工作控制来调度。
Job 动态生成
在 Spark Streaming 程序的入口我们都会定义一个 batchDuration,即每隔固定时间就比照静态的 DStreamGraph 来动态生成一个 RDD DAG 实例。在 Spark Streaming 内整体负责动态作业调度的具体类是 JobScheduler,由 start() 运行。
JobScheduler 有两个非常重要的成员:JobGenerator 和 ReceiverTracker。JobScheduler 将每个 batch 的 RDD DAG 具体生成工作委托给 JobGenerator,而将源头输入数据的记录工作委托给 ReceiverTracker。
JobGenerator 维护了一个定时器,周期就是上文提到的 batchDuration,定时为每个 batch 生成 RDD DAG 的实例,其中每次 RDD DAG 实际生成包含 5 个步骤:
数据产生与导入
DStream 的子类 ReceiverInputDStream 在某个 batch 里实例化 RDD,通过 Receiver 为这个 RDD 生产数据。Spark Streaming 在程序刚开始运行时:
后续在 driver 端,就由 ReceiverInputDStream 在每个 batch 去检查 ReceiverTracker 收到的块数据 meta 信息,界定哪些新数据需要在本 batch 内处理,然后生成相应的 RDD 实例去处理这些块数据。
?举个例子
import org.apache.spark.streaming._
import org.apache.spark.SparkConf
object example{
def main(args:Array[String]):Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" ")) // DStream transformation
val pairs = words.map(word => (word, 1)) // DStream transformation
val wordCounts = pairs.reduceByKey(_ + _) // DStream transformation
wordCounts.print() // DStream output
ssc.start()
ssc.awaitTermination()
}
}
如以上代码所示:
长时容错
首先看 executor 端,在 executor 端 ReceiverSupervisor 和 Receiver 失效后直接重启即可,关键点是保障收到的块数据的安全,保障了源头块数据就能够保障 RDD DAG (Spark Core 的 lineage)重做。
Spark Streaming 对源头块数据的保障,分为 4 个层次,全面、相互补充,又可根据不同场景灵活设置:
上文曾提到块数据的 meta 信息上报到 ReceiverTracker,然后交给 ReceivedBlockTracker 做具体的管理。ReceivedBlockTracker 也采用 WAL 冷备方式进行备份,在 driver 失效后,由新的 ReceivedBlockTracker 读取 WAL 并恢复 block 的 meta 信息。
另外,需要定时对 DStreamGraph 和 JobScheduler 做 Checkpoint,来记录整个 DStreamGraph 的变化、和每个 batch 的 job 的完成情况。
注意到这里采用的是完整 checkpoint 的方式,和之前的 WAL 的方式都不一样。Checkpoint 通常也是落地到可靠存储如 HDFS。Checkpoint 发起的间隔默认的是和 batchDuration 一致;即每次 batch 发起、提交了需要运行的 job 后就做 Checkpoint,另外在 job 完成了更新任务状态的时候再次做一下 Checkpoint。
这样一来,在 driver 失效并恢复后,可以读取最近一次的 Checkpoint 来恢复作业的 DStreamGraph 和 job 的运行及完成状态。
Spark Streaming 窗口操作
/ Structured Streaming /
Structured Streaming 是一种基于 Spark SQL 引擎构建的可扩展且容错的流处理引擎,它可以以静态数据表示批量计算的方式来表达流式计算。 Spark SQL 引擎将随着 streaming data 持续到达而增量地持续地运行,并更新最终结果。
StreamExecution 的初始状态
值得注意的是,Structured Streaming 也是先纯定义、再触发执行的模式。前面大部分代码是纯定义 Dataset/DataFrame 的产生、变换和写出,后面位置再真正 start 一个新线程去触发执行之前的定义。在新的执行线程里我们需要持续地去发现新数据,进而持续地查询最新计算结果至写出。
这些 DataFrame的产生、变换和写出的信息就对应保存在 StreamExecution非常重要的 3 个成员变量中:
Structured Streaming 持续查询
StreamExecution 通过 Source.getOffset() 获取最新的 offsets,即最新的数据进度,将 offsets 写入到 offsetLog 里,将来可用作故障恢复用。在 3a 将预先定义好的逻辑(即 logicalPlan 成员变量)制作一个副本出来,3b 给定刚刚取到的 offsets,通过 Source.getBatch(offsets) 获取本执行新收到的数据的 DataFrame 表示。经过这两步,构造完成的 LogicalPlan 就是针对本执行新收到的数据的 DataFrame 整个处理逻辑。
接着将表示计算结果的 DataFrame 交给 Sink,6a 通过 Source.commit() 告知 Source 数据已经完整处理结束,6b 将本次执行的批次 id 写入到 batchCommitLog 里。
StreamExecution 增量持续查询
Structured Streaming 在编程模型上暴露给用户的是每次持续查询看做面对全量数据,所以每次执行的结果是针对全量数据进行计算的结果,但是在实际执行过程中,由于全量数据会越攒越多,每次对全量数据进行计算的代价和消耗会越来越大。
因此 Structured Streaming 引入全局范围、高可用的 StateStore 转全量为增量,即在每次执行时先从 StateStore 里 restore 出上次执行后的状态,再加入本执行的新数据进行计算,如果有状态改变,将把改变的状态重新 save 到 StateStore 里。
所以 Structured Streaming 在具体实现上转换为增量的持续查询。
故障恢复
由于 exectutor 节点的故障可由 Spark 框架本身很好的 handle,不引起可用性问题,因此只讨论 driver 故障恢复。如果在某个执行过程中发生 driver 故障,那么重新起来的 StreamExecution 读取 WAL offsetlog 恢复出最新的 offsets ,并读取 batchCommitLog 决定是否需要重做最近一个批次。
事件时间
当我们有一系列到达的记录时,首先对时间列 timestamp 做长度为10m,滑动为5m 的 window() 操作。
如图右上角的虚框部分,当达到一条记录 12:22|dog 时,会将 12:22 归入两个窗口 12:15-12:25、12:20-12:30,所以产生两条记录:12:15-12:25|dog、12:20-12:30|dog,所以这里 window() 操作的本质是 explode(),可由一条数据产生多条数据。
接着对 window() 操作的结果,以 window 列和 word 列为 key,做 groupBy() 操作。这个操作的聚合过程是增量的最后得到一个有 window、 word、count 三列的状态集。
val windowedCounts = words
.withWatermark("timestamp", "10 minutes") // 注意这里的 watermark 设置!
.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word")
.count()
对于数据延迟通过 withWatermark("timestamp", "10 minutes") 告诉 Structured Streaming,以 timestamp 列的最大值为锚点,往前推 10min 以前的数据不会再接收。
接下来看 structured streaming 的输出模式,complete 输出模式如同上面的流程,接着主要讲另外两种输出模式:append 和 update。
Append 的语义将保证一旦输出了某条 key,未来就不会再输出同一个 key。所以,在上图 12:10 这个批次直接输出 12:00-12:10|cat|1, 12:05-12:15|cat|1 将是错误的,因为在 12:20 将结果更新为了 12:00-12:10|cat|2,但是 Append 模式下却不会再次输出 12:00-12:10|cat|2,因为前面输出过了同一条 key 12:00-12:10|cat 的结果12:00-12:10|cat|1。
为了解决这个问题,在 Append 模式下 Structured Streaming 需要知道某一条 key 的结果什么时候不会再更新了,当确认结果不会再更新的时候就可以将结果进行输出。
如上图所示,如果我们确定 12:30 这个批次以后不会再有对 12:00-12:10 这个 window 的更新,那么我们就可以把 12:00-12:10 的结果在 12:30 这个批次输出,并且也会保证后面的批次不会再输出 12:00-12:10 的 window 的结果,维护了 Append 模式的语义。
Update 模式是在 Spark 2.1.1 及以后版本获得正式支持。
如上图所示,在 Update 模式中,只有本执行批次 State 中被更新了的条目会被输出: