我有一个Ingres DB with History表,它记录数据库事件,如插入、更新和删除。我有一个生产者,这将是多线程。这个生产者将读取History表,以找到要选择的表和行,然后将该行添加到Kafka主题中。现在生产者需要确保将事件添加到Kafka主题中,其方式与History表登录的方式相同。因此,使用者读取它们的顺序与它在History表中记录的顺序相同,并在Postgrace上执行它。
我可以将这些数据生成多个生产者。示例
Producer1 has message 1 to 5
producer2 has message 6 to 10
producer3 has message 11 to 15但是当我消费的时候,我会收到关于如下主题的消息
messageId 1
messageId 2
messageId 3
messageId 6
messageId 7
messageId 11诸若此类
我想按下面的顺序获取所有消息
messageId 1
messageId 2
messageId 3
messageId 4
messageId 5
messageId 6
messageId 7
messageId 8
messageId 9诸若此类
注意:-我有一个主题、一个分区和一个消费者
发布于 2019-01-16 03:10:19
Kafka不保证发送数据时的顺序,因为默认情况下每个主题有几个分区,如果没有键,消息会随机分配到分区。而在下游,每个分区都可以独立使用。
如果您需要保证插入和消费顺序,则需要将Kafka主题配置为仅使用一个分区。这是在卡夫卡中保证秩序的唯一方法。然而,你将失去kafka的许多好处,这些好处是分布在多个服务器、内核等上的高性能。
发布于 2019-01-16 13:14:07
您最多可以通过发送到单个分区来保持消息的顺序,即生产者创建消息的顺序。Kafka分区保证了消费消息的顺序,按照消息在分区中创建的顺序。
在您的场景中,消息是由多个生产者生成的,它们不同步,无法按顺序使用消息填充分区。因此,不可能在消费者端实现您所期望的订单。
发布于 2019-01-16 14:35:58
对于每个Google's recommendation,如果您使用同步发布者(生产者)和单个订阅者,请遵循the 2nd half of the page中Node代码中的算法,以保证处理的顺序。
同样,如果您有多个发布者,则需要通过在getPublishCounterValue方法和setPublishCounterValue方法之间设置临界区来同步发布者,这会破坏发布者的多线程特性。
最好的解决方案是遵循the section of
最终结果中的
顺序很重要
典型用例:日志、状态更新
多线程发布者必须将时间戳附加到每个发布/订阅事件消息,以便订阅者可以将事件消息存储在Google Cloud Datastore或Firestore中作为实体。单独的事件消息处理器cron作业可以以时间戳排序的方式检索事件消息的实体,以强制消息排序。
https://stackoverflow.com/questions/54205299
复制相似问题