Checkpoint是Flink实现容错机制最核心的功能,能够根据配置周期性地基于Stream中各个Operator的状态来生成Snapshot,从而将这些状态数据定期持久化存储下来,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些Snapshot进行恢复,从而修正因为故障带来的程序数据状态中断。
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(new FsStateBackend("hdfs://ip:8020/flink/flink-checkpoints"))
val config = env.getCheckpointConfig
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
config.setCheckpointInterval(60000)
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION,表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint处理。
上面代码配置了执行Checkpointing的时间间隔为1分钟。
默认情况下,如果设置了Checkpoint选项,则Flink只保留最近成功生成的1个Checkpoint
Flink可以支持保留多个Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的个数:
state.checkpoints.num-retained: 20
如果希望会退到某个Checkpoint点,只需要指定对应的某个Checkpoint路径即可实现。
如果Flink程序异常失败,或者最近一段时间内数据处理错误,我们可以将程序从某一个Checkpoint点,比如chk-860进行回放,执行如下命令
bin/flink run -s hdfs://namenode01.td.com/flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-860/_metadata flink-app-jobs.jar
Savepoint会在Flink Job之外存储自包含(self-contained)结构的Checkpoint,它使用Flink的Checkpointing机制来创建一个非增量的Snapshot,里面包含Streaming程序的状态,并将Checkpoint的数据存储到外部存储系统中
Flink程序中包含两种状态数据:
Flink提供了API来为程序中每个Operator设置ID,这样可以在后续更新/升级程序的时候,可以在Savepoint数据中基于Operator ID来与对应的状态信息进行匹配,从而实现恢复。
设置Operator ID:
DataStream<String> stream = env.
// Stateful source (e.g. Kafka) with ID
.addSource(new StatefulSource())
.uid("source-id") // ID for the source operator
.shuffle()
// Stateful mapper with ID
.map(new StatefulMapper())
.uid("mapper-id") // ID for the mapper
// Stateless printing sink
.print(); // Auto-generated ID
创建一个Savepoint,需要指定对应Savepoint目录,有两种方式来指定
state.savepoints.dir: hdfs://namenode01.td.com/flink/flink-savepoints
bin/flink savepoint :jobId [:targetDirectory]
使用默认配置
bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038
为正在运行的Flink Job指定一个目录存储Savepoint数据
bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038 hdfs://namenode01.td.com/tmp/flink/savepoints
bin/flink run -s :savepointPath [:runArgs]
以上面保存的Savepoint为例,恢复Job运行
bin/flink run -s hdfs://namenode01.td.com/tmp/flink/savepoints/savepoint-40dcc6-a90008f0f82f flink-app-jobs.jar
会启动一个新的Flink Job,ID为cdbae3af1b7441839e7c03bab0d0eefd