专栏首页腾讯云流计算Flink 快照分析:定位大状态和数据倾斜的算子
原创

Flink 快照分析:定位大状态和数据倾斜的算子

作业状态越来越大,究竟发生了什么?

在 Flink 作业中,无论是 SQL 还是 JAR 模式,常常会直接或者间接地使用到状态(State)。当 Flink 进行快照时,用户定义的这些状态数据可以被保存在状态点中,以供后续的崩溃恢复。

Flink 的状态分为 Operator StateKeyed State,而 Keyed State 又可以分为 ValueState、MapState、ListState、AggregatingState、MergingState、ReducingState 等多种类型。此外,这些林林总总的状态又有多种具体的实现(HeapState、RocksDBState 等),状态的存取还需要各种 Serializer 和 Deserializer 的参与,整个链路精妙而又繁杂。

对于普通用户而言,Flink 内部的运行模式就像黑盒,但是状态带来的困扰却是实实在在的,尤其是在使用 SQL 的多表 JOIN 或者 GROUP BY 等语义时,很容易因为状态越来越多,导致频繁的 TaskManager OOM(内存不足),影响线上业务的稳定性,更影响心情 ╮(╯_╰)╭

很多用户面对持续崩溃的作业,以及磁盘上几十上百 GB 的快照文件,自己也随之崩溃了:这么大的状态,究竟里面存了什么?能不能删点内容呢?

快照的类型

Flink 的快照包括 Checkpoint(周期触发)和 Savepoint(用户主动触发)两种,其中 Checkpoint 分为普通 Checkpoint 和外部化(Externalized)Checkpoint。普通 Checkpoint 只能用于本次 JobManager 存活期间的内部恢复;而外部化 Checkpoint 和 Savepoint 可以用于从零开始的冷启动恢复。

对于 Savepoint,以及开启了 外部化特性 的 Checkpoint,Flink 会在快照目录生成一个元数据文件(快照目录中名为 _metadata 的文件),这个文件是我们分析快照时至关重要的线索。

快照的存储格式

我们先从这个元数据(_metadata 文件)入手,看一下它的数据结构:

Flink 快照 _metadata 文件结构

在 Master State 的不定长结构中,也有自己的 Magic Number、数据长度等信息,通常不会有太多数据。

Operator State 是状态的大头,在它的不定长结构中,主要包含了每个 Operator 的 ID(由两个 Long 拼起来组成),以及当前算子的并行度(parallelism)和最大并行度(maximum parallelism),还有子任务(subtask)状态的个数、每个子任务的 index、元数据(是否包含 raw 和 managed 的 Operator State、是否包含 raw 和 managed 的 Keyed State、包含哪些具体的状态、 KeyGroup 范围、偏移量、是否是 Incremental 状态、状态文件的指针 RelativeFileStateHandle 等)。

除了元数据文件以外,还有很多具体的状态文件(RelativeFileStateHandle 指针指向的文件),它们通常是因为尺寸过大而不能直接嵌入 _metadata 文件,只能以独立文件的方式存在的状态。

快照的读取方式

从上文可以看到,解析状态文件并非易事,有很多需要考虑的地方。解铃还须系铃人,我们可以用 Flink 自身来实现状态文件的读取和解析:

最简单的方式,是找到 Flink 恢复快照状态的源码,然后按图索骥查找反序列化 _metadata 文件的类。很快,我们就找到了 org.apache.flink.runtime.checkpoint.Checkpoints#loadCheckpointMetadata 这个静态方法,它可以将给定的数据流反序列化成 Flink 内部的 CheckpointMetadata 对象(即上述文件的内存映射)。

如果只想处理元数据信息,而不涉及到读写具体的状态数据时,可以采用该方法。

2. 封装后的 State Processor API

在新的 Flink 版本中,还包含了封装后的 State Processor API,通过这个 API,我们不仅可以读取具体的状态文件,还可以按需生成状态数据以供新的 Flink 作业使用。

使用 State Processor API 时,由于涉及到具体状态的读写,需要给定 StateBackend 实例,以及具体的 Operator UID 等信息,且是以 DataSet 批处理任务方式执行的,流程相对复杂,本文不再展开描述,后续会有单独的文章介绍其使用方式。

一起实践

我们来尝试使用 Flink 内部 API 来读取状态元数据信息,并统计分析哪些 Operator 的状态占比最大,以及这些 Operator 的各个 Subtask(多个并行度下的子任务)的状态用量。

示例代码非常简单,这里展示一下具体的分析结果:

快照的分析结果(从小状态的算子开始输出)
快照的分析结果(最后是状态最大的算子)

可以看到,元数据文件里的各项信息都被打印输出了,而且显示出了 4421bbc22ac32fa6abe810c70a869c54 这个 Operator 的状态占比最大,达到了 92.31%,且各个 Subtask 的状态量较为平均,都在 1.1G ~ 1.3G 之间,基本不存在数据倾斜的现象。

由于元数据里并不包含这个 Operator 的名字和类型等信息,需要通过查找日志搜索这个 Operator ID。从日志中可以看到是一个 InnerJoin 的算子。

从日志中寻找指定 ID 的算子

进一步细致分析源码可以得到,是 StreamingJoinOperator 这个流式 JOIN 算子的两个 JoinRecordStateView 状态的数据。

StreamingJoinOperator 算子源码中状态存储的定义

从原理上来讲,要想实现两个流的常规 JOIN(无边界的 JOIN),必须永久保留两个流的所有数据备查,且默认没有清理机制(除非设置了下面的 Idle State Retention Time),因此这种 JOIN 在生产环境很容易因为状态过大而发生 OOM。我们建议用户使用 Interval JOIN(时间区间 JOIN)来代替,具体可以参考此篇文档

另外一个在 SQL 环境下容易造成超大状态的算子是无边界的 GROUP BY,但还好 Flink 提供了 Idle State Retention Time 机制,可以配置状态的定期清理逻辑,将这些 GROUP BY 和 JOIN 的过期状态及时清理掉。

参考阅读

https://ververica.cn/developers/introduction-to-state-management-and-fault-tolerance/

https://ververica.cn/developers/state-management/

https://flink.apache.org/feature/2019/09/13/state-processor-api.html

https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/streaming/joins.html

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Flink 快照分析:定位大状态和数据倾斜的算子

    在 Flink 作业中,无论是 SQL 还是 JAR 模式,常常会直接或者间接地使用到状态(State)。当 Flink 进行快照时,用户定义的这些状态数据可...

    腾讯QQ大数据
  • Flink性能调优小小总结

    Flink是依赖内存计算,计算过程中内存不够对Flink的执行效率影响很大。可以通过监控GC(Garbage Collection),评估内存使用及剩余情况来判...

    王知无-import_bigdata
  • 理解Flink checkpoint

    Checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的...

    神秘的寇先森
  • 腾讯基于 Flink 的实时流计算平台演进之路

    大家好,我是来自腾讯大数据团队的杨华(vinoyang),很高兴能够参加这次北京的 QCon,有机会跟大家分享一下腾讯实时流计算平台的演进与这个过程中我们的一些...

    王知无-import_bigdata
  • 腾讯基于Flink的实时流计算平台演进之路

    大家好,我是来自腾讯大数据团队的杨华(vinoyang),很高兴能够参加这次北京的 QCon,有机会跟大家分享一下腾讯实时流计算平台的演进与这个过程中我们的一些...

    王知无-import_bigdata
  • 任务运维和数据指标相关的使用

    建议:一些简单ETL任务,并且源数据流量在一定范围内, tm个数1、全局并行度1、内存1G。

    数栈DTinsight
  • 那些被问懵的Flink面试题

    有没有去面试的时候被问到Flink的面试题你答不上来,为什么那?,菜吗?不是。原因是你接触的面试题太少了,那我今天就根据不同的群体来给大家你分...

    大数据老哥
  • 腾讯实时计算平台Oceanus建设实践

    2019年4月1-2日,Flink Forward 2019 San Francisco会议在旧金山召开。Flink Forward会议邀请了来自Google,...

    腾讯大数据
  • Flink面试通关手册

    2019 年是大数据实时计算领域最不平凡的一年,2019 年 1 月阿里巴巴 Blink (内部的 Flink 分支版本)开源,大数据领域一夜间从 Spark ...

    大数据真好玩

扫码关注云+社区

领取腾讯云代金券