流式计算任务通常需要 7x24 小时长期运行,面对网络抖动、机器故障或代码 Bug,如何保证任务不挂?或者挂了之后能自动恢复且数据不丢、不重?这正是 Flink 引以为傲的资本:强大的状态管理与基于 Checkpoint 的容错机制。
本文将带你深入理解 Flink 是如何“记忆”数据的,以及它是如何在故障发生时“时光倒流”恢复现场的。
在流计算中,数据是一条条流过的。如果处理一条数据时,需要依赖之前的数据(例如:计算过去一小时的总和、去重、模式匹配),那么这些“之前的数据”或“中间计算结果”就是状态。
Flink 的状态分为两大类:Managed State(托管状态) 和 Raw State(原生状态)。我们日常开发 99% 使用的是托管状态,由 Flink 运行时自动管理内存、序列化和故障恢复。
Managed State 又细分为:
KeyedStream(即 keyBy 之后)上使用。ValueState、ListState、MapState、ReducingState、AggregatingState。ListState、UnionListState、BroadcastState。状态存在哪里?是内存还是磁盘?这由 State Backend 决定。在 Flink 1.13 之后,配置方式简化为以下两种主要模式:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置状态后端为 RocksDB
env.setStateBackend(new EmbeddedRocksDBStateBackend());
// 配合 Checkpoint 存储路径(存储在本地文件系统)
env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink/checkpoints");Checkpoint(检查点)是 Flink 容错机制的灵魂。它是一个全局一致性快照,定期将所有算子的状态持久化到远程存储(如 HDFS)。
Flink 使用 Chandy-Lamport 算法 的变体。
默认情况下 Checkpoint 是关闭的,生产环境必须开启。
// 1. 开启 Checkpoint,每 5000ms 触发一次
env.enableCheckpointing(5000);
// 2. 设置 Checkpoint 模式(默认 EXACTLY_ONCE,也可以设为 AT_LEAST_ONCE)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 3. 设置两次 Checkpoint 之间的最小间隔(防止频繁 Checkpoint 导致性能下降)
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
// 4. Checkpoint 超时时间(默认 10分钟)
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 5. 允许同时进行的 Checkpoint 数量(通常设为 1)
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 6. 开启作业取消时保留 Checkpoint(非常重要!否则 Cancel 任务会删除 Checkpoint)
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
// 7. 容忍 Checkpoint 失败次数(默认 0,即 Checkpoint 失败会导致任务重启)
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);虽然 Checkpoint 和 Savepoint 看起来很像(都是快照),但它们的定位完全不同:
特性 | Checkpoint | Savepoint |
|---|---|---|
触发方式 | Flink 定时自动触发 | 用户手动命令触发 |
主要目的 | 故障恢复(Failover) | 运维操作(升级、扩容、迁移) |
存储格式 | 增量存储(依赖 StateBackend 优化) | 标准格式,全量存储(可跨版本) |
生命周期 | 随作业生命周期管理(除非设置保留) | 用户自行管理(删除需手动) |
# 触发 Savepoint
bin/flink savepoint <jobId> [targetDirectory]
# 从 Savepoint 重启作业 (或者 Checkpoint)
bin/flink run -s <savepointPath> ...当任务发生故障(Exception)时,Flink 会尝试根据配置的策略自动重启。
// 1. 固定延迟重启(尝试 3 次,每次间隔 10秒)
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3,
Duration.ofSeconds(10)
));
// 2. 失败率重启(在 5 分钟内失败超过 3 次则停止,否则每次间隔 10秒重启)
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3,
Duration.ofMinutes(5),
Duration.ofSeconds(10)
));
// 3. 无重启(直接失败)
env.setRestartStrategy(RestartStrategies.noRestart());掌握了状态与容错,你的 Flink 任务才算真正具备了“生产级”的健壮性。下一篇,我们将探讨 Flink SQL,看看如何用 SQL 解决 80% 的流计算需求。
原文来自:http://blog.daimajiangxin.com.cn
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。