我正在尝试理解在RabbitMQ中合并或分块传入消息的最佳方法(直接使用Spring AMQP或Java client )。
换句话说,我想接受100条传入消息,并将它们组合为1,然后以可靠(正确地ACK
ed )的方式将其重新发送到另一个队列。我相信这在企业信息门户中被称为aggregator模式。
我知道Spring Integration provides an aggregator solution,但它的实现看起来并不安全(这就是说,它看起来必须确认并使用消息来构建合并的消息,因此,如果您在这样做的同时关闭它,您会丢失消息吗?)。
发布于 2013-02-26 00:45:19
如果将<amqp-inbound-channel-adapter/>
tx-size
属性设置为100,容器将每100条消息确认一次,因此这应该可以防止消息丢失。
但是,您可能希望使聚合消息的发送(在第100次接收时)成为事务性的,以便您可以在对入站消息进行确认之前确认代理具有该消息。
发布于 2013-02-26 17:49:07
我不能直接对Spring Integration库发表评论,所以我会大体谈一谈RabbitMQ。
如果您不是100%被Spring Integration实现的聚合器所折服,并且打算自己实现它,那么我建议您避免使用tx
,它在RabbitMQ中使用了幕后的事务。
RabbitMQ中的事务处理速度很慢,如果您正在构建一个高流量/吞吐量的系统,那么您肯定会遇到性能问题。
相反,我建议您看看Publisher Confirms,它是在RabbitMQ中实现的AMQP的扩展。以下是它是新http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/时的介绍。
您需要调整预取设置以获得正确的性能,请查看http://www.rabbitmq.com/blog/2012/05/11/some-queuing-theory-throughput-latency-and-bandwidth/了解一些详细信息。
以上所有内容都为您提供了一些背景知识,以帮助您解决问题。其实现相当简单。
在创建使用者时,您需要确保将其设置为需要ACK。
出队n条消息,在出队时,您需要记下每条消息的出队信息(这用于确认将消息放入新的队列中message)
需要注意的一件事是,如果你的消费者在3之后和4之前死了,那么那些没有被确认的消息将在它复活时被重新处理
https://stackoverflow.com/questions/15070878
复制相似问题