首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >如何将特定偏移量中的kafka主题数据消费到特定偏移量?

如何将特定偏移量中的kafka主题数据消费到特定偏移量?
EN

Stack Overflow用户
提问于 2019-06-02 16:39:41
回答 1查看 5.3K关注 0票数 5

我需要消耗特定的偏移量到特定的结束偏移量!!consumer.seek()从特定的偏移量读取数据,但我需要将数据从偏移量检索到偏移量!!如有任何帮助,我们将不胜感激,谢谢。

代码语言:javascript
复制
    ConsumerRecords<String, String> records = consumer.poll(100);
    if(flag) {
        consumer.seek(new TopicPartition("topic-1", 0), 90);
        flag = false;
    }
EN

回答 1

Stack Overflow用户

发布于 2019-06-02 17:21:26

要读取从起始偏移量到结束偏移量的消息,首先需要使用seek()将消费者移动到所需的起始位置,然后使用poll(),直到到达所需的结束偏移量。

例如,从偏移量100到200消耗:

代码语言:javascript
复制
String topic = "test";
TopicPartition tp = new TopicPartition(topic, 0);

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs)) {
    consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Move to the desired start offset 
            consumer.seek(tp, 100L);
        }
    });
    boolean run = true;
    long lastOffset = 200L;
    while (run) {
        ConsumerRecords<String, String> crs = consumer.poll(Duration.ofMillis(100L));
        for (ConsumerRecord<String, String> record : crs) {
            System.out.println(record);
            if (record.offset() == lastOffset) {
                // Reached the end offsey, stop consuming
                run = false;
                break;
            }
        }
    }
}
票数 9
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56413483

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档