首页
学习
活动
专区
圈层
工具
发布
49 篇文章
1
YARN
2
Hadoop前世今生
3
AI分类
4
人工智能综述
5
随机森林
6
【HBase】HBase之what
7
【HBase】HBase之how
8
HBase篇--HBase常用优化
9
Hbase优化
10
flink源码从头分析第一篇之WordCount DataStream操作
11
大数据Flink-Java学习之旅第一篇
12
flink(12)-flink on yarn
13
Flink学习——Flink概述
14
Flink学习笔记:2、Flink介绍
15
Flink学习笔记(2) -- Flink部署
16
Flink入门(一)——Apache Flink介绍
17
Flink1.4 Flink程序剖析
18
Flink SQL 优化实战 - 维表 JOIN 优化
19
flink sql 知其所以然(十四):维表 join 的性能优化之路(上)附源码
20
Flink重点难点:维表关联理论和Join实战
21
Flink重点难点:状态(Checkpoint和Savepoint)容错与两阶段提交
22
详解flink中Look up维表的使用
23
Flink 1.11中对接Hive新特性及如何构建数仓体系
24
Flink 实时计算 - SQL 维表 Join 的实现
25
大数据技术周报第 010 期
26
实时数仓在有赞的实践
27
美团基于 Flink 的实时数仓平台建设新进展
28
基于Flink+Hive构建流批一体准实时数仓
29
实时数仓:基于流计算 Oceanus 实现 MySQL 和 HBase 维表到 ClickHouse 的实时分析
30
当 TiDB 与 Flink 相结合:高效、易用的实时数仓
31
flink维表关联系列之Mysql维表关联:全量加载
32
基于Flink的高可靠实时ETL系统
33
基于 Flink 实现的商品实时推荐系统(附源码)
34
【Flink】基于 Flink 的流式数据实时去重
35
Flink 实战 | 贝壳找房基于Flink的实时平台建设
36
Apache Hudi在华米科技的应用-湖仓一体化改造
38
Flink checkpoint
39
理解Flink checkpoint
40
flink checkpoint配置整理
41
flink checkpoint 源码分析 (二)
42
聊聊flink的checkpoint配置
43
Flink中案例学习--State与CheckPoint
44
Flink源码阅读(一)--Checkpoint触发机制
45
Flink企业级优化全面总结(3万字长文,15张图)
46
Flink高频面试题,附答案解析
47
学习Flink,看这篇就够了
48
【最全的大数据面试系列】Flink面试题大全
49
Flink SQL Client综合实战

Flink checkpoint

Flink checkpoint

Checkpoint是Flink实现容错机制最核心的功能,能够根据配置周期性地基于Stream中各个Operator的状态来生成Snapshot,从而将这些状态数据定期持久化存储下来,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些Snapshot进行恢复,从而修正因为故障带来的程序数据状态中断。

  1. Checkpoint指定触发生成时间间隔后,每当需要触发Checkpoint时,会向Flink程序运行时的多个分布式的Stream Source中插入一个Barrier标记
  2. 当一个Operator接收到一个Barrier时,它会暂停处理Steam中新接收到的数据记录
  3. 每个Stream中都会存在对应的Barrier,该Operator要等到所有的输入Stream中的Barrier都到达。当所有Stream中的Barrier都已经到达该Operator,这时所有的Barrier在时间上看来是同一个时刻点(表示已经对齐)
  4. 该Operator会将数据记录(Outgoing Records)发射(Emit)出去,作为下游Operator的输入
  5. 最后将Barrier对应Snapshot发射(Emit)出去作为此次Checkpoint的结果数据

开启checkpoint

代码语言:javascript
复制
val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStateBackend(new FsStateBackend("hdfs://ip:8020/flink/flink-checkpoints"))
    val config = env.getCheckpointConfig
    config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    config.setCheckpointInterval(60000)

ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION,表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint处理。

上面代码配置了执行Checkpointing的时间间隔为1分钟。

保存多个checkpoint

默认情况下,如果设置了Checkpoint选项,则Flink只保留最近成功生成的1个Checkpoint

Flink可以支持保留多个Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的个数:

代码语言:javascript
复制
state.checkpoints.num-retained: 20

如果希望会退到某个Checkpoint点,只需要指定对应的某个Checkpoint路径即可实现。

从checkpoint 恢复

如果Flink程序异常失败,或者最近一段时间内数据处理错误,我们可以将程序从某一个Checkpoint点,比如chk-860进行回放,执行如下命令

代码语言:javascript
复制
bin/flink run -s hdfs://namenode01.td.com/flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-860/_metadata flink-app-jobs.jar
  • 所有的Checkpoint文件都在以Job ID为名称的目录里面
  • 当Job停掉后,重新从某个Checkpoint点(chk-860)进行恢复时,重新生成Job ID
  • Checkpoint编号会从该次运行基于的编号继续连续生成:chk-861、chk-862、chk-863

checkpoint的建议

  • Checkpoint 间隔不要太短
    • 过短的间对于底层分布式文件系统而言,会带来很大的压力。
    • Flink 作业处理 record 与执行 checkpoint 存在互斥锁,过于频繁的checkpoint,可能会影响整体的性能。
  • 合理设置超时时间

Flink savepoint

Savepoint会在Flink Job之外存储自包含(self-contained)结构的Checkpoint,它使用Flink的Checkpointing机制来创建一个非增量的Snapshot,里面包含Streaming程序的状态,并将Checkpoint的数据存储到外部存储系统中

Flink程序中包含两种状态数据:

  • 用户定义的状态(User-defined State)是基于Flink的Transformation函数来创建或者修改得到的状态数据
  • 系统状态(System State),是指作为Operator计算一部分的数据Buffer等状态数据,比如在使用Window Function时,在Window内部缓存Streaming数据记录

Flink提供了API来为程序中每个Operator设置ID,这样可以在后续更新/升级程序的时候,可以在Savepoint数据中基于Operator ID来与对应的状态信息进行匹配,从而实现恢复。

设置Operator ID:

代码语言:javascript
复制
DataStream<String> stream = env.
  // Stateful source (e.g. Kafka) with ID
  .addSource(new StatefulSource())
  .uid("source-id") // ID for the source operator
  .shuffle()
  // Stateful mapper with ID
  .map(new StatefulMapper())
  .uid("mapper-id") // ID for the mapper
  // Stateless printing sink
  .print(); // Auto-generated ID

创建Savepoint

创建一个Savepoint,需要指定对应Savepoint目录,有两种方式来指定

  1. 需要配置Savepoint的默认路径,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,设置Savepoint存储目录
代码语言:javascript
复制
state.savepoints.dir: hdfs://namenode01.td.com/flink/flink-savepoints
  1. 手动执行savepoint命令的时候,指定Savepoint存储目录
代码语言:javascript
复制
bin/flink savepoint :jobId [:targetDirectory]

使用默认配置

代码语言:javascript
复制
bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038

为正在运行的Flink Job指定一个目录存储Savepoint数据

代码语言:javascript
复制
bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038 hdfs://namenode01.td.com/tmp/flink/savepoints

从Savepoint恢复

代码语言:javascript
复制
bin/flink run -s :savepointPath [:runArgs]

以上面保存的Savepoint为例,恢复Job运行

代码语言:javascript
复制
bin/flink run -s hdfs://namenode01.td.com/tmp/flink/savepoints/savepoint-40dcc6-a90008f0f82f flink-app-jobs.jar

会启动一个新的Flink Job,ID为cdbae3af1b7441839e7c03bab0d0eefd

Savepoint 目录结构

  • 1bbc5是Flink Job ID字符串前6个字符,后面bd967f90709b是随机生成的字符串
  • _metadata文件包含了Savepoint的元数据信息
  • 其他文件内容都是序列化的状态信息
下一篇
举报
领券