我有一个RabbitMQ消息结构,其中消息A应该生成许多消息,我们称之为B和C。工作进程接收消息A,然后处理它并生成消息B和C。
假定的工作流流程如下:
A的消息ack=FalseBCack以获取消息A在任何情况下,工作进程都会在消息A的处理过程中死亡,或者当is尚未完成事务时,我希望RabbitMQ将消息A视为未传递的,并重新对其进行排队。
如果相关的话,RabbitMQ在高度可用的配置中运行。
为什么问题是试图清除RabbitMQ文档这里,并指出:
此外,即使事务只涉及单个队列,RabbitMQ也不提供原子性保证,例如,tx.commit期间的故障可能导致代理重新启动后队列中出现事务发布的子集。
发布于 2016-06-08 12:05:43
我犯了一个错误,瞄准了最受欢迎的产品,就像我觉得的那样。ActiveMQ完全支持高可用性和事务处理,所以我刚刚切换到它,我的问题就解决了。
发布于 2016-06-08 05:53:26
您需要相当多的基础设施(代码)才能使其正常工作。这相当棘手。
您可能想考虑使用服务总线。我在这里有一个免费的.net开源软件:http://shuttle.github.io/shuttle-esb/。
如果您不在.net空间,或者对使用服务总线不感兴趣,您可以按照下面的代码查看Shuttle.Esb:https://github.com/Shuttle/Shuttle.Esb是如何处理这些事情的。
您可能还想看看这里的IdempotenceService:https://github.com/Shuttle/Shuttle.Esb.SqlServer/blob/master/Shuttle.ESB.SqlServer/Idempotence/IdempotenceService.cs
当使用idempotence服务器(在处理程序中)时,所有发送的消息都存储在事务存储中(如本例中的sql服务器),并且只有在消息处理成功完成之后才发送。
对于消息处理程序之外的发送,可以使用事务性发件箱(同样,基于sql server表的队列也可以工作)。
关键是,你需要对你的设计进行一些认真的思考,这将是一个棘手的问题。
发布于 2016-06-08 13:40:08
我们有几个系统在生产中完全像这样工作,其基本格式是:
如果步骤2失败,消息A将“重新排队”,并可再次被拾取。
当消息被发送到队列并成功接收时,该消息被标记为ready。当它被发送到一个消费者时,它被标记为unacked,直到被该消费者确认,此时它被从队列中移除。
在您的示例中,在步骤6中确认消息之前,它仍然处于unacked状态,这意味着如果工作进程在步骤6之前死亡,消息将滑回RabbitMQ中的ready状态,准备发送给另一个工作人员。
此功能确实依赖于没有显式禁用确认这一事实。如果您是这样的话,我非常肯定消息在发送到使用者时会立即从队列中删除。
https://stackoverflow.com/questions/37688965
复制相似问题