Apache Flink
提供了一个容错机制来持续恢复数据流应用程序的状态。该机制确保即使在出现故障的情况下,程序的状态也将最终反映每条记录来自数据流严格一次exactly once
。 请注意,有一个开关可以降级为保证至少一次(least once
)(如下所述)。
容错机制连续生成分布式流数据流的快照。对于状态较小的流式应用程序,这些快照非常轻量级,可以频繁生成,而不会对性能造成太大影响。流应用程序的状态存储在可配置的位置(例如主节点或HDFS
)。
如果应用程序发生故障(由于机器,网络或软件故障),Flink
会停止分布式流式数据流。然后系统重新启动算子并将其重置为最新的成功检查点。输入流被重置为状态快照的时间点。作为重新启动的并行数据流处理的任何记录都保证不属于先前检查点状态的一部分。
注意:默认情况下,检查点被禁用。有关如何启用和配置检查点的详细信息,请参阅检查点。
为了实现这个机制的保证,数据流源(如消息队列或代理)需要能够将流重放到定义的最近时间点。Apache Kafka
有这个能力,而Flink
的Kafka连接器就是利用这个能力。有关Flink
连接器提供的保证的更多信息,请参阅数据源和接收器的容错保证。
因为Flink
的检查点是通过分布式快照实现的,所以我们交替使用快照
和检查点
两个概念。
Flink
的容错机制的核心部分是生成分布式数据流和算子状态的一致性快照。这些快照作为一个一致性检查点,在系统发生故障时可以回溯。Flink
的生成这些快照的机制在分布式数据流的轻量级异步快照中进行详细的描述。它受分布式快照Chandy-Lamport
算法的启发,并且专门针对Flink
的执行模型量身定制。
Flink
分布式快照的一个核心元素是数据流Barriers
。这些Barriers
被放入数据流中,并作为数据流的一部分与记录一起流动。Barriers
永远不会超越记录,严格按照相对顺序流动。Barriers
将数据流中的记录分成进入当前快照的记录集合和进入下一个快照的记录集合。每个Barriers
都携带前面快照的ID。Barriers
不会中断流的流动,因此非常轻。来自不同快照的多个Barriers
可以同时在流中,这意味着不同快照可以同时发生。
Barriers
在数据流源处被放入的并行数据流。快照n
放入Barriers
的位置(我们称之为Sn
)是快照覆盖数据的源流中的位置。例如,在Apache Kafka
中,这个位置是分区中最后一个记录的偏移量。该位置Sn
会报告给检查点协调员(Flink
的JobManager
)。
Barriers
向下游流动。当中间算子从其所有输入流中接收到快照n
的Barriers
时,它会将快照n
的Barriers
发送到其所有输出流中。一旦Sink
算子(流式DAG
的末尾)从其所有输入流中接收到Barriers n
,就向检查点协调器确认快照n
。在所有Sink
确认了快照之后,才被确认已经完成。
一旦快照n
完成,作业将不会再向数据源询问Sn
之前的记录,因为那时这些记录(以及它们的后代记录)已经通过了整个数据流拓扑。
接收多个输入流的算子需要根据快照Barriers
对其输入流。上图说明了这一点:
Barriers n
时,先不处理来自该数据流的记录,而是先进行缓存,等从其他所有输入流中都接收到Barriers n
时,才开始处理缓存的数据(译者注:根据 Barriers n
对齐所有的输入流)。否则,就会把属于快照n
和快照n + 1
的记录混合在一起。Barriers n
的数据流暂时搁置。从这些数据流接收到的记录不会被处理,而是放入输入缓冲区中,等待其他输入数据流进行对齐(例如上图中的aligning
部分)。Barriers n
时(译者注:这是触发Checkpoint),算子才发送所有缓存的记录,然后发送快照Barriers n
(例如上图中的checkpoint
部分)。continue
部分)。当算子包含任何形式的状态时,这个状态也必须是快照的一部分。算子状态有不同的形式:
map()
或filter()
)直接创建和修改的状态。有关详细信息,请参阅状态概述在算子收到所有输入流中的Barriers
以及在barriers
发送到输出流之前,算子对其状态进行快照。这时,Barriers
之前的记录都更新到状态中,Barriers
之后的记录不会进行更新。由于快照的状态可能较大,因此需要存储在可配置的状态后端state backend
中。默认情况下,会存储在JobManager
的内存中,但是在生产环境下,应该配置为分布式可靠存储系统(如HDFS
)。在状态被存储之后,算子确认检查点,将快照barriers
发送到输出流,然后继续进行。
生成的快照包含:
对齐步骤可能会给流处理程序造成延迟。这个额外的延迟通常大约在几毫秒的数量级,但是我们已经看到一些因为异常值造成的延迟明显增加的情况。对于需要连续较低延迟(几毫秒)的应用程序而言,Flink
有一个开关可以在检查点期间跳过流对齐。一旦算子看到每个输入的检查点Barriers
,就会生成检查点快照。
当跳过对齐步骤时,当检查点n
的某些barriers
到达时,算子就会处理输入数据(译者注:不需要缓存输入数据来等待最后一个 Barriers
的到来)。这样的话,在为检查点n
生成状态快照之前也会处理到属于检查点n+1
的元素。在恢复时,这些记录将会重复出现,因为它们既包含在检查点n
的状态快照中,也会在检查点n
之后作为数据的一部分进行重放。
对齐仅发生在当算子具有多个输入(例如
join
)或者具有多个输出(在流repartitioning
/shuffle
之后)的情况。正因为如此,只有高度并行流操作(map()
,flatMap()
,filter()
…)的数据流即使在 At-Least-Once 模式下也只能提供Exactly-Once语义。
请注意,上述机制意味着当算子在状态后端存储状态快照时会停止处理输入记录。这种同步状态快照在每次生成快照时都会造成延迟。
可以让算子在存储其状态快照的同时继续处理输入记录,有效地让状态快照在后台异步发生。要做到这一点,算子必须能够产生一个状态对象,以某种方式进行存储以便对算子状态进行修改后不会影响该状态对象。例如,copy-on-write
数据结构(如RocksDB
中使用的数据结构)具有这种功能。
在接收到输入端的Barriers
后,算子启动其状态的异步快照复制。Barriers
立即发送到输出流中,并继续进行正常的流处理。一旦后台复制过程完成,它就会向检查点协调器(JobManager)确认检查点。只有在所有sink
接收到Barriers
并且所有有状态的算子已经确认完成备份(可能在Barriers
到达sink
之后)时检查点才算完成。
有关状态快照的详细信息,请参阅状状态后端。
在这种机制下恢复很简单:一旦失败,Flink
选择最近完成的检查点k
。然后系统重新部署整个分布式数据流,并为每个算子提供作状态。数据源被设置为从位置Sk
读取数据流。例如在Apache Kafka
中,这意味着告诉消费者从偏移量Sk
处开始提取数据。
如果增量对状态进行快照,算子将从最新且完整的快照状态开始,然后对该状态应用一系列增量快照更新。
请参阅重启策略了解更多信息。
对算子进行快照,有两部分:同步部分和异步部分。
算子和状态后端将其快照作为Java FutureTask
。该任务包含的状态同步部分已经完成异步部分挂起。然后异步部分由该检查点的后台线程执行。
算子检查点只是同步返回一个已经完成的FutureTask
。如果需要执行异步操作,则在FutureTask的run()
方法中执行。
任务是可取消的,所以消耗句柄的数据流和其他资源是可以被释放。
Flink版本:1.4
原文:https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/stream_checkpointing.html