前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >从一个诡异的 Bug 来看 Flink 快照和状态读取的流程

从一个诡异的 Bug 来看 Flink 快照和状态读取的流程

原创
作者头像
KyleMeow
修改2021-09-29 20:52:15
3K0
修改2021-09-29 20:52:15
举报

问题概要

流计算 Oceanus 平台支持以 SQL 的方式提交作业,独享集群支持最新的 Flink 1.10 提供的新版 Blink Planner 语法。有一位客户写了一段代码,用到了 SQL 的 TopN 功能,语句类似于:

INSERT INTO `MySink` 
SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
   FROM table_name)
WHERE rownum <= N [AND conditions]

作业提交后,程序运行时一切正常;但是一旦把作业暂停(做快照),然后恢复时,就会持续报错:

java.lang.RuntimeException: Error while getting state
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
        at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
        at org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
        at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible.
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
        at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
        at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
        at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
        at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
        ... 13 more

从报错信息可以看出,作业的崩溃是由于 State Serializer 不兼容导致的。那么问题来了,跑的好好的程序,用的都是官方提供的 API,没有任何自定义的代码,为什么会不兼容呢?

报错初探

既然看到了报错时的线程栈,定位问题就可以以此入手。我们首先看下报错的直接根源:updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)

updateRestoredStateMetaInfo 方法的兼容性判断逻辑
updateRestoredStateMetaInfo 方法的兼容性判断逻辑

从代码里了解到,是 updateStateSerializer() 方法返回了“不兼容”的结果,导致报错的。我们继续按图索骥,追踪到了 StateSerializerProvider 类的 registerNewSerializerForRestoredState() 方法,并确认了不兼容的信号来源于 previousSerializerSnapshot 对象的 resolveSchemaCompatibility() 方法返回的结果。

registerNewSerializerForRestoredState 方法中兼容性的判断逻辑
registerNewSerializerForRestoredState 方法中兼容性的判断逻辑

追踪到这里,我们发现 previousSerializerSnapshot 是一个接口的引用,下面有几十种不同实现。怎么办呢?我们在测试环境复现,并进行远程调试来查看运行时到底发生了什么。

运行时调试

Java 的远程调试方法很简单,只需要在 java 命令的启动参数上加入

-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=[调试端口]

的参数即可。

对于 Flink 而言,可以修改 flink-conf.yaml 里面的 env.java.opts.taskmanager 和 env.java.opts.jobmanager 两个配置项,分别对应着 TaskManager 和 JobManager 的运行参数。

因为报错发生在任务运行时,我们首先对 TaskManager 进行调试。果不其然,作业快照并恢复后,又开始报错了。我们也通过对上述代码设置断点,找到了现场:

远程调试 查看 resolveSchemaCompatibility 的具体类
远程调试 查看 resolveSchemaCompatibility 的具体类

原来是出问题的 previousSerializerSnapshot 对象的类型是 org.apache.flink.table.runtime.typeutils.SortedMapSerializerSnapshot 这个类。我们仔细阅读它的 resolveSchemaCompatibility() 方法,可以看到它的核心逻辑,是通过使用 equals 方法,对比它自己的 comparator 和 传入的 newSortedMapSerializer 的 comparator,如果 equals 方法返回 true 就是兼容,反之就不一定兼容(也可能通过 state migration 就兼容了,此文不涉及)。

初步找出问题原因
初步找出问题原因

同时在运行时,我们看到这里的 Comparator 对于 TopN 查询来说,实际上是 RetractableTopNFunction$ComparatorWrapper 内部类。

远程调试找出 Comparator 的具体类
远程调试找出 Comparator 的具体类

那么问题就清楚了,大概率是这个类的 equals 方法实现不规范(测试不充分),导致出现了此问题。

问题确认

我们看下 ComparatorWrapper 类的 equals 方法的实现:

ComparatorWrapper 类的 equals 方法的实现
ComparatorWrapper 类的 equals 方法的实现

从代码里可以看到,它是对比本方法和传入对象的类名、生成的代码、References(附加参数)来判断两个 Comparator 是不是相等(兼容)。

但是,从调试中我们可以看到,类名、生成的代码其实有微小差别(后缀数字不一样)。

调试中发现的对比逻辑 Bug
调试中发现的对比逻辑 Bug

这里的后缀其实没有特别含义,是 Flink 在生成 Java 代码时,为了避免类、变量冲突而维护的一个自增变量,只与生成顺序有关,与代码逻辑无关。

因此问题就很清楚了:Flink 在判断 TopN 状态的序列化器是否兼容的时候,采用了不合适的对比方法,造成逻辑相同但是生成顺序略有差异的两个 Comparator 被误判为不等(不兼容)。

那么问题解决方法很简单:将这两个逻辑相同的类实例判断为相同即可。

可是问题来了:这两个类实例各自是如何来的呢?又是做什么的呢?为什么需要判断它们两个是否相同呢?

Flink 快照分析

为了回答这些问题,我们先来梳理一下 Flink 快照的流程:

1. 算子收到 Checkpoint Barrier(非数据源算子从上游算子传递得到,数据源算子则是被 Checkpoint Coordinator 推送),处理并准备进行快照(org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler#processBarrier 方法)。

2. 通过 org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler#notifyCheckpoint 方法,对收到的 Barrier 进行提取元数据和统计,然后触发 Checkpoint 快照(org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable#triggerCheckpointOnBarrier 方法)。

3. 调用 org.apache.flink.streaming.runtime.tasks.StreamTask#performCheckpoint 方法,对该算子的状态进行完整快照。它还需要调用 checkpointState、checkpointStreamOperator、snapshotState、snapshot、doSnapshot、computeSnapshot 等方法,对状态进行快照。

4. 在 computeSnapshot 方法里,会对 namespaceSerializer 和 stateSerializer 调用 snapshotConfiguration 方法,对当前的配置做快照,如下图:

computeSnapshot 方法里保存 Serializer 的快照
computeSnapshot 方法里保存 Serializer 的快照

5. 之后对于这个问题,它调用的是前面介绍过的 SortedMapSerializerSnapshot 类的 writeSnapshot 方法进行快照,本质是将 comparator 序列化并写入快照文件里。

序列化和反序列化 comparator
序列化和反序列化 comparator

从 readSnapshot 方法也可以看到,前面介绍过的 SortedMapSerializerSnapshot 类的 comparator 对象就是反序列化状态文件得到的,而这个 comparator 对象就被用作前述 org.apache.flink.table.runtime.typeutils.SortedMapSerializerSnapshot#resolveSchemaCompatibility 方法的 equals 左边的 comparator(下图红框)。

问题根源基本确认
问题根源基本确认

那前面的疑问就解决了一半。下面我们继续来查看右边那个 comparator 是怎么来的,又是做什么的。

SQL 作业提交流程

为了回答右边的 comparator 是怎么来的,我们需要看下客户端(Client)的 Flink SQL 作业的提交流程:

1. 当用户写了一个 “INSERT INTO” 语句后,需要调用 org.apache.flink.table.api.TableEnvironment#sqlUpdate 方法,令 Flink 将其解析为语法树(org.apache.flink.table.planner.StreamPlanner#translate)。

2. 在解析时,还会调用 ExecNode#translateToPlan、StreamExecCalc#translateToPlanInternal 等一系列方法,进一步解析语法树的每个节点,将其翻译成逻辑计划。

3. 在生成逻辑计划时,还涉及到一个名为“代码生成”的步骤,即将 SQL -> 语法树时,要把逻辑用 Java 代码表达出来,然后通过内置的 Janino 轻量级编译器,在内存中编译为 class 实例并序列化以作为计划的一部分。在代码生成过程中,类名和变量名都是自增生成的,这也是为什么之前我们截图里两个 comparator 的逻辑一样,类名和代码中的变量名类似但不一致的原因。

4. 对于这个场景,Comparator 的代码是 org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator#gen 生成的,生成以后就会返回一个 GeneratedRecordComparator 对象。

5. GeneratedRecordComparator 对象会被传入 RetractableTopNFunction 的构造方法,并被前述的 ComparatorWrapper 进行包装,以支持被序列化。

RetractableTopNFunction 的构造方法里需要传入生成的 GeneratedRecordComparator 对象
RetractableTopNFunction 的构造方法里需要传入生成的 GeneratedRecordComparator 对象

此时,Flink 作业运行图生成就告一段落。当运行图提交到 Flink 集群进行运行时,RetractableTopNFunction 类的 open 方法中会对状态进行初始化,其中 ValueStateDescriptor 就是访问状态的“钥匙”,它的构造需要传入上述生成的 GeneratedRecordComparator 对象:

RetractableTopNFunction 的运行时初始化方法里要用到生成的 Comparator
RetractableTopNFunction 的运行时初始化方法里要用到生成的 Comparator

报错回溯

让我们再返回文章开头的报错信息,一切明了:

1. 当恢复后的新 Flink 作业希望读取状态时,通过 getState 方法尝试从这个 ValueStateDescriptor 获取状态。

2. 首次访问时,由于这里用了延迟初始化(Lazy Initialization)机制,会检查这个 ValueStateDescriptor 里面封装的 comparator(新作业代码生成的)与快照恢复时里面记录的 comparator 是否兼容,如果兼容才会进行状态读取。

3. 但是很不巧,由于 equals 方法写的有问题,导致对比两个 comparator 时,因为生成的类名不一样,代码里变量也不一样,直接返回了 false,让 Flink 误认为不兼容,所以拒绝继续,作业报错。

解决思路

既然我们知道了问题的根本原因,可以从多方面入手解决这个问题:

1. 最简单的方法是通过正则的方式,把随机变量名去掉,然后对比类名和代码里剩下的字符串。但是这种方法不稳定,跨版本时如果 Generator 的代码格式稍有修改,就会出问题。

2. 更好的方法,是对比代码生成时,传入的元数据(即 org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator#gen 方法的参数)。如果不需要考虑历史作业的兼容性,可以直接修改 GeneratedRecordComparator 类,加入这些元数据,并在后续的 equals 对比时直接对比这些元数据即可。

另外,这个问题已经反馈给社区并记录为 JIRA 单。由于社区代码提交需要做完整的考量,这里我们还在讨论阶段。初步想法是采用方法 2,但是对于无法处理的历史类,则回退到类似方法 1 的途径。

总而言之,这个问题解决起来不难,但是需要充分注意兼容性和正确性,以避免社区前段时间的一个小修改造成的不兼容的问题

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 问题概要
  • 报错初探
  • 运行时调试
  • 问题确认
  • Flink 快照分析
  • SQL 作业提交流程
  • 报错回溯
  • 解决思路
相关产品与服务
流计算 Oceanus
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档