首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >有没有办法阻止卡夫卡消费者在一个特定的抵消?

有没有办法阻止卡夫卡消费者在一个特定的抵消?
EN

Stack Overflow用户
提问于 2017-08-30 05:39:22
回答 3查看 5.8K关注 0票数 5

我可以寻求一个特定的抵消。是否有办法在特定的偏移量处阻止消费者?换句话说,消耗直到我给定的偏移量。据我所知,卡夫卡并没有提供这样的功能。如果我错了,请纠正我。

例如:分区有偏移量1-10。我只想从3-8消费。在使用第八条消息后,程序应该退出。

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2017-08-30 06:31:48

是的,kafka不提供这个功能,但是你可以在你的消费者代码中实现这一点。您可以尝试使用commitSync()来控制这一点。

公共无效commitSync(地图偏移) 为指定的主题和分区列表提交指定的偏移量。这是对卡夫卡的补偿。使用此API提交的偏移将在每次重新平衡后的第一次获取时使用,也将在启动时使用。因此,如果您需要在Kafka以外的任何东西中存储偏移量,则不应该使用此API。提交的偏移量应该是应用程序将要使用的下一条消息,即lastProcessedMessageOffset + 1。 这是一个同步提交,并将阻塞,直到提交成功或遇到不可恢复的错误(在这种情况下,它将被抛出给调用方)。

就像这样:

代码语言:javascript
运行
复制
 while (goAhead) {
     ConsumerRecords<String, String> records = consumer.poll(100);
     for (ConsumerRecord<String, String> record : records) {
         if (record.offset() > OFFSET_BOUND) {
            consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset())));
            goAhead = false;
            break;           
         }
         process(record);
     }
 }

您应该在上面的代码中将"enable.auto.commit“设置为false。在您的示例中,可以将OFFSET_BOUND设置为8。因为示例中的逗号偏移量仅为9,所以下次消费者将从该位置获取。

票数 2
EN

Stack Overflow用户

发布于 2017-08-30 06:28:23

假设分区偏移是连续的(即不压缩日志),您可以配置您的使用者(使用max.poll.records配置),以便它在每次轮询中读取一定数量的记录。这会让你停留在你想要的偏移量上。

票数 0
EN

Stack Overflow用户

发布于 2021-04-06 06:25:27

据我所知,max.poll.records是一个客户端特性。Kafka获取协议只有字节限制Fetch,所以您通常会读取更多的消息。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45952676

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档