我查看了rabbitmq文档,似乎rabbitmq不处理消息重新传递计数。如果我要手动ACK/NACK消息,我需要将重试计数保存在内存中(例如,通过使用correlationId作为映射中的唯一键),或者通过在消息中设置我自己的标头并重新传递它(从而将其放在队列的末尾)
然而,这是一个由spring处理的案例。具体来说,我指的是RetryInterceptorBuilder.stateful().maxAttempts(x).但是,这个计数是特定于JVM的,还是它在以某种方式操纵消息?
例如,我有一个web应用程序部署到两个服务器,maxAttempts设置为5。根据两个服务器之间的重新传递和重新处理的顺序,总的重新传递计数是否可能在5-9之间?
发布于 2015-03-13 21:30:47
基于拒绝的重新排队时,Rabbit/AMQP不允许修改消息。
状态(基于messageId)在RetryContextCache中维护;缺省值为MapRetryContextCache。这并不真正适合于“集群”,因为正如您所说的,尝试可能会达到((maxAttempts - 1) * n + 1);此外,它还会导致内存泄漏(在某些服务器上留下状态)。您可以在RetryTemplate (构建器中的RetryOperations)中配置SoftReferenceMapRetryContextCache来避免内存泄漏,但这只能解决内存泄漏问题。
你需要使用一个自定义的RetryContextCache和一些持久化的共享存储(例如redis)。
在这种情况下,我通常建议使用无状态恢复-重试完全在容器中完成,根本不涉及rabbit (直到重试耗尽,在这种情况下,消息将被丢弃或发送到DLX/DLQ,具体取决于代理配置)。
如果您不关心消息顺序(我假设您没有竞争的消费者),一种有趣的技术是拒绝消息,将其发送到具有过期设置的DLQ,当DLQ消息过期时,将其路由回原始队列的尾部(而不是头部)。在这种情况下,可以检查x-death标头以确定它已被重试了多少次。
This answer和this one有更多细节报道。
https://stackoverflow.com/questions/29015793
复制相似问题