
大家好,我是老羊。该文耗时将近 3 个月总结。集博主心得理解之大成。
不多说了,本文从盘古开天辟地(状态是啥?)开始说 Flink State。如下为本文目录,诚意满满。
博主期望,你在看完这一小节后,需要明白:状态不仅仅只限于 Flink 的状态。状态其实是一个普遍存在的东西。
首先来看看状态的一个官方的定义:当前计算流程需要依赖到之前计算的结果,那么之前计算的结果就是状态。
但是大家一定要注意,这里所说的状态不仅仅只限于 Flink 的状态。状态其实是一个普遍存在的东西。博主举几个例子:


关于状态的案例非常之多,生活中出处可见状态的影子,博主这里就不一一举例了。
一个小问题! 在去重场景下,我在程序中使用一个 Set存储 id,然后用于去重,算不算状态? 答案:算,只要你的当前数据的处理计算依赖到之前的数据,就算做状态。
其实在实时计算中的状态的功能主要体现在任务可以做到失败重启后没有数据质量、时效问题。
还不明白?我们来对比一下一个离线任务和实时任务的在任务失败重启时候的区别。
那当我们把状态给 "管理" 起来时,上面的两个问题就可以迎刃而解。还是相同的计算任务、相同的业务场景:
当我们把 Set这个数据结构定期(每隔 1min)的给存储到 HDFS 上面时,任务挂了、恢复之后。我们的任务还可以从 HDFS 上面把这个数据给读回来,接着从最新的一个 Kafka Offset 继续计算就可以,这样即没有数据质量问题,也没有数据时效性问题。
所以这就是为什么实时任务中老是提到 状态、状态管理 这些个概念的原因!
离线中其实也有,举个例子 Remote Shuffle Service,比如 Spark Remote Shuffle Service。
一个常见的离线任务运行时,通常都由几个 Stage 组成,比如有 1,2,3,4,5 个 Stage 顺序执行,当第 4 个 Stage 运行挂了之后,离线任务就要从第 1 个 Stage 重新开始执行,这样的话,执行效率是非常低的。
那么这个场景下有没有办法做到第 4 个 Stage 挂了,我们只重新运行第 4 个 Stage 呢?
当然有解法,我们可以将每一个 Stage 的结果保存下来,比如第 3 个 Stage 运行完成之后,将结果保存到远程的服务,当第 4 个 Stage 任务挂了之后,只需要从远程服务将第 3 个 Stage 结果拿来重新执行就行。
而 Remote Shuffle Service 的功能就是将每一个 Stage 的运行结果存储到一个独立的 Service 上面,当第 4 个 Stage fail 之后重新恢复时,可以直接从第 4 个 Stage 开始执行。
那么这里其实也涉及到了状态的概念。对于整个任务来说,这里面的每个 Stage 的结果就是状态,Remote Shuffle Service 就起到了 "管理" 状态 的作用。
也不一定,举个例子,一个消费 Kafka,计算一个分钟级别的同时在线用户数(TUMBLE 1 min)的实时任务,在任务挂了之后,其实可以完全不依赖状态,直接从前几分钟的 Kafka Offset 去回溯一下数据也可以,能满足时效性的同时,也可以满足数据质量。
看完上一小节,相信大家已经知道了实时计算中提到的状态的概念其实重点不止在于状态本身,更重要的在于强调 "管理" 状态。
一个实时任务光有状态是没用的,我们要把这个状态 "管理" 起来,即上节案例中的把 Set定期的存储到远程 HDFS 上,离线任务将中间结果保存到 Remote Shuffle Service 上。只有这样才能在任务 failover 后将状态恢复,保障数据质量、时效。
而在 Flink 中状态管理的模块就是我们所熟知的 Flink Checkpoint\Savepoint。
经过上面的一些基础概念的陈述,终于进入了 Flink 的世界。
博主自己在初学 Flink 时,也会被这些概念搞混,经过博主的整理之后认为,在 Flink 中关于状态、状态管理主要是有 3 个概念,能把这 3 个概念能分清楚,你就已经超越 95% 的实时数据开发同学了:
当我们了解了这 3 个概念之后,继续往下看实际我们怎么使用 Flink 状态。
Flink 中的状态分类有两大类,我们可以在很多博客文章上面看到:Managed State 和 Raw State。
但是实际上生产开发中基本只会用到 Managed State,不会用到 Raw State。至少对于博主来说是这样的。所以本节我们就只介绍 Managed State。
对 Managed State 细分,它又有两种类型:operator-state 和 keyed-state。这里先对比两种状态,后续将展示具体的使用方法。



10b. BroadcastState:每个 sub-task 的广播状态都一样 c. UnionListState:将原来所有元素合并,合并后的数据每个算子都有一份全量状态数据



private static class UserDefinedSource extends RichParallelSourceFunction<Item>
implements CheckpointedFunction {
private final ListStateDescriptor<Item> listStateDescriptor =
new ListStateDescriptor<Item>("a", Item.class);
private volatile boolean isCancel = false;
private transient ListState<Item> l;
@Override
public void run(SourceContext<Item> ctx) throws Exception {
int i = 0;
while (!this.isCancel) {
ctx.collect(
Item.builder()
.name("item" + i)
.color(Color.RED)
.shape(Shape.CIRCLE)
.build()
);
i++;
List<Item> items = (List<Item>) l.get();
items.add(Item.builder()
.name("item")
.color(Color.RED)
.shape(Shape.CIRCLE)
.build());
Thread.sleep(1);
}
}
@Override
public void cancel() {
this.isCancel = true;
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// 做快照逻辑
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// 获取 operator-state
this.l = context.getOperatorStateStore().getListState(listStateDescriptor);
}
}
这里包括博主在内的很多小伙伴都会问一个问题,initializeState 方法我看懂了是用于恢复 state 的,snapshotState 应该写啥逻辑呢?
答案:其实这个问题的核心点在于大家认为 Flink 不是自己持久化 State 吗?为啥要我去实现 snapshotState 逻辑,其实就算我们不写 snapshotState 方法也可以,Flink 会自动把上面的 ListState<Item> l 持久化,snapshotState 是给小伙伴们实现特殊逻辑使用的,举例:在做 cp 时,可以从 ListState<Item> l 删除一些不要的数据,添加一些特殊的数据。
new KeyedProcessFunction<Integer, Item, String>() {
private final MapStateDescriptor<String, List<Item>> mapStateDesc =
new MapStateDescriptor<>(
"itemsMap",
BasicTypeInfo.STRING_TYPE_INFO,
new ListTypeInfo<>(Item.class));
private final ListStateDescriptor<Item> listStateDesc =
new ListStateDescriptor<>(
"itemsList",
Item.class);
private final ValueStateDescriptor<Item> valueStateDesc =
new ValueStateDescriptor<>(
"itemsValue"
, Item.class);
private final ReducingStateDescriptor<String> reducingStateDesc =
new ReducingStateDescriptor<>(
"itemsReducing"
, new ReduceFunction<String>() {
@Override
public String reduce(String value1, String value2) throws Exception {
return value1 + value2;
}
}, String.class);
private final AggregatingStateDescriptor<Item, String, String> aggregatingStateDesc =
new AggregatingStateDescriptor<Item, String, String>("itemsAgg",
new AggregateFunction<Item, String, String>() {
@Override
public String createAccumulator() {
return "";
}
@Override
public String add(Item value, String accumulator) {
return accumulator + value.name;
}
@Override
public String getResult(String accumulator) {
return accumulator;
}
@Override
public String merge(String a, String b) {
return null;
}
}, String.class);
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
mapStateDesc.enableTimeToLive(StateTtlConfig
.newBuilder(Time.milliseconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupInRocksdbCompactFilter(10)
.build());
}
@Override
public void processElement(Item value, Context ctx, Collector<String> out) throws Exception {
MapState<String, List<Item>> mapState = getRuntimeContext().getMapState(mapStateDesc);
List<Item> l = mapState.get(value.name);
if (null == l) {
l = new LinkedList<>();
}
l.add(value);
mapState.put(value.name, l);
ListState<Item> listState = getRuntimeContext().getListState(listStateDesc);
listState.add(value);
Object o = listState.get();
ValueState<Item> valueState = getRuntimeContext().getState(valueStateDesc);
valueState.update(value);
Item i = valueState.value();
AggregatingState<Item, String> aggregatingState = getRuntimeContext().getAggregatingState(aggregatingStateDesc);
aggregatingState.add(value);
String aggResult = aggregatingState.get();
ReducingState<String> reducingState = getRuntimeContext().getReducingState(reducingStateDesc);
reducingState.add(value.name);
String reducingResult = reducingState.get();
System.out.println(1);
}
}
其中每一种 State 的用处:
注意: 大多数情况下,常用的 State 也就是 keyed-state 中的 ValueState、MapState,其他 State 接口其实非常少用(包括 operator-state 也很少用)。
Flink 提供了 3 种状态后端用于管理和存储状态数据,我们来看看每种状态后端的适用场景:



到生产环境中:
Flink 对状态做了能力扩展,即 TTL。它的能力其实和 redis 的过期策略类似,举例:
那么首先我们看下什么场景需要用到 TTL 机制呢?举例:
比如计算 DAU 使用 Flink MapState 进行去重,到第二天的时候,第一天的 MapState 就可以删除了,就可以用 Flink State TTL 进行自动删除(当然你也可以通过代码逻辑进行手动删除)。
其实在 Flink DataStream API 中,TTL 功能还是比较少用的。Flink State TTL 在 Flink SQL 中是被大规模应用的,几乎除了窗口类、ETL(DWD 明细处理任务)类的任务之外,SQL 任务基本都会用到 State TTL。
那么我们在要怎么开启 TTL 呢?这里分 DataStream API 和 SQL API:
private final MapStateDescriptor<String, List<Item>> mapStateDesc =
new MapStateDescriptor<>(
"itemsMap",
BasicTypeInfo.STRING_TYPE_INFO,
new ListTypeInfo<>(Item.class));
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 使用 StateTtlConfig 开启 State TTL
mapStateDesc.enableTimeToLive(StateTtlConfig
.newBuilder(Time.milliseconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupInRocksdbCompactFilter(10)
.build());
}
关于 StateTtlConfig 的每个配置项的功能如下图所示:

1
StreamTableEnvironment
.getConfig()
.getConfiguration()
.setString("table.exec.state.ttl", "180 s");
注意:SQL 中 TTL 的策略不如 DataStream 那么多,SQL 中 TTL 只支持下图所示策略:

6
首先我们来想想,要做到 TTL 的话,要具备什么条件呢?
想想 Redis 的 TTL 设置,如果我们要设置 TTL 则必然需要给一条数据给一个时间戳,只有这样才能判断这条数据是否过期了。
在 Flink 中设置 State TTL,就会有这样一个时间戳,具体实现时,Flink 会把时间戳字段和具体数据字段存储作为同级存储到 State 中。
举个例子,我要将一个 String 存储到 State 中时:
接下来以 FileSystem 状态后端下的 MapState 作为案例来说:

2

3
注意: 任务设置了 State TTL 和不设置 State TTL 的状态是不兼容的。这里大家在使用时一定要注意。防止出现任务从 Checkpoint 恢复不了的情况。但是你可以去修改 TTL 时长,因为修改时长并不会改变 State 存储结构。
了解了基础数据结构之后,我们再来看看 Flink 提供的 State 过期的 4 种删除策略:
访问 State 的时候根据时间戳判断是否过期,如果过期则主动删除 State 数据。以 MapState 为例,如下图所示,在 MapState.get(key) 时会进行判断是否过期:
这个删除策略是不需要用户进行配置的,只要你打开了 State TTL 功能,就会默认执行。

4
从状态恢复(checkpoint、savepoint)的时候采取做过期删除,但是不支持 rocksdb 增量 checkpoint。
StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupFullSnapshot()
.build()
访问 state 的时候,主动去遍历一些 state 数据判断是否过期,如果过期则主动删除 State 数据。
StateTtlConfig
.newBuilder(Time.seconds(1))
// 每访问 1 此 state,遍历 1000 条进行删除
.cleanupIncrementally(1000, true)
.build()

5
注意:
仅仅支持 rocksdb。在 rockdb 做 compaction 的时候遍历进行删除。
StateTtlConfig
.newBuilder(Time.seconds(1))
// 做 compaction 时每隔 3 个 entry,重新更新一下时间戳(这个时间戳是 Flink 用于和数据中的时间戳来比较判断是否过期)
.cleanupInRocksdbCompactFilter(3)
.build()
注意:
rocksdb compaction 时调用 TTL 过滤器会降低 compaction 速度。因为 TTL 过滤器需要解析上次访问的时间戳,并对每个将参与压缩的状态进行是否过期检查。对于集合型状态类型(比如 ListState 和 MapState),会对集合中每个元素进行检查。
通过上面的部分,我们已经学习了状态、状态后端,最后来看看 Flink Checkpoint 机制。
Checkpoint 整个流程如下:
注意: 实际代码中,慎用 Thread.sleep(),有可能导致任务执行线程卡住,barrier 发不下去,从而导致 Checkpoint 失败。
来看看 Flink 为 Checkpoint 都提供了哪些配置及功能来帮助我们控制 Checkpoint 执行时的行为:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每 30 秒触发一次 checkpoint,checkpoint 时间应该远小于(该值 + MinPauseBetweenCheckpoints),否则程序会一直做 checkpoint,影响数据处理速度
env.enableCheckpointing(30000);
// Flink 框架内保证 EXACTLY_ONCE
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 两个 checkpoints 之间最少有 30s 间隔(上一个 checkpoint 完成到下一个 checkpoint 开始,默认为 0,这里建议设置为非 0 值)
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
// checkpoint 超时时间(默认 600 s)
env.getCheckpointConfig().setCheckpointTimeout(600000);
// 同时只有一个checkpoint运行(默认)
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 取消作业时是否保留 checkpoint (默认不保留,非常建议配置为保留)
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// checkpoint 失败时 task 是否失败(默认 true, checkpoint 失败时,task 会失败)
env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
// 对 FsStateBackend 刷出去的文件进行文件压缩,减小 checkpoint 体积
env.getConfig().setUseSnapshotCompression(true);
博主参考了很多云厂商后,建议大家的 Checkpoint 配置如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每 120 秒触发一次 checkpoint,不会特别频繁
env.enableCheckpointing(120000);
// Flink 框架内保证 EXACTLY_ONCE
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 两个 checkpoints 之间最少有 120s 间隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(120000);
// checkpoint 超时时间 600s
env.getCheckpointConfig().setCheckpointTimeout(600000);
// 同时只有一个 checkpoint 运行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 取消作业时保留 checkpoint,因为有时候任务 savepoint 可能不可用,这时我们就可以直接从 checkpoint 重启任务
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// checkpoint 失败时 task 不失败,因为可能会有偶尔的写入 HDFS 失败,但是这并不会影响我们任务的运行
// 偶尔的由于网络抖动 checkpoint 失败可以接受,但是如果经常失败就要定位具体的问题!
env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
这里也分 keyed-state 和 operator-state 进行说明。Flink 会将 Checkpoint 数据存储在一个带有编号的 chk 目录中。
比如说一个 Flink 任务的 keyed-state 的 subTask 个数是 10,operator-state 对应的 subTask 也是 10,那么 chk 会存一个元数据文件 _metadata,10 个 keyed-state 文件,10 个 operator-state 的文件。

7
这里主要介绍两种类型 State 在并行度发生变化时的恢复机制,如下图所示:

11

6
其实 Flink SQL 发明出来就是为了屏蔽窗口、状态这些底层的东西的。
但是我们在使用 Flink SQL 时,70% 以上的场景都是不得不去关注 State 的!
举个 Flink SQL 的例子,下这个 SQL 用于计算每个 sessionId 的点击量:
SELECT
sessionId
, COUNT(*)
FROM clicks
GROUP BY
sessionId;
当 sessionId 为 1 亿时,或许还能够正常运行,但是 sessionId 为 10 亿时,State 将会变得很大,我们就不得不考虑是否要设置 State TTL 以防止无限增大的 State。
捞干的讲。
问题:哪些场景的 Flink SQL 会常常去考虑 State TTL 呢?
答案:相信大家通过上面的案例之后也能总结出来了。其实就是 unbounded Flink SQL 常常会考虑到,因为这类 Flink SQL 的 State 只会越变越大,如果没有设置合理的 State TTL 的话,任务可能会由于大 State 导致磁盘压力大,任务卡住。
PartitionableListState 时,会先把 list clear,然后再 add,这样就会把下图中的 items 给 clear 了。你会发现 state 一致为空。
