目前,我面临着一个问题,我需要按照指定的顺序消费来自不同主题的数据。假设我们有3个主题,分别称为topic-1、topic-2和topic-3。首先,我需要确保我按以下顺序使用主题。
topic-2 > topic-3 > topic-1应用程序应该侦听和读取topic-2中的所有消息,然后继续从topic-3消费,然后从topic-1消费。再次需要这样做,直到主题收到消息。
使用Kafka可以做到这一点吗?
发布于 2021-03-02 01:58:12
我不确定您是否有任何特殊的约束,但您可以尝试在您的应用程序代码中这样做:
consumeAll(topic2); // when done, consume next topic
consumeAll(topic3); // when done, consume next topic
consumeAll(topic1);但要注意:
如果新消息同时被附加到多个主题,您将无法在应用程序代码中重新创建插入顺序,因为Kafka只保证顺序
在单个主题中
分区
,而不是跨多个分区或多个主题。
您可以使用时间戳,它必须嵌入到Kafka消息中。所以你可以分辨出哪一个是先来的:
{ messageId, payload, timestamp }使用时间戳意味着所有的生产者都必须使用同步的时钟。否则,你可能会得到一些毫秒的漂移,而正确的顺序就消失了。
但随后您遇到了下一个问题:您希望在开始处理之前等待多长时间?(例如,如果topic3没有接收到新消息)
另一件需要考虑的事情是:您从topic3收到一条新消息。现在应该发生什么?根据您的描述,您不能处理它,因为必须先有来自topic2的消息。您希望等待来自topic2的新邮件多长时间?
不过,也许只监听topic2会更好。只有当您收到来自topic2的消息时,才会开始从topic3获取消息。当你从topic3那里得到一些东西时,你就会开始听topic1。然后重新开始。
如下所示:
while(true) {
msg2 = consumeAllNewMessages(topic2); // blocking call, until message received
msg3 = consumeAllNewMessages(topic3); // blocking, too
msg1 = consumeAllNewMessages(topic1); // blocking, too
process(msg2, msg3, msg1);
}(当然,您可以(也应该)将阻塞调用替换为一些非阻塞代码,例如使用CompletableFuture)
但是再说一遍:
这将只保证您使用这些主题的顺序。但它不会告诉你这些消息(跨主题)是以什么顺序发送给Kafka的。这需要具有同步时钟的嵌入式时间戳。
https://stackoverflow.com/questions/66426823
复制相似问题