Clean shutdown of the Google Pub/Sub async Java API

Stack Overflow user
Asked on 2018-11-07 22:53:40
Answers: 1 | Views: 440 | Followers: 0 | Votes: 1

I can't get the Google Pub/Sub Java async client to shut down cleanly. After calling Subscriber.stopAsync(), I get exceptions like the following:

    14:30:07.600 [grpc-default-worker-ELG-1-2] WARN  io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise - An exception was thrown by io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler$4.operationComplete()
          java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@724c721d rejected from java.util.concurrent.ScheduledThreadPoolExecutor@36bdb610[Shutting down, pool size = 1, active threads = 1, queued tasks = 2, completed tasks = 19]
          at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) ~[?:1.8.0_144]
          at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) ~[?:1.8.0_144]
          at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326) ~[?:1.8.0_144]
          at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533) ~[?:1.8.0_144]
          at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622) ~[?:1.8.0_144]
          at io.grpc.internal.SerializingExecutor.schedule(SerializingExecutor.java:93) ~[grpc-core-1.15.0.jar:1.15.0]
          at io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:86) ~[grpc-core-1.15.0.jar:1.15.0]
          at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.onReady(ClientCallImpl.java:611) ~[grpc-core-1.15.0.jar:1.15.0]
          at io.grpc.internal.ForwardingClientStreamListener.onReady(ForwardingClientStreamListener.java:49) ~[grpc-core-1.15.0.jar:1.15.0]
          at io.grpc.internal.AbstractStream$TransportState.notifyIfReady(AbstractStream.java:298) ~[grpc-core-1.15.0.jar:1.15.0]
          at io.grpc.internal.AbstractStream$TransportState.onStreamAllocated(AbstractStream.java:237) ~[grpc-core-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.grpc.netty.NettyClientStream$TransportState.setHttp2Stream(NettyClientStream.java:249) ~[grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler$4.operationComplete(NettyClientHandler.java:521) ~[grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler$4.operationComplete(NettyClientHandler.java:509) ~[grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:103) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2CodecUtil$SimpleChannelPromiseAggregator.tryPromise(Http2CodecUtil.java:378) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2CodecUtil$SimpleChannelPromiseAggregator.trySuccess(Http2CodecUtil.java:344) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2CodecUtil$SimpleChannelPromiseAggregator.trySuccess(Http2CodecUtil.java:256) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.DelegatingChannelPromiseNotifier.operationComplete(DelegatingChannelPromiseNotifier.java:52) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.DelegatingChannelPromiseNotifier.operationComplete(DelegatingChannelPromiseNotifier.java:31) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:103) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:696) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:258) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:338) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:409) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:360) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1396) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.forceFlush(SslHandler.java:1776) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:775) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.flush(SslHandler.java:752) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.flush(Http2ConnectionHandler.java:201) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler.gracefulClose(NettyClientHandler.java:631) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler.write(NettyClientHandler.java:300) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.write(DefaultChannelPipeline.java:1061) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.AbstractChannel.write(AbstractChannel.java:295) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.grpc.netty.WriteQueue$AbstractQueuedCommand.run(WriteQueue.java:174) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.grpc.netty.WriteQueue.flush(WriteQueue.java:112) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.grpc.netty.WriteQueue.access$000(WriteQueue.java:32) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.grpc.netty.WriteQueue$1.run(WriteQueue.java:44) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:464) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]

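I believe this is caused indirectly by consumers trying to ack the messages they have processed.

My expectation is that after calling stopAsync(), no more messages are pulled from the server, but the messages already buffered on the client are still delivered to my callback, and I can ack or nack those messages as well as any I am currently processing. I can't seem to achieve this.

I don't see any other method on the Subscriber that I could call to get a graceful shutdown. Am I missing something?

Obviously the messages will eventually be redelivered, but I would prefer to process the buffered messages before shutting down, and I would rather avoid "normal" exceptions in the logs.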


1 Answer

Stack Overflow user

Answered on 2018-11-08 01:25:28

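stopAsync() kicks off the shutdown sequence but returns immediately, so some work may still be running in the background. Try calling subscriber.stopAsync().awaitTerminated() so that the program waits until the service reaches the "terminated" state.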
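The same pattern can be reproduced with nothing but the JDK, which may help explain the stack trace in the question. The following is a minimal sketch, not the Pub/Sub client itself: the class `ShutdownSketch` and its two methods are hypothetical names made up for illustration, and a plain `ScheduledExecutorService` stands in for whatever executor the client uses internally. It shows that a task handed to an executor that is already shutting down is rejected with `RejectedExecutionException`, and that blocking until termination lets already-queued work finish first.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; this is not part of the Pub/Sub library.
class ShutdownSketch {

    // Submits a task after shutdown() has been requested.
    // Returns true if the executor rejects it, which is the kind of
    // failure logged in the question's stack trace.
    static boolean lateTaskIsRejected() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        executor.shutdown();
        try {
            executor.execute(() -> { });
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    // Queues `count` tasks, requests shutdown, then blocks until the pool
    // has terminated. Returns how many of the queued tasks actually ran.
    static int runThenAwait(int count) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < count; i++) {
            executor.execute(done::incrementAndGet);
        }
        executor.shutdown(); // asynchronous, like stopAsync(): returns at once
        try {
            // Blocking step, analogous in spirit to awaitTerminated().
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            executor.shutdownNow();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("late task rejected: " + lateTaskIsRejected());
        System.out.println("tasks completed: " + runThenAwait(3));
    }
}
```

In the real client the equivalent blocking step is the `subscriber.stopAsync().awaitTerminated()` call suggested above. Whether that also drains every buffered message before terminating depends on the library version, so treat the sketch as an explanation of the exception rather than a guarantee about Pub/Sub behavior.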

Votes: 1
Original page content provided by Stack Overflow. Translation support provided by the Tencent Cloud Xiaowei IT-domain engine.
Original link:

https://stackoverflow.com/questions/53191932
