我有一个Spring应用程序(v2.2.10.RELEASE),它订阅pubSub中的多个主题,并提取异步数据并将其发送到其他地方。我没有使用SpringGCP,只是使用本地google库。
这是我的订阅服务器设置:
// Instantiate an asynchronous message receiver.
MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
messages.add(message);
consumer.ack();
};
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver)
.setParallelPullCount(2)
.setFlowControlSettings(flowControlSettings)
.setCredentialsProvider(credentialsProvider)
.setExecutorProvider(executorProvider)
//.setChannelProvider()
.build();
对于高流量和大消息(2-4kb),我会遇到以下信息:
[grpc-default-worker-ELG-1-1] INFO i.grpc.internal.AbstractClientStream - Received data on closed stream
首先,我不完全明白那是什么意思?我注意到的是,当发生这种情况时,传递的重复消息会增加。因此,我认为这意味着pubSub试图使用一些消息到达订阅服务器,但是由于某些原因,订户还没有准备好,所以pubSub将再次尝试传递消息。所以会有更多的重复,对吗?
在订户中使用TransportChannelProvider
会解决这个问题吗?我对写得不好的文档的理解是,当当前正在使用的通道关闭时,这将创建一个新的传递通道,从而消除先前的日志消息。
如果是,如何定义通道目标字符串?在哪里可以为mangagedChannel找到符合NameResolver的URI。我的意思是这个片段:
private TransportChannelProvider getChannelProvider() {
ManagedChannel channel = ManagedChannelBuilder.forTarget(target).usePlaintext(true).build();
return FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
}
我是GCP的新手,如果我的问题不够连贯,我很抱歉
发布于 2021-07-16 18:59:26
使用自定义TransportChannelProvider
无法解决此类问题。这更可能是堆栈中的一个更深层次的问题,例如在gRPC级别。对于这类错误[1](https://github.com/googleapis/google-cloud-java/issues/2668),[2],存在一些悬而未决的问题。
关于造成重复的原因,有可能是通过已经关闭的流(与错误消息对齐)传递消息,因为它们被困在gRPC层的较低级别缓冲区中,因此最终成为随后通过另一个流传递和处理的消息的副本。这可能是大量积压的小消息文档中讨论的问题的一个版本。Java客户端库的v1.109.0中对此问题进行了修正,因此,如果您使用的版本比此更早,则值得更新。
如果重复仍然是一个问题,最好是使用订阅的名称和一些重复消息的消息it来伸出援助之手,以便它们能够查看这些消息的传递模式,并进一步诊断这些重发是否是意外的。
https://stackoverflow.com/questions/68410973
复制相似问题