首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >java -如何在kafka中显示每个分区而不是每个记录?

java -如何在kafka中显示每个分区而不是每个记录?
EN

Stack Overflow用户
提问于 2018-10-18 12:00:45
回答 1查看 674关注 0票数 1

我目前正在java中构建一个kafka使用者,它只会显示特定topicgroup id的分区(我有10个分区)和偏移量。我的当前代码显示给定输入的每条记录(或每行数据)。如果我有10个分区和15行数据,它将显示15行和多个分区实例。

以下是我对消费者的设置:

代码语言:javascript
复制
private static Consumer<Long, String> createConsumer() {
    System.out.println("CREATE CONSUMER");
    //Configure consumer settings/properties
    final Properties props = new Properties();
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUPID);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

    // Create the consumer using props.
    final Consumer<Long, String> consumer =  new KafkaConsumer<>(props);

    // Subscribe to the topic.
    consumer.subscribe(Collections.singletonList(TOPIC));
    return consumer;
}

下面是我用于显示输出的代码:

代码语言:javascript
复制
while (noRecordsCount < giveUp)
    {
        final ConsumerRecords<Long, String> consumerRecords = consumer.poll(500);

        if (consumerRecords.count() == 0)
        {
            noRecordsCount++;
            if (noRecordsCount > giveUp) break;
            else continue;
        }          

        //Stores each topic and partition to a specific array list for easier output manipulation
        consumerRecords.forEach(record -> {
            partitionrecord.add(record.partition());
            offsetrecord.add(record.offset());
            System.out.printf("Consumer Record: %s (%d, %d)"+"\n", TOPIC, record.partition(), record.offset()); 

        });

    }

代码输出:

预期输出显示的是每个分区的一个实例,而不是每个记录(主题名称、分区编号、偏移量):

我需要做的是显示10个分区,而不是显示每个(15)条记录及其特定信息(偏移量、分区、值等)。我是否需要在代码中添加任何特定的命令或函数?我是第一次接触堆栈溢出,如果我的问题很长,我很抱歉。

EN

回答 1

Stack Overflow用户

发布于 2018-10-18 17:57:56

对于分配给使用者实例的所有分区,您将始终获得开始时的最新偏移量之后的所有记录。

如果您运行10个使用者,您应该希望看到每个实例只有一个分区,但仍然是所有偏移量。

不存在只获取一条记录的设置,因为这取决于您的生产者,在消费者开始等待它们之后,只均匀地发送N条消息。

对于分配了多个分区的使用者实例,也不能保证跨分区进行排序

但是,您可以使用TreeMap或最大堆数据结构来存储数据点,然后按顺序遍历分区并输出每个分区的最大消耗偏移量

换句话说,您当前打印出了每条记录,而不是在分区上的所有循环之后打印,因此您得到的是第一个显示的输出

所以,没有办法在Kafka中做你想做的事情,但你问的问题实际上是如何在获得记录时对它们进行批量处理,然后只存储最大值,最后在哪里输出这些信息。

注意:GetOffsetShell命令已经可以查询所有分区的最大当前偏移量

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52866829

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档