WAL 即 write ahead log(预写日志),是在 1.2 版本中就添加的特性。作用就是,将数据通过日志的方式写到可靠的存储,比如 HDFS、s3,在 driver 或 worker failure 时可以从在可靠存储上的日志文件恢复数据。WAL 在 driver 端和 executor 端都有应用。我们分别来介绍。
用于写日志的对象 writeAheadLogOption: WriteAheadLog
在 StreamingContext 中的 JobScheduler 中的 ReceiverTracker 的 ReceivedBlockTracker 构造函数中被创建,ReceivedBlockTracker 用于管理已接收到的 blocks 信息。需要注意的是,这里只需要启用 checkpoint 就可以创建该 driver 端的 WAL 管理实例,而不需要将 spark.streaming.receiver.writeAheadLog.enable
设置为 true
。
参见:揭开Spark Streaming神秘面纱② - ReceiverTracker 与数据导入
写什么
首选需要明确的是,ReceivedBlockTracker 通过 WAL 写入 log 文件的内容是3种事件(当然,会进行序列化):
spark.streaming.receiver.writeAheadLog.enable
设置为 true
。何时写BlockAdditionEvent
在揭开Spark Streaming神秘面纱② - ReceiverTracker 与数据导入
一文中,已经介绍过当 Receiver 接收到数据后会调用 ReceiverSupervisor#pushAndReportBlock
方法,该方法将 block 数据存储并写一份到日志文件中(即 WAL),之后最终将 block 信息,即 receivedBlockInfo
(包括 streamId、batchId、数据条数)传递给 ReceivedBlockTracker
.
当 ReceivedBlockTracker
接收到 receivedBlockInfo
后,将之封装成 BlockAdditionEvent(receivedBlockInfo)
并写入日志(WAL)。
抛开代码调用逻辑不谈,一句话总结的话,就是当 Receiver 接收数据产生新的 block 时,最终会触发产生并写 BlockAdditionEvent
何时写BatchAllocationEvent
在揭开Spark Streaming神秘面纱③ - 动态生成 job一文中介绍了 JobGenerator 每隔 batch duration 就会为这个 batch 生成对应的 jobs。在生成 jobs 的时候需要为 RDD 提供数据,这个时候就会触发执行
jobScheduler.receiverTracker.allocateBlocksToBatch(time)
该操作将把所有该 streamId 对应的已接收存储但未分配的 blocks 都分配给该 batch,我们知道,ReceivedBlockTracker 保存着所有的 blocks 信息,所以为某个 batch 分配 blocks 这个分配请求最终会给到 ReceivedBlockTracker,ReceivedBlockTracker 在确认要分配哪些 blocks 之后,会将给某个 batchTime 分配了哪些 blocks 的对应关系封装成 BatchAllocationEvent(batchTime, allocatedBlocks)
并写入日志文件(WAL),这之后才进行真正的分配。
何时写BatchCleanupEvent
从我以前写的一些文章中可以知道,一个 batch 对应的是一个 jobSet,因为在一个 batch 可能会有多个 DStream 执行了多次 output 操作,每个 output 操作都将生成一个 job,这些 job 将组成 jobSet。总共有两种时机会触发将 BatchCleanupEvent
事件写入日志(WAL),我们进行依次介绍
我们先来介绍第一种,废话不多说,直接看具体步骤:
ClearMetadata
消息ReceiverTracker#cleanupOldBlocksAndBatches
,具体cleanupOldBlocksAndBatches方法干了什么稍后分析另一种时机如下:
ClearCheckpointData
消息ClearCheckpointData
消息后,调用 clearCheckpointData
方法JobGenerator#ClearCheckpointData
方法中,会调用到 ReceiverTracker#drcleanupOldBlocksAndBatches
从上面的两小段分析我们可以知道,当一个 batch 的 jobSet 中的 jobs 都完成的时候和每次 checkpoint操作完成的时候会触发执行 ReceiverTracker#cleanupOldBlocksAndBatches
方法,该方法里做了什么呢?见下图:
上图描述了以上两个时机下,是如何:
spark.streaming.receiver.writeAheadLog.enable
设置为 true
才会执行这一步)Receiver 接收到的数据会源源不断的传递给 ReceiverSupervisor,是否启用 WAL 机制(即是否将 spark.streaming.receiver.writeAheadLog.enable
设置为 true
)会影响 ReceiverSupervisor 在存储 block 时的行为:
关于什么时候以及如何清理存储在 WAL 中的过期的数据已在上图中说明
关于是否要启用 WAL,要视具体的业务而定: