专栏首页腾讯云流计算从一个诡异的 Bug 来看 Flink 快照和状态读取的流程
原创

从一个诡异的 Bug 来看 Flink 快照和状态读取的流程

问题概要

Oceanus 流计算平台支持以 SQL 的方式提交作业,独享集群支持最新的 Flink 1.10 提供的新版 Blink Planner 语法。有一位客户写了一段代码,用到了 SQL 的 TopN 功能,语句类似于:

INSERT INTO `MySink` 
SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
   FROM table_name)
WHERE rownum <= N [AND conditions]

作业提交后,程序运行时一切正常;但是一旦把作业暂停(做快照),然后恢复时,就会持续报错:

java.lang.RuntimeException: Error while getting state
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
        at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
        at org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
        at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible.
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
        at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
        at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
        at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
        at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
        ... 13 more

从报错信息可以看出,作业的崩溃是由于 State Serializer 不兼容导致的。那么问题来了,跑的好好的程序,用的都是官方提供的 API,没有任何自定义的代码,为什么会不兼容呢?

报错初探

既然看到了报错时的线程栈,定位问题就可以以此入手。我们首先看下报错的直接根源:updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)

updateRestoredStateMetaInfo 方法的兼容性判断逻辑

从代码里了解到,是 updateStateSerializer() 方法返回了“不兼容”的结果,导致报错的。我们继续按图索骥,追踪到了 StateSerializerProvider 类的 registerNewSerializerForRestoredState() 方法,并确认了不兼容的信号来源于 previousSerializerSnapshot 对象的 resolveSchemaCompatibility() 方法返回的结果。

registerNewSerializerForRestoredState 方法中兼容性的判断逻辑

追踪到这里,我们发现 previousSerializerSnapshot 是一个接口的引用,下面有几十种不同实现。怎么办呢?我们在测试环境复现,并进行远程调试来查看运行时到底发生了什么。

运行时调试

Java 的远程调试方法很简单,只需要在 java 命令的启动参数上加入

-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=[调试端口]

的参数即可。

对于 Flink 而言,可以修改 flink-conf.yaml 里面的 env.java.opts.taskmanager 和 env.java.opts.jobmanager 两个配置项,分别对应着 TaskManager 和 JobManager 的运行参数。

因为报错发生在任务运行时,我们首先对 TaskManager 进行调试。果不其然,作业快照并恢复后,又开始报错了。我们也通过对上述代码设置断点,找到了现场:

远程调试 查看 resolveSchemaCompatibility 的具体类

原来是出问题的 previousSerializerSnapshot 对象的类型是 org.apache.flink.table.runtime.typeutils.SortedMapSerializerSnapshot 这个类。我们仔细阅读它的 resolveSchemaCompatibility() 方法,可以看到它的核心逻辑,是通过使用 equals 方法,对比它自己的 comparator 和 传入的 newSortedMapSerializer 的 comparator,如果 equals 方法返回 true 就是兼容,反之就不一定兼容(也可能通过 state migration 就兼容了,此文不涉及)。

初步找出问题原因

同时在运行时,我们看到这里的 Comparator 对于 TopN 查询来说,实际上是 RetractableTopNFunction$ComparatorWrapper 内部类。

远程调试找出 Comparator 的具体类

那么问题就清楚了,大概率是这个类的 equals 方法实现不规范(测试不充分),导致出现了此问题。

问题确认

我们看下 ComparatorWrapper 类的 equals 方法的实现:

ComparatorWrapper 类的 equals 方法的实现

从代码里可以看到,它是对比本方法和传入对象的类名、生成的代码、References(附加参数)来判断两个 Comparator 是不是相等(兼容)。

但是,从调试中我们可以看到,类名、生成的代码其实有微小差别(后缀数字不一样)。

调试中发现的对比逻辑 Bug

这里的后缀其实没有特别含义,是 Flink 在生成 Java 代码时,为了避免类、变量冲突而维护的一个自增变量,只与生成顺序有关,与代码逻辑无关。

因此问题就很清楚了:Flink 在判断 TopN 状态的序列化器是否兼容的时候,采用了不合适的对比方法,造成逻辑相同但是生成顺序略有差异的两个 Comparator 被误判为不等(不兼容)。

那么问题解决方法很简单:将这两个逻辑相同的类实例判断为相同即可。

可是问题来了:这两个类实例各自是如何来的呢?又是做什么的呢?为什么需要判断它们两个是否相同呢?

Flink 快照分析

为了回答这些问题,我们先来梳理一下 Flink 快照的流程:

1. 算子收到 Checkpoint Barrier(非数据源算子从上游算子传递得到,数据源算子则是被 Checkpoint Coordinator 推送),处理并准备进行快照(org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler#processBarrier 方法)。

2. 通过 org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler#notifyCheckpoint 方法,对收到的 Barrier 进行提取元数据和统计,然后触发 Checkpoint 快照(org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable#triggerCheckpointOnBarrier 方法)。

3. 调用 org.apache.flink.streaming.runtime.tasks.StreamTask#performCheckpoint 方法,对该算子的状态进行完整快照。它还需要调用 checkpointState、checkpointStreamOperator、snapshotState、snapshot、doSnapshot、computeSnapshot 等方法,对状态进行快照。

4. 在 computeSnapshot 方法里,会对 namespaceSerializer 和 stateSerializer 调用 snapshotConfiguration 方法,对当前的配置做快照,如下图:

computeSnapshot 方法里保存 Serializer 的快照

5. 之后对于这个问题,它调用的是前面介绍过的 SortedMapSerializerSnapshot 类的 writeSnapshot 方法进行快照,本质是将 comparator 序列化并写入快照文件里。

序列化和反序列化 comparator

从 readSnapshot 方法也可以看到,前面介绍过的 SortedMapSerializerSnapshot 类的 comparator 对象就是反序列化状态文件得到的,而这个 comparator 对象就被用作前述 org.apache.flink.table.runtime.typeutils.SortedMapSerializerSnapshot#resolveSchemaCompatibility 方法的 equals 左边的 comparator(下图红框)。

问题根源基本确认

那前面的疑问就解决了一半。下面我们继续来查看右边那个 comparator 是怎么来的,又是做什么的。

SQL 作业提交流程

为了回答右边的 comparator 是怎么来的,我们需要看下客户端(Client)的 Flink SQL 作业的提交流程:

1. 当用户写了一个 “INSERT INTO” 语句后,需要调用 org.apache.flink.table.api.TableEnvironment#sqlUpdate 方法,令 Flink 将其解析为语法树(org.apache.flink.table.planner.StreamPlanner#translate)。

2. 在解析时,还会调用 ExecNode#translateToPlan、StreamExecCalc#translateToPlanInternal 等一系列方法,进一步解析语法树的每个节点,将其翻译成逻辑计划。

3. 在生成逻辑计划时,还涉及到一个名为“代码生成”的步骤,即将 SQL -> 语法树时,要把逻辑用 Java 代码表达出来,然后通过内置的 Janino 轻量级编译器,在内存中编译为 class 实例并序列化以作为计划的一部分。在代码生成过程中,类名和变量名都是自增生成的,这也是为什么之前我们截图里两个 comparator 的逻辑一样,类名和代码中的变量名类似但不一致的原因。

4. 对于这个场景,Comparator 的代码是 org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator#gen 生成的,生成以后就会返回一个 GeneratedRecordComparator 对象。

5. GeneratedRecordComparator 对象会被传入 RetractableTopNFunction 的构造方法,并被前述的 ComparatorWrapper 进行包装,以支持被序列化。

RetractableTopNFunction 的构造方法里需要传入生成的 GeneratedRecordComparator 对象

此时,Flink 作业运行图生成就告一段落。当运行图提交到 Flink 集群进行运行时,RetractableTopNFunction 类的 open 方法中会对状态进行初始化,其中 ValueStateDescriptor 就是访问状态的“钥匙”,它的构造需要传入上述生成的 GeneratedRecordComparator 对象:

RetractableTopNFunction 的运行时初始化方法里要用到生成的 Comparator

报错回溯

让我们再返回文章开头的报错信息,一切明了:

1. 当恢复后的新 Flink 作业希望读取状态时,通过 getState 方法尝试从这个 ValueStateDescriptor 获取状态。

2. 首次访问时,由于这里用了延迟初始化(Lazy Initialization)机制,会检查这个 ValueStateDescriptor 里面封装的 comparator(新作业代码生成的)与快照恢复时里面记录的 comparator 是否兼容,如果兼容才会进行状态读取。

3. 但是很不巧,由于 equals 方法写的有问题,导致对比两个 comparator 时,因为生成的类名不一样,代码里变量也不一样,直接返回了 false,让 Flink 误认为不兼容,所以拒绝继续,作业报错。

解决思路

既然我们知道了问题的根本原因,可以从多方面入手解决这个问题:

1. 最简单的方法是通过正则的方式,把随机变量名去掉,然后对比类名和代码里剩下的字符串。但是这种方法不稳定,跨版本时如果 Generator 的代码格式稍有修改,就会出问题。

2. 更好的方法,是对比代码生成时,传入的元数据(即 org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator#gen 方法的参数)。如果不需要考虑历史作业的兼容性,可以直接修改 GeneratedRecordComparator 类,加入这些元数据,并在后续的 equals 对比时直接对比这些元数据即可。

另外,这个问题已经反馈给社区并记录为 JIRA 单。由于社区代码提交需要做完整的考量,这里我们还在讨论阶段。初步想法是采用方法 2,但是对于无法处理的历史类,则回退到类似方法 1 的途径。

总而言之,这个问题解决起来不难,但是需要充分注意兼容性和正确性,以避免社区前段时间的一个小修改造成的不兼容的问题

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Flink 类型和序列化机制简介

    使用 Flink 编写处理逻辑时,新手总是容易被林林总总的概念所混淆,本文将逐步解密 Flink 的类型和序列化机制。

    KyleMeow
  • 如何应对飞速增长的状态?Flink State TTL 概述

    在流计算作业中,经常会遇到一些状态数不断累积,导致状态量越来越大的情形。例如,作业中定义了超长的时间窗口,或者在动态表上应用了无限范围的 GROUP BY 语句...

    KyleMeow
  • Spillable StateBackend 之 SpillAndLoadManager 源码注解

    在前文中,我们介绍了 Spillable Backend 及其 HeapStatusMonitor 的工作原理和不足。今天我们来看一下 Spillable Ba...

    KyleMeow
  • Flink整合Oozie Shell Action提交任务带Kerberos认证

    原文:https://www.cnblogs.com/ljygz/p/11727770.html

    王知无
  • Flink * 转

    stys35
  • flink-connector-kafka 冲突

    我的博客即将同步至腾讯云+社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite...

    stys35
  • dotnet 从入门到放弃的 500 篇文章合集

    博客包括 C#、WPF、UWP、dotnet core 、git 和 VisualStudio 和一些算法,所有博客使用 docx 保存

    林德熙
  • 竹间智能翁嘉颀:人机交互技术探索 | AI 研习社 60 期猿桌会

    AI 科技评论按:随着语音识别 ASR 的进步,对话机器人从简单的指令式的语音助手,进化到关键词交互方式,人们能够使用较为完整的句子来表达意图,机器人从中截取关...

    AI科技评论
  • WinForm基于插件开发实现多项配置存储

    跟着阿笨一起玩NET
  • 使用React hooks处理复杂表单状态数据

    自从React hooks发布以来已经有一段时间了,我很喜欢这个特性。这个hooks把我勾上了!

    前端知否

扫码关注云+社区

领取腾讯云代金券