首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何根据给定的主题顺序构思kafka主题

如何根据给定的主题顺序构思kafka主题
EN

Stack Overflow用户
提问于 2021-03-02 01:31:54
回答 1查看 32关注 0票数 0

目前,我面临着一个问题,我需要按照指定的顺序消费来自不同主题的数据。假设我们有3个主题,分别称为topic-1、topic-2和topic-3。首先,我需要确保我按以下顺序使用主题。

代码语言:javascript
运行
复制
topic-2 > topic-3 > topic-1

应用程序应该侦听和读取topic-2中的所有消息,然后继续从topic-3消费,然后从topic-1消费。再次需要这样做,直到主题收到消息。

使用Kafka可以做到这一点吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-03-02 01:58:12

我不确定您是否有任何特殊的约束,但您可以尝试在您的应用程序代码中这样做:

代码语言:javascript
运行
复制
consumeAll(topic2); // when done, consume next topic
consumeAll(topic3); // when done, consume next topic
consumeAll(topic1);

但要注意:

如果新消息同时被附加到多个主题,您将无法在应用程序代码中重新创建插入顺序,因为Kafka只保证顺序

在单个主题中

分区

,而不是跨多个分区或多个主题。

您可以使用时间戳,它必须嵌入到Kafka消息中。所以你可以分辨出哪一个是先来的:

代码语言:javascript
运行
复制
{ messageId, payload, timestamp }

使用时间戳意味着所有的生产者都必须使用同步的时钟。否则,你可能会得到一些毫秒的漂移,而正确的顺序就消失了。

但随后您遇到了下一个问题:您希望在开始处理之前等待多长时间?(例如,如果topic3没有接收到新消息)

另一件需要考虑的事情是:您从topic3收到一条新消息。现在应该发生什么?根据您的描述,您不能处理它,因为必须先有来自topic2的消息。您希望等待来自topic2的新邮件多长时间?

不过,也许只监听topic2会更好。只有当您收到来自topic2的消息时,才会开始从topic3获取消息。当你从topic3那里得到一些东西时,你就会开始听topic1。然后重新开始。

如下所示:

代码语言:javascript
运行
复制
while(true) {
  msg2 = consumeAllNewMessages(topic2); // blocking call, until message received
  msg3 = consumeAllNewMessages(topic3); // blocking, too
  msg1 = consumeAllNewMessages(topic1); // blocking, too

  process(msg2, msg3, msg1);
}

(当然,您可以(也应该)将阻塞调用替换为一些非阻塞代码,例如使用CompletableFuture)

但是再说一遍:

这将只保证您使用这些主题的顺序。但它不会告诉你这些消息(跨主题)是以什么顺序发送给Kafka的。这需要具有同步时钟的嵌入式时间戳。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/66426823

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档