我有一个用例,在这个用例中,我必须使用一种队列机制,以确保消息由消费者(“工作者”)按顺序逐一处理。
我过去使用过RabbitMQ,它保证了接收消息的顺序。但是如果这个顺序不正确呢?
假设我提交消息4、5、3、2、1,RabbitMQ消费者将按此顺序处理消息。如果我希望以1、2、3、4、5的顺序处理它们,因为这些消息相互依赖,该怎么办?
此外,我不想让消费者在确认消息2之前使用消息3(没有间隙)。
是否有支持此用例的排队解决方案?目前,我们将消息转储到数据库中,并让工作人员定期按顺序拉取数据。
发布于 2017-10-27 06:25:02
考虑两种模式中的一种:“消息序列”或“重序列器”。
Message Sequence非常有用,如下所示:
每当可能需要将大量数据分成消息大小的块时,将数据作为消息序列发送,并使用序列标识字段标记每条消息。
Resequencer稍有不同:
重定序器可以接收可能未按顺序到达的消息流。Resequencer包含在内部缓冲区中,用于存储乱序消息,直到获得完整的序列。然后,序列中的消息被发布到输出通道。很重要的一点是,输出通道要保持顺序,这样才能保证消息按顺序到达下一个组件。与大多数其它路由器一样,重定序器通常不会修改消息内容。
我强烈建议阅读Gregor Hohpe/Bobby Woolf的“企业集成模式”一书中的这些模式( Martin Fowler等人的贡献)。
在队列的末尾需要一个适配器来处理序列。
另一方面,定序器“将乱序消息存储在内部缓冲区中,直到获得完整的序列,然后以正确的顺序将消息发布到输出通道”(第285页)。
这与您当前使用的“转储到数据库,然后使用工作进程拉”策略非常相似。
这些模式的实现细节将基于您的应用程序语言和队列的选择(在您的例子中是RabbitMQ),但是这些模式已经非常成熟,所以我将很好地研究它们。
我不知道RabbitMQ本身中有什么内置机制可以帮助您实现这一点。
希望这能有所帮助。
编辑
我在谷歌上搜索了"RabbitMQ resequencer“,发现了以下内容(虽然不能证明它的有效性):rabbus-sequence (GitHub)。
在任何情况下,看看他们的代码在做什么可能会有所帮助,看看你是否可以从中获得一些灵感。
https://stackoverflow.com/questions/46964586
复制相似问题