首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >如何使用java获取kafka lag

如何使用java获取kafka lag
EN

Stack Overflow用户
提问于 2018-10-19 15:57:37
回答 2查看 3.6K关注 0票数 2

我目前已经开发了一个可以显示主题、分区和日志偏移量的代码。但我目前被困在如何获得分区的滞后。我知道有一个kafka offset命令可以实现这个功能,但我需要的是一个java代码。

代码语言:javascript
复制
public static void main(String[] args) throws Exception {
    System.out.println("START CONSUMER");final Properties props = new Properties();
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUPID);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

    // Create the consumer using props.
    final Consumer<Long, String> consumer =  new KafkaConsumer<>(props);

    // Subscribe to the topic.
    int i = 0;
    ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
    for (i=0;i<consumer.partitionsFor(TOPIC).size();i++)
    {
        TopicPartition partitiontemp = new TopicPartition(TOPIC, i);
        partitions.add(partitiontemp);
    }
    consumer.assign(partitions);
    consumer.seekToEnd(partitions);

    for (i=0;i<consumer.partitionsFor(TOPIC).size();i++)
    {
        System.out.printf("Topic: %s partitionID: %d log offset: %d \n", TOPIC, i, consumer.position(partitions.get(i)));
    }

    System.out.printf("CREATE CONSUMER DONE");
    consumer.close();

我需要做的是输出主题、分区、当前偏移量、日志偏移量和滞后。我如何获取代码的延迟,或者如何获取代码的当前偏移量。(有关所需输出,请参见图像)。

注意:我不能使用(foreach记录)功能,因为我不能读取输入文件中的每条记录。

EN

回答 2

Stack Overflow用户

发布于 2018-10-21 04:38:29

要重现kafka-consumer-groups功能,您需要一个消费者实例和一个AdminClient实例。

首先,使用AdminClient,您可以调用listConsumerGroupOffsets()来检索特定组的主题分区列表和提交的偏移量。

然后使用Consumer来获取这些分区的结束偏移量。你使用的方法效率很低,不需要分配和搜索结束偏移量。您可以简单地调用endOffsets()

这足以重现屏幕截图中包含的数据。

kafka-consumer-groups还使用AdminClient.describeConsumerGroups()打印分配给每个分区的组成员(如果有的话)。

票数 6
EN

Stack Overflow用户

发布于 2019-12-04 14:39:40

你可以通过从消费者那里获取EndOffset来获得LAG

代码语言:javascript
复制
Set<TopicPartition> partitionSet = consumer.assignment();
Map<TopicPartition, Long> endOffsets =consumer.endOffsets(consumer.assignment());

然后迭代where over集合

代码语言:javascript
复制
for(TopicPartition tp : partitionSet) { LOG.info("Topic :: {} ,EndOffset :: {}, currentOffset {}",tp.topic(),beginningOffsets.get(tp),endOffsets.get(tp), consumer.position(tp)); }

consumer.position(tp) --将得到当前的偏移量,从endoffset中减去这个值,就会得到滞后

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52888094

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档