专栏首页腾讯云流计算 OceanusToo old resource version 引起 Flink JobManager 崩溃的问题定位
原创

Too old resource version 引起 Flink JobManager 崩溃的问题定位

问题背景

近期接到客户反馈,某地域的作业不定期的出现 JobManager 崩溃重启的问题。具体现象如下:

JobManager 在正常运行中,没有任何预兆地,突然报too old resource version错误,紧接着容器就自动退出了:

2020-10-17 14:51:36.289 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 37 (type=CHECKPOINT) @ 1602917496275 for job 7848c696590291978aa8279b3c09e5d7.
2020-10-17 14:51:36.442 [jobmanager-future-thread-1] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 37 for job 7848c696590291978aa8279b3c09e5d7 (6958828 bytes in 167 ms).
2020-10-17 14:56:36.287 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 38 (type=CHECKPOINT) @ 1602917796275 for job 7848c696590291978aa8279b3c09e5d7.
2020-10-17 14:56:36.434 [jobmanager-future-thread-1] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 38 for job 7848c696590291978aa8279b3c09e5d7 (6943747 bytes in 159 ms).
2020-10-17 15:01:36.286 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 39 (type=CHECKPOINT) @ 1602918096275 for job 7848c696590291978aa8279b3c09e5d7.
2020-10-17 15:01:36.461 [jobmanager-future-thread-1] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 39 for job 7848c696590291978aa8279b3c09e5d7 (6929939 bytes in 185 ms).
2020-10-17 15:06:36.289 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 40 (type=CHECKPOINT) @ 1602918396275 for job 7848c696590291978aa8279b3c09e5d7.
2020-10-17 15:06:36.434 [jobmanager-future-thread-1] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 40 for job 7848c696590291978aa8279b3c09e5d7 (6916258 bytes in 159 ms).
2020-10-17 15:11:36.290 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 41 (type=CHECKPOINT) @ 1602918696275 for job 7848c696590291978aa8279b3c09e5d7.
2020-10-17 15:11:36.434 [jobmanager-future-thread-1] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 41 for job 7848c696590291978aa8279b3c09e5d7 (6960383 bytes in 159 ms).
2020-10-17 15:16:36.288 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 42 (type=CHECKPOINT) @ 1602918996275 for job 7848c696590291978aa8279b3c09e5d7.
2020-10-17 15:16:36.469 [jobmanager-future-thread-1] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 42 for job 7848c696590291978aa8279b3c09e5d7 (6969143 bytes in 194 ms).
2020-10-17 15:21:36.290 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 43 (type=CHECKPOINT) @ 1602919296275 for job 7848c696590291978aa8279b3c09e5d7.
2020-10-17 15:21:36.438 [jobmanager-future-thread-1] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 43 for job 7848c696590291978aa8279b3c09e5d7 (6982908 bytes in 162 ms).
2020-10-17 15:25:20.649 [OkHttp https://9.166.252.1/...] ERROR org.apache.flink.kubernetes.KubernetesResourceManager [] - Fatal error occurred in ResourceManager.
io.fabric8.kubernetes.client.KubernetesClientException: too old resource version: 5906722735 (5907278804)
	at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_232]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_232]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_232]
2020-10-17 15:25:20.650 [OkHttp https://9.166.252.1/...] ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
io.fabric8.kubernetes.client.KubernetesClientException: too old resource version: 5906722735 (5907278804)
	at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_232]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_232]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_232]
2020-10-17 15:25:20.661 [BlobServer shutdown hook] INFO  org.apache.flink.runtime.blob.BlobServer [] - Stopped BLOB server at 0.0.0.0:6124

而查看所有 TaskManager 的日志,也没有发现任何异样。

该问题会触发 ResourceManager 对 JobManager 的重新初始化过程,作业也会从最近的一次 Checkpoint 恢复。但是如果没有配置 HA(High Availability,高可用)时,Flink 就无法正常恢复作业,造成运行中关键状态的丢失,这对线上业务是无法接受的。

原因排查

由于日志中关键的内容不多,我们决定先从 too old resource version 这个 Kubernetes 客户端的报错入手,分析异常发生的原因。

由于 Flink 的 Kubernetes 客户端使用 Fabric8,我们查到了其团队成员针对此问题的回复,简单概括如下:

每个 Kubernetes 资源都有自己的版本号,当客户端对 Pods 进行 watch(监听)操作时,有概率会出现410 Gone的 HTTP 状态码。这个状态表代表客户端记录的资源版本号(resourceVersion)太低,服务端不再接受它的请求。此时对于普通 watch 而言,需要自行处理该场景,客户端并没有对此做处理

而 Flink 并没有妥善处理这种场景,而是粗暴地令 JobManager 关闭(随后会重新启动一个新的实例)来应对任何 KubernetesClientException 异常(详见 FLINK-15836)。

Flink KubernetedsPodsWatcher 对客户端异常的处理逻辑

对于这个设计而言,固然有其合理成分,即遇到异常时 Fail Fast(尽快暴露问题),不至于小问题越拖越大。

但是我们认为,对于这种资源版本不够新的问题,并不属于故障,因此也不需要重启 JobManager 这么重的操作,只需要重新初始化一次 watcher,令其资源版本更新到最新即可。

毕竟,这种可恢复的异常,可能会在一个长期运行作业的运行周期内多次出现,平台方需要考虑到细粒度的容错,令客户的作业能够长期平稳运行。

不过至此我们仍然好奇,究竟是什么原因造成的客户端与服务端的资源版本不一致呢?

触发条件

对于这个 Too old resource version(HTTP Gone 410)的问题,我们已经知道了它的含义和潜在的应对策略,但是仍然需要找到触发条件以便对修复方案进行验证。对此我们尝试了不少方案,例如主动令 JobManager 的 JVM 较长时间停顿等等,但是难以触发同样的现象。

后来我们偶然间发现,重启 API Server 服务可以复现该问题,因为新启动的 API Server 会从 etcd 中获取当前最新 resourceVersion,如果客户端后续用保存的旧值请求的话,该现象就可以得到稳定复现,这给我们的修复和验证工作提供了极大的便利。

解决方案

有了上面的分析,可以在上述提到的onClose方法中,对异常cause的类型进行判定:如果它的错误码code等于 410(HTTP Gone),则令 KubernetesResourceManager 走重新初始化 watch 的流程:

	/**
	 * When “Fatal error occurred in ResourceManager.
	 * 		io.fabric8.kubernetes.client.KubernetesClientException:
	 * 		too old resource version”
	 * happens, re-watch the pods
	 */
	public void reWatchPods() {
		podsWatch = kubeClient.watchPodsAndDoCallback(
			KubernetesUtils.getTaskManagerLabels(clusterId),
			this);
	}

这个方法会令 Flink 的kubeClient从服务器中获取当前最新的 pods 信息,然后重新注册 watch 回调,开启新的一轮监听。

容错效果

应用了上述修复方案后,重新制作 Flink 镜像并进行验证,可以看到 Too old resource version 问题得到复现,异常也被捕获并重新进行了 pod watcher 的初始化,JobManager 正常运行,没有发生崩溃等现象:

Too old resource version 异常被捕获并重新进行 watch 操作

同时对该作业进行多次重启 API Server 操作,均可正常应对,Checkpoint 和 Savepoint 也可以继续进行。我们还在作业运行期间模拟单个和多个 TaskManager Pod 崩溃的场景,也可以正常地重新分配新的 Pod 并自动恢复作业,说明 Kubernetes Client 与服务端的后续通信都是正常的。

思考总结

回顾该问题的定位过程,其实不算特别顺利,大多数时间用在了思考和尝试如何复现问题方面。找到了复现方案,就可以把一个偶现的问题变成必现的问题,从而可以细致地从整个调用链分析问题的成因、后果、解决方案等方方面面。

另外此问题也显示了 Flink 的 Kubernetes 模块远非完美,仍然需要大家积极的发现、定位并解决各种运行时问题,为社区的发展贡献自己的力量。

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 《从0到1学习Flink》—— Flink JobManager 高可用性配置

    之前在 《从0到1学习Flink》—— Flink 配置文件详解 讲过 Flink 的配置,但是后面陆续有人来问我一些配置相关的东西,在加上我现在对 Flink...

    zhisheng
  • 干货:Flink+Kafka 0.11端到端精确一次处理语义实现

    实时处理里消息的仅一次处理是大家关注的重点吧,前面浪尖分享过一篇对比spark streaming 和 flink的文章 <Spark Streaming VS...

    Spark学习技巧
  • 穿越迷雾:一次多组件超时的 Flink 崩溃定位小记

    上周四下午,告警系统突然提示某位大客户的作业频繁发生崩溃和重启,现象是作业运行起来 2 分钟左右,JobManager 就发现有 TaskManager 心跳失...

    KyleMeow
  • Flink on Yarn / K8s 原理剖析及实践

    本文根据 Apache Flink 进阶篇系列直播课程整理而成,由阿里巴巴技术专家周凯波(宝牛)分享,主要介绍 Flink on Yarn / K8s 的原理及...

    zhisheng
  • Flink 常见问题定位指南

    流计算作业通常运行时间长,数据吞吐量大,且对时延较为敏感。但实际运行中,Flink 作业可能因为各种原因出现吞吐量抖动、延迟高、快照失败等突发情况,甚至发生崩溃...

    KyleMeow
  • Flink 常见问题定位指南

    流计算作业通常运行时间长,数据吞吐量大,且对时延较为敏感。但实际运行中,Flink 作业可能因为各种原因出现吞吐量抖动、延迟高、快照失败等突发情况,甚至发生崩溃...

    腾讯云大数据
  • Flink核心概念:系统架构、时间处理、状态与检查点

    上图的Flink示例程序对一个数据流做简单处理,整个过程包括了输入(Source)、转换(Transformation)和输出(Sink)。程序由多个DataS...

    PP鲁
  • eclipse svn插件卸载 重新安装 Subclipse卸载安装 The project was not built since its build path is incomplete T

    使用的是eclipse kepler版本,崩溃了,想要重新安装,主要遇到了下面这几种问题

    noteless
  • 如何实时监控 Flink 集群和作业?

    Flink 相关的组件和作业的稳定性通常是比较关键的,所以得需要对它们进行监控,如果有异常,则需要及时告警通知。本章先会教会教会大家如何利用现有 Flink U...

    zhisheng
  • Flink 1.14.0 内存优化你不懂?跟着土哥走就对了(万字长文+参数调优)

    自从写 Flink 系列文章,收到了太多读者的私信,希望我不断更新完善 Flink 专栏,为此,土哥还专门创建了一个文档,用来记录粉丝和读者在使用 Flink ...

    用户6070864
  • 硬核!八张图搞懂 Flink 端到端精准一次处理语义 Exactly-once(深入原理,建议收藏)

    在 Flink 1.4 版本之前,精准一次处理只限于 Flink 应用内,也就是所有的 Operator 完全由 Flink 状态保存并管理的才能实现精确一次处...

    五分钟学大数据
  • Flink JVM 内存超限的分析方法总结

    前段时间,某客户的大作业(并行度 200 左右)遇到了 TaskManager JVM 内存超限(实际内存用量 4.1G > 容器设定的最大阈值 4.0G),被...

    zhisheng
  • Flink JVM 内存超限的分析方法总结

    前段时间,某客户的大作业(并行度 200 左右)遇到了 TaskManager JVM 内存超限(实际内存用量 4.1G > 容器设定的最大阈值 4.0G),被...

    KyleMeow
  • 修复 Flink Kubernetes 资源分配慢 兼谈如何贡献开源社区

    近期我们发现 Kubernetes 环境下的 Flink 集群有个奇怪的现象:在算子并行度较大(例如超过 50)时,Flink 的 TaskManager 注册...

    KyleMeow
  • 分布式计算引擎 Flink/Spark on k8s 的实现对比以及实践

    以 Flink 和 Spark 为代表的分布式流批计算框架的下层资源管理平台逐渐从 Hadoop 生态的 YARN 转向 Kubernetes 生态的 k8s ...

    legendtkl
  • Flink源码阅读(一)--Checkpoint触发机制

      Flink的checkpoint是通过定时器周期性触发的。checkpoint触发最关键的类是CheckpointCoordinator,称它为检查点协调器...

    在周末
  • Flink on YARN模式下TaskManager的内存分配探究

    该作业启动了10个TaskManager,并正常运行。来到该任务的Web界面,随便打开一个TaskManager页面,看看它的内存情况。

    王知无-import_bigdata
  • IntelliJ IDEA 2019.3 这回真的要飞起来了,新特性抢先看!

    IntelliJ IDEA 上周才公布下一个主要版本 2019.3 的 Roadmap,近日就发布了 IntelliJ IDEA 2019.3 的首个早期访问版...

    zhisheng

扫码关注云+社区

领取腾讯云代金券