我有两个关于在Kubernetes上运行的StateFun应用程序的高可用性的问题。
下面是有关我的设置的详细信息:
1-我尝试了动物园管理员和Kubernetes HA设置,结果是一样的(下面的日志来自动物园管理员HA )。当我杀死作业经理吊舱时,minikube启动另一个吊舱,当它试图加载最后一个检查点时,这个新吊舱就失败了:
...
2021-12-11 14:25:26,426 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job myStatefunApp (00000000000000000000000000000000).
2021-12-11 14:25:26,443 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart back off time strategy FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, backoffTimeMS=1000) for myStatefunApp (00000000000000000000000000000000).
2021-12-11 14:25:26,516 INFO org.apache.flink.runtime.util.ZooKeeperUtils [] - Initialized DefaultCompletedCheckpointStore in 'ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}' with /checkpoints/00000000000000000000000000000000.
2021-12-11 14:25:26,599 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Running initialization on master for job myStatefunApp (00000000000000000000000000000000).
2021-12-11 14:25:26,599 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Successfully ran initialization on master in 0 ms.
2021-12-11 14:25:26,617 INFO org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 pipelined regions in 1 ms
2021-12-11 14:25:26,626 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using job/cluster config to configure application-defined state backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1, writeBatchSize=2097152}
2021-12-11 14:25:26,627 INFO org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using predefined options: DEFAULT.
2021-12-11 14:25:26,627 INFO org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using application-defined options factory: DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=1}}.
2021-12-11 14:25:26,627 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using application-defined state backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null, enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1, writeBatchSize=2097152}
2021-12-11 14:25:26,631 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Checkpoint storage is set to 'filesystem': (checkpoints "hdfs://hdfs-namenode:8020/tmp/statefun_checkpoints/myStatefunApp")
2021-12-11 14:25:26,712 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Recovering checkpoints from ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}.
2021-12-11 14:25:26,724 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Found 1 checkpoints in ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}.
2021-12-11 14:25:26,725 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying to fetch 1 checkpoints from storage.
2021-12-11 14:25:26,725 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying to retrieve checkpoint 2.
2021-12-11 14:25:26,931 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job 00000000000000000000000000000000 from Checkpoint 2 @ 1639232587220 for 00000000000000000000000000000000 located at hdfs://hdfs-namenode:8020/tmp/statefun_checkpoints/myStatefunApp/00000000000000000000000000000000/chk-2.
2021-12-11 14:25:27,012 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
org.apache.flink.util.FlinkException: JobMaster for job 00000000000000000000000000000000 failed.
at org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:873) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:459) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:436) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:415) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at java.util.concurrent.CompletableFuture.uniHandle(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) ~[?:?]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.13.2.jar:1.13.2]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.13.2.jar:1.13.2]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.13.2.jar:1.13.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.13.2.jar:1.13.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.13.2.jar:1.13.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.13.2.jar:1.13.2]
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) ~[?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: There is no operator for the state 2edd7b5dafb2c271440b25f6da5f4532
at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) ~[?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.lang.IllegalStateException: There is no operator for the state 2edd7b5dafb2c271440b25f6da5f4532
at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:712) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:100) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1562) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1476) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:134) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:122) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) ~[?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
at java.lang.Thread.run(Unknown Source) ~[?:?]
2021-12-11 14:25:27,017 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting StatefulFunctionsClusterEntryPoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
2021-12-11 14:25:27,021 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint.
2021-12-11 14:25:27,025 INFO org.apache.flink.runtime.blob.BlobServer [] - Stopped BLOB server at 0.0.0.0:6124
2021-12-11 14:25:27,034 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Removing cache directory /tmp/flink-web-6c2dafc9-bb7d-489a-9e2d-cf78e3f19b67/flink-web-ui
2021-12-11 14:25:27,035 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Stopping DefaultLeaderElectionService.
2021-12-11 14:25:27,035 INFO org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/rest_server_lock'}
2021-12-11 14:25:27,036 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shut down complete.
2021-12-11 14:25:27,036 INFO org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent [] - Closing components.
2021-12-11 14:25:27,037 INFO org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService.
2021-12-11 14:25:27,037 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - Closing ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/dispatcher_lock'}.
2021-12-11 14:25:27,037 INFO org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService.
2021-12-11 14:25:27,037 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - Closing ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/resource_manager_lock'}.
2021-12-11 14:25:27,038 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Stopping DefaultLeaderElectionService.
2021-12-11 14:25:27,038 INFO org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/dispatcher_lock'}
2021-12-11 14:25:27,039 INFO org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess [] - Stopping JobDispatcherLeaderProcess.
2021-12-11 14:25:27,040 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Closing the slot manager.
2021-12-11 14:25:27,040 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Suspending the slot manager.
2021-12-11 14:25:27,041 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Stopping DefaultLeaderElectionService.
2021-12-11 14:25:27,041 INFO org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/resource_manager_lock'}
我认为,在使用这里时不能为Flink运算符指定is (正如告诉StateFun的那样)是造成这种情况的原因。当它在一开始很好的工作时,操作符被分配了一些随机的id,并且检查点变得很好。重新启动后,将为操作符分配其他随机it,并且当作业管理器(在本例中为state主版)试图加载状态"2edd7b5dafb2c271440b25f6da5f4532“时,它将无法找到分配给它的操作符。
有人能确认我认为什么是正确的和/或给我如何使我的StateFun应用程序高可用性工作的方向吗?
另一件有趣的事情是,在工作经理舱重新启动几次后,除了上述例外情况,有时还可以通过“从检查点恢复作业000000000000000000000000000000……”。行(?),其中包含“not state to restore”日志(链接) --这让我不确定它是否真的恢复了,或者它刚刚开始在上一个成功的检查点上丢弃状态。是什么导致了这一切?它真的从检查点恢复成功了吗?
2-用于Kubernetes部署,on StateFun部署文档(链接)部署类型用于作业管理器组件。另一方面,Flink部署文档(独立/ Kubernetes部分) (link)使用作业类型作为职务管理器,用于高可用的设置(作业管理器-应用程序-ha.yaml文件)
基本上,由于Kubernetes将在失败时重新启动吊舱,所以可以使用作业或部署。但问题是,当我们尝试使用savepoint停止作业并使用部署类型时,Kubernetes将重新启动pod,而不管保存点的创建是否成功和成功退出状态(0)。
在Kubernetes上运行时,我们是否应该停止使用保存点的StateFun应用程序?我知道相关的bug (link) --尽管它似乎被废弃了,但我可以用保存点取消它--我们是否应该按照高可用性数据清理部分中的说法,删除部署?(link)
针对第一个问题的更新:我打开了调试日志记录,可以在异常情况下捕获会话并连续成功启动。以下是不成功的例子:
...
2021-12-11 21:55:14,001 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated hash '32d5ca33c915e65563a5c7f4d62703ad' for node 'router (my-ingress-1-in)-5' {id: 5, parallelism: 1, user function: }
2021-12-11 21:55:14,001 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated hash '33b86fe798648d648b237ddfc986200d' for node 'router (my-ingress-2-in)-4' {id: 4, parallelism: 1, user function: }
2021-12-11 21:55:14,001 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated hash 'bd4c3fa1570bbcf606f2dabddd61ed7f' for node 'router (my-ingress-3-in)-6' {id: 6, parallelism: 1, user function: }
这是一个成功的例子:
2021-12-11 21:55:34,543 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated hash 'a1448ecf31ac98d2215c38bfd119abe0' for node 'router (my-ingress-3-in)-5' {id: 5, parallelism: 1, user function: }
2021-12-11 21:55:34,543 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated hash '05037ff96baea131d9cf1390846efd98' for node 'router (my-ingress-1-in)-4' {id: 4, parallelism: 1, user function: }
2021-12-11 21:55:34,543 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated hash '2edd7b5dafb2c271440b25f6da5f4532' for node 'router (my-ingress-2-in)-6' {id: 6, parallelism: 1, user function: }
似乎在两次运行之间生成的散列计算是不同的。
发布于 2021-12-15 16:51:47
在州立<= 3.2路由器没有手动指定的UID。虽然Flinks内部的UID生成是确定性的,但在某些情况下,状态函数生成底层流图的方式可能不是这样的。这是个窃听器。我打开了一个PR来修复向后兼容的way1中的这个问题。
https://stackoverflow.com/questions/70316498
复制相似问题