专栏首页腾讯云流计算 Oceanus从一个诡异的 Bug 来看 Flink 快照和状态读取的流程
原创

从一个诡异的 Bug 来看 Flink 快照和状态读取的流程

问题概要

Oceanus 流计算平台支持以 SQL 的方式提交作业,独享集群支持最新的 Flink 1.10 提供的新版 Blink Planner 语法。有一位客户写了一段代码,用到了 SQL 的 TopN 功能,语句类似于:

INSERT INTO `MySink` 
SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
   FROM table_name)
WHERE rownum <= N [AND conditions]

作业提交后,程序运行时一切正常;但是一旦把作业暂停(做快照),然后恢复时,就会持续报错:

java.lang.RuntimeException: Error while getting state
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
        at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
        at org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
        at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible.
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
        at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
        at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
        at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
        at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
        ... 13 more

从报错信息可以看出,作业的崩溃是由于 State Serializer 不兼容导致的。那么问题来了,跑的好好的程序,用的都是官方提供的 API,没有任何自定义的代码,为什么会不兼容呢?

报错初探

既然看到了报错时的线程栈,定位问题就可以以此入手。我们首先看下报错的直接根源:updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)

updateRestoredStateMetaInfo 方法的兼容性判断逻辑

从代码里了解到,是 updateStateSerializer() 方法返回了“不兼容”的结果,导致报错的。我们继续按图索骥,追踪到了 StateSerializerProvider 类的 registerNewSerializerForRestoredState() 方法,并确认了不兼容的信号来源于 previousSerializerSnapshot 对象的 resolveSchemaCompatibility() 方法返回的结果。

registerNewSerializerForRestoredState 方法中兼容性的判断逻辑

追踪到这里,我们发现 previousSerializerSnapshot 是一个接口的引用,下面有几十种不同实现。怎么办呢?我们在测试环境复现,并进行远程调试来查看运行时到底发生了什么。

运行时调试

Java 的远程调试方法很简单,只需要在 java 命令的启动参数上加入

-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=[调试端口]

的参数即可。

对于 Flink 而言,可以修改 flink-conf.yaml 里面的 env.java.opts.taskmanager 和 env.java.opts.jobmanager 两个配置项,分别对应着 TaskManager 和 JobManager 的运行参数。

因为报错发生在任务运行时,我们首先对 TaskManager 进行调试。果不其然,作业快照并恢复后,又开始报错了。我们也通过对上述代码设置断点,找到了现场:

远程调试 查看 resolveSchemaCompatibility 的具体类

原来是出问题的 previousSerializerSnapshot 对象的类型是 org.apache.flink.table.runtime.typeutils.SortedMapSerializerSnapshot 这个类。我们仔细阅读它的 resolveSchemaCompatibility() 方法,可以看到它的核心逻辑,是通过使用 equals 方法,对比它自己的 comparator 和 传入的 newSortedMapSerializer 的 comparator,如果 equals 方法返回 true 就是兼容,反之就不一定兼容(也可能通过 state migration 就兼容了,此文不涉及)。

初步找出问题原因

同时在运行时,我们看到这里的 Comparator 对于 TopN 查询来说,实际上是 RetractableTopNFunction$ComparatorWrapper 内部类。

远程调试找出 Comparator 的具体类

那么问题就清楚了,大概率是这个类的 equals 方法实现不规范(测试不充分),导致出现了此问题。

问题确认

我们看下 ComparatorWrapper 类的 equals 方法的实现:

ComparatorWrapper 类的 equals 方法的实现

从代码里可以看到,它是对比本方法和传入对象的类名、生成的代码、References(附加参数)来判断两个 Comparator 是不是相等(兼容)。

但是,从调试中我们可以看到,类名、生成的代码其实有微小差别(后缀数字不一样)。

调试中发现的对比逻辑 Bug

这里的后缀其实没有特别含义,是 Flink 在生成 Java 代码时,为了避免类、变量冲突而维护的一个自增变量,只与生成顺序有关,与代码逻辑无关。

因此问题就很清楚了:Flink 在判断 TopN 状态的序列化器是否兼容的时候,采用了不合适的对比方法,造成逻辑相同但是生成顺序略有差异的两个 Comparator 被误判为不等(不兼容)。

那么问题解决方法很简单:将这两个逻辑相同的类实例判断为相同即可。

可是问题来了:这两个类实例各自是如何来的呢?又是做什么的呢?为什么需要判断它们两个是否相同呢?

Flink 快照分析

为了回答这些问题,我们先来梳理一下 Flink 快照的流程:

1. 算子收到 Checkpoint Barrier(非数据源算子从上游算子传递得到,数据源算子则是被 Checkpoint Coordinator 推送),处理并准备进行快照(org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler#processBarrier 方法)。

2. 通过 org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler#notifyCheckpoint 方法,对收到的 Barrier 进行提取元数据和统计,然后触发 Checkpoint 快照(org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable#triggerCheckpointOnBarrier 方法)。

3. 调用 org.apache.flink.streaming.runtime.tasks.StreamTask#performCheckpoint 方法,对该算子的状态进行完整快照。它还需要调用 checkpointState、checkpointStreamOperator、snapshotState、snapshot、doSnapshot、computeSnapshot 等方法,对状态进行快照。

4. 在 computeSnapshot 方法里,会对 namespaceSerializer 和 stateSerializer 调用 snapshotConfiguration 方法,对当前的配置做快照,如下图:

computeSnapshot 方法里保存 Serializer 的快照

5. 之后对于这个问题,它调用的是前面介绍过的 SortedMapSerializerSnapshot 类的 writeSnapshot 方法进行快照,本质是将 comparator 序列化并写入快照文件里。

序列化和反序列化 comparator

从 readSnapshot 方法也可以看到,前面介绍过的 SortedMapSerializerSnapshot 类的 comparator 对象就是反序列化状态文件得到的,而这个 comparator 对象就被用作前述 org.apache.flink.table.runtime.typeutils.SortedMapSerializerSnapshot#resolveSchemaCompatibility 方法的 equals 左边的 comparator(下图红框)。

问题根源基本确认

那前面的疑问就解决了一半。下面我们继续来查看右边那个 comparator 是怎么来的,又是做什么的。

SQL 作业提交流程

为了回答右边的 comparator 是怎么来的,我们需要看下客户端(Client)的 Flink SQL 作业的提交流程:

1. 当用户写了一个 “INSERT INTO” 语句后,需要调用 org.apache.flink.table.api.TableEnvironment#sqlUpdate 方法,令 Flink 将其解析为语法树(org.apache.flink.table.planner.StreamPlanner#translate)。

2. 在解析时,还会调用 ExecNode#translateToPlan、StreamExecCalc#translateToPlanInternal 等一系列方法,进一步解析语法树的每个节点,将其翻译成逻辑计划。

3. 在生成逻辑计划时,还涉及到一个名为“代码生成”的步骤,即将 SQL -> 语法树时,要把逻辑用 Java 代码表达出来,然后通过内置的 Janino 轻量级编译器,在内存中编译为 class 实例并序列化以作为计划的一部分。在代码生成过程中,类名和变量名都是自增生成的,这也是为什么之前我们截图里两个 comparator 的逻辑一样,类名和代码中的变量名类似但不一致的原因。

4. 对于这个场景,Comparator 的代码是 org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator#gen 生成的,生成以后就会返回一个 GeneratedRecordComparator 对象。

5. GeneratedRecordComparator 对象会被传入 RetractableTopNFunction 的构造方法,并被前述的 ComparatorWrapper 进行包装,以支持被序列化。

RetractableTopNFunction 的构造方法里需要传入生成的 GeneratedRecordComparator 对象

此时,Flink 作业运行图生成就告一段落。当运行图提交到 Flink 集群进行运行时,RetractableTopNFunction 类的 open 方法中会对状态进行初始化,其中 ValueStateDescriptor 就是访问状态的“钥匙”,它的构造需要传入上述生成的 GeneratedRecordComparator 对象:

RetractableTopNFunction 的运行时初始化方法里要用到生成的 Comparator

报错回溯

让我们再返回文章开头的报错信息,一切明了:

1. 当恢复后的新 Flink 作业希望读取状态时,通过 getState 方法尝试从这个 ValueStateDescriptor 获取状态。

2. 首次访问时,由于这里用了延迟初始化(Lazy Initialization)机制,会检查这个 ValueStateDescriptor 里面封装的 comparator(新作业代码生成的)与快照恢复时里面记录的 comparator 是否兼容,如果兼容才会进行状态读取。

3. 但是很不巧,由于 equals 方法写的有问题,导致对比两个 comparator 时,因为生成的类名不一样,代码里变量也不一样,直接返回了 false,让 Flink 误认为不兼容,所以拒绝继续,作业报错。

解决思路

既然我们知道了问题的根本原因,可以从多方面入手解决这个问题:

1. 最简单的方法是通过正则的方式,把随机变量名去掉,然后对比类名和代码里剩下的字符串。但是这种方法不稳定,跨版本时如果 Generator 的代码格式稍有修改,就会出问题。

2. 更好的方法,是对比代码生成时,传入的元数据(即 org.apache.flink.table.planner.codegen.sort.ComparatorCodeGenerator#gen 方法的参数)。如果不需要考虑历史作业的兼容性,可以直接修改 GeneratedRecordComparator 类,加入这些元数据,并在后续的 equals 对比时直接对比这些元数据即可。

另外,这个问题已经反馈给社区并记录为 JIRA 单。由于社区代码提交需要做完整的考量,这里我们还在讨论阶段。初步想法是采用方法 2,但是对于无法处理的历史类,则回退到类似方法 1 的途径。

总而言之,这个问题解决起来不难,但是需要充分注意兼容性和正确性,以避免社区前段时间的一个小修改造成的不兼容的问题

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Flink Savepoints和Checkpoints的3个不同点

    在本文中,我们将解释什么是 Savepoint,什么会使用它们,并就它们与 Checkpoint 的区别进行对比分析。

    smartsi
  • Flink如何实现新的流处理应用第二部分:版本化状态

    这是我们关于 Flink 如何实现新的流处理应用系列中的第二篇博文。第一部分介绍了事件时间和乱序处理。

    smartsi
  • Flink DataStream—— 状态(State)&检查点(Checkpoint)&保存点(Savepoint)原理

    ​ 最近一次项目当中需要将大量数据保存再Flink程序当中用作缓存数据一共后续数据使用,隧对最近使用到的状态、检查点、保存点等原理和使用进行一个总结

    俺也想起舞
  • Flink流计算编程--Flink扩容、程序升级前后的思考

    对于持续生成新数据的场景,采用流计算显然是有利的。数据源源不断的产生,流计算系统理论上就要不间断的提供数据计算(可以停机维护的场景不在本文的讨论范围)。那么假如...

    zhisheng
  • Apache Flink 零基础入门(一):基础概念解析

    Apache Flink 是一个分布式大数据处理引擎,可对有限数据流和无限数据流进行有状态或无状态的计算,能够部署在各种集群环境,对各种规模大小的数据进行快速计...

    Java帮帮
  • Flink 常见问题定位指南

    流计算作业通常运行时间长,数据吞吐量大,且对时延较为敏感。但实际运行中,Flink 作业可能因为各种原因出现吞吐量抖动、延迟高、快照失败等突发情况,甚至发生崩溃...

    腾讯云大数据
  • Flink 1.11 新特性详解:【非对齐】Unaligned Checkpoint 优化高反压

    问题导读 1.Barrier 对齐会造成什么问题? 2.Barrier 对齐是否会造成反压? 3.如何理解Unaligned Checkpoint ? 作为...

    zhisheng
  • flink超越Spark的Checkpoint机制

    同时,浪尖也在知识星球里发了源码解析的文章。spark streaming的Checkpoint仅仅是针对driver的故障恢复做了数据和元数据的Checkpo...

    Spark学习技巧
  • Flink Checkpoint机制原理剖析与参数配置

    在Flink状态管理详解这篇文章中,我们介绍了Flink的状态都是基于本地的,而Flink又是一个部署在多节点的分布式引擎,分布式系统经常出现进程被杀、节点宕机...

    PP鲁
  • Flink 常见问题定位指南

    流计算作业通常运行时间长,数据吞吐量大,且对时延较为敏感。但实际运行中,Flink 作业可能因为各种原因出现吞吐量抖动、延迟高、快照失败等突发情况,甚至发生崩溃...

    KyleMeow
  • Flink 内部原理之数据流容错

    Apache Flink提供了一个容错机制来持续恢复数据流应用程序的状态。该机制确保即使在出现故障的情况下,程序的状态也将最终反映每条记录来自数据流严格一次ex...

    smartsi
  • 我们在学习Flink的时候,到底在学习什么?

    后台很多小伙伴都在问Flink的学习路径,那么我们在学习Flink的时候,到底重点学习哪些东西呢?

    王知无-import_bigdata
  • Flink CDC 原理、实践和优化

    CDC 是变更数据捕获(Change Data Capture)技术的缩写,它可以将源数据库(Source)的增量变动记录,同步到一个或多个数据目的(Sink)...

    腾讯云大数据
  • Flink CDC 原理、实践和优化

    CDC 是变更数据捕获(Change Data Capture)技术的缩写,它可以将源数据库(Source)的增量变动记录,同步到一个或多个数据目的(Sink)...

    KyleMeow
  • Storm VS Flink ——性能对比

    Apache Flink 和 Apache Storm 是当前业界广泛使用的两个分布式实时计算框架。其中 Apache Storm(以下简称“Storm”)在美...

    实时计算
  • Storm VS Flink ——性能对比

    Apache Flink 和 Apache Storm 是当前业界广泛使用的两个分布式实时计算框架。其中 Apache Storm(以下简称“Storm”)在美...

    用户6070864
  • Streaming with Apache Training

    本次培训主要专注在四个重要的概念:连续处理流数据,事件时间,有状态的流处理和状态快照。

    未来还未来
  • Flink 快照分析:定位大状态和数据倾斜的算子

    在 Flink 作业中,无论是 SQL 还是 JAR 模式,常常会直接或者间接地使用到状态(State)。当 Flink 进行快照时,用户定义的这些状态数据可...

    腾讯QQ大数据
  • Flink状态的缩放(rescale)与键组(Key Group)设计

    在之前那篇讲解Flink Timer的文章里,我曾经用三言两语简单解释了Key Group和KeyGroupRange的概念。实际上,Key Group是Fli...

    zhisheng

扫码关注云+社区

领取腾讯云代金券