前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Too old resource version 引起 Flink JobManager 崩溃的问题定位

Too old resource version 引起 Flink JobManager 崩溃的问题定位

原创
作者头像
KyleMeow
修改2021-09-29 20:45:50
2.7K3
修改2021-09-29 20:45:50
举报

问题背景

近期接到客户反馈,某地域的作业不定期的出现 JobManager 崩溃重启的问题。具体现象如下:

JobManager 在正常运行中,没有任何预兆地,突然报too old resource version错误,紧接着容器就自动退出了:

代码语言:javascript
复制
2020-10-17 14:51:36.289 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 37 (type=CHECKPOINT) @ 1602917496275 for job 7848c696590291978aa8279b3c09e5d7.
2020-10-17 14:51:36.442 [jobmanager-future-thread-1] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 37 for job 7848c696590291978aa8279b3c09e5d7 (6958828 bytes in 167 ms).
2020-10-17 14:56:36.287 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 38 (type=CHECKPOINT) @ 1602917796275 for job 7848c696590291978aa8279b3c09e5d7.
2020-10-17 14:56:36.434 [jobmanager-future-thread-1] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 38 for job 7848c696590291978aa8279b3c09e5d7 (6943747 bytes in 159 ms).
2020-10-17 15:01:36.286 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 39 (type=CHECKPOINT) @ 1602918096275 for job 7848c696590291978aa8279b3c09e5d7.
2020-10-17 15:01:36.461 [jobmanager-future-thread-1] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 39 for job 7848c696590291978aa8279b3c09e5d7 (6929939 bytes in 185 ms).
2020-10-17 15:06:36.289 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 40 (type=CHECKPOINT) @ 1602918396275 for job 7848c696590291978aa8279b3c09e5d7.
2020-10-17 15:06:36.434 [jobmanager-future-thread-1] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 40 for job 7848c696590291978aa8279b3c09e5d7 (6916258 bytes in 159 ms).
2020-10-17 15:11:36.290 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 41 (type=CHECKPOINT) @ 1602918696275 for job 7848c696590291978aa8279b3c09e5d7.
2020-10-17 15:11:36.434 [jobmanager-future-thread-1] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 41 for job 7848c696590291978aa8279b3c09e5d7 (6960383 bytes in 159 ms).
2020-10-17 15:16:36.288 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 42 (type=CHECKPOINT) @ 1602918996275 for job 7848c696590291978aa8279b3c09e5d7.
2020-10-17 15:16:36.469 [jobmanager-future-thread-1] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 42 for job 7848c696590291978aa8279b3c09e5d7 (6969143 bytes in 194 ms).
2020-10-17 15:21:36.290 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 43 (type=CHECKPOINT) @ 1602919296275 for job 7848c696590291978aa8279b3c09e5d7.
2020-10-17 15:21:36.438 [jobmanager-future-thread-1] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 43 for job 7848c696590291978aa8279b3c09e5d7 (6982908 bytes in 162 ms).
2020-10-17 15:25:20.649 [OkHttp https://9.166.252.1/...] ERROR org.apache.flink.kubernetes.KubernetesResourceManager [] - Fatal error occurred in ResourceManager.
io.fabric8.kubernetes.client.KubernetesClientException: too old resource version: 5906722735 (5907278804)
	at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_232]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_232]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_232]
2020-10-17 15:25:20.650 [OkHttp https://9.166.252.1/...] ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
io.fabric8.kubernetes.client.KubernetesClientException: too old resource version: 5906722735 (5907278804)
	at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:323) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.kubernetes.shaded.okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) [flink-dist_2.11-1.11.0.jar:1.11.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_232]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_232]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_232]
2020-10-17 15:25:20.661 [BlobServer shutdown hook] INFO  org.apache.flink.runtime.blob.BlobServer [] - Stopped BLOB server at 0.0.0.0:6124

而查看所有 TaskManager 的日志,也没有发现任何异样。

该问题会触发 ResourceManager 对 JobManager 的重新初始化过程,作业也会从最近的一次 Checkpoint 恢复。但是如果没有配置 HA(High Availability,高可用)时,Flink 就无法正常恢复作业,造成运行中关键状态的丢失,这对线上业务是无法接受的。

原因排查

由于日志中关键的内容不多,我们决定先从 too old resource version 这个 Kubernetes 客户端的报错入手,分析异常发生的原因。

由于 Flink 的 Kubernetes 客户端使用 Fabric8,我们查到了其团队成员针对此问题的回复,简单概括如下:

每个 Kubernetes 资源都有自己的版本号,当客户端对 Pods 进行 watch(监听)操作时,有概率会出现410 Gone的 HTTP 状态码。这个状态表代表客户端记录的资源版本号(resourceVersion)太低,服务端不再接受它的请求。此时对于普通 watch 而言,需要自行处理该场景,客户端并没有对此做处理

而 Flink 并没有妥善处理这种场景,而是粗暴地令 JobManager 关闭(随后会重新启动一个新的实例)来应对任何 KubernetesClientException 异常(详见 FLINK-15836)。

Flink KubernetedsPodsWatcher 对客户端异常的处理逻辑
Flink KubernetedsPodsWatcher 对客户端异常的处理逻辑

对于这个设计而言,固然有其合理成分,即遇到异常时 Fail Fast(尽快暴露问题),不至于小问题越拖越大。

但是我们认为,对于这种资源版本不够新的问题,并不属于故障,因此也不需要重启 JobManager 这么重的操作,只需要重新初始化一次 watcher,令其资源版本更新到最新即可。

毕竟,这种可恢复的异常,可能会在一个长期运行作业的运行周期内多次出现,平台方需要考虑到细粒度的容错,令客户的作业能够长期平稳运行。

不过至此我们仍然好奇,究竟是什么原因造成的客户端与服务端的资源版本不一致呢?

触发条件

对于这个 Too old resource version(HTTP Gone 410)的问题,我们已经知道了它的含义和潜在的应对策略,但是仍然需要找到触发条件以便对修复方案进行验证。对此我们尝试了不少方案,例如主动令 JobManager 的 JVM 较长时间停顿等等,但是难以触发同样的现象。

后来我们偶然间发现,重启 API Server 服务可以复现该问题,因为新启动的 API Server 会从 etcd 中获取当前最新 resourceVersion,如果客户端后续用保存的旧值请求的话,该现象就可以得到稳定复现,这给我们的修复和验证工作提供了极大的便利。

解决方案

有了上面的分析,可以在上述提到的onClose方法中,对异常cause的类型进行判定:如果它的错误码code等于 410(HTTP Gone),则令 KubernetesResourceManager 走重新初始化 watch 的流程:

代码语言:javascript
复制
	/**
	 * When “Fatal error occurred in ResourceManager.
	 * 		io.fabric8.kubernetes.client.KubernetesClientException:
	 * 		too old resource version”
	 * happens, re-watch the pods
	 */
	public void reWatchPods() {
		podsWatch = kubeClient.watchPodsAndDoCallback(
			KubernetesUtils.getTaskManagerLabels(clusterId),
			this);
	}

这个方法会令 Flink 的kubeClient从服务器中获取当前最新的 pods 信息,然后重新注册 watch 回调,开启新的一轮监听。

容错效果

应用了上述修复方案后,重新制作 Flink 镜像并进行验证,可以看到 Too old resource version 问题得到复现,异常也被捕获并重新进行了 pod watcher 的初始化,JobManager 正常运行,没有发生崩溃等现象:

Too old resource version 异常被捕获并重新进行 watch 操作
Too old resource version 异常被捕获并重新进行 watch 操作

同时对该作业进行多次重启 API Server 操作,均可正常应对,Checkpoint 和 Savepoint 也可以继续进行。我们还在作业运行期间模拟单个和多个 TaskManager Pod 崩溃的场景,也可以正常地重新分配新的 Pod 并自动恢复作业,说明 Kubernetes Client 与服务端的后续通信都是正常的。

思考总结

回顾该问题的定位过程,其实不算特别顺利,大多数时间用在了思考和尝试如何复现问题方面。找到了复现方案,就可以把一个偶现的问题变成必现的问题,从而可以细致地从整个调用链分析问题的成因、后果、解决方案等方方面面。

另外此问题也显示了 Flink 的 Kubernetes 模块远非完美,仍然需要大家积极的发现、定位并解决各种运行时问题,为社区的发展贡献自己的力量。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 问题背景
  • 原因排查
  • 触发条件
  • 解决方案
  • 容错效果
  • 思考总结
相关产品与服务
流计算 Oceanus
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档