前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Apache Kafka-通过API获取主题所有分区的积压消息数量

Apache Kafka-通过API获取主题所有分区的积压消息数量

作者头像
小小工匠
发布2023-05-09 16:09:36
1.3K0
发布2023-05-09 16:09:36
举报
文章被收录于专栏:小工匠聊架构小工匠聊架构
在这里插入图片描述
在这里插入图片描述

实现

代码语言:javascript
复制
package com.artisan.bootkafka.controller;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;


import java.util.*;

public class TopicBacklog {
    public static int getTotalBacklog(String topic) {
        // Kafka客户端配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "ip:port");
        props.put("group.id", "attack-consumer");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        // 创建KafkaConsumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 订阅要查询的主题
        List<PartitionInfo> partitions = consumer.partitionsFor(topic);
        List<TopicPartition> topicPartitions = new ArrayList<>();
        for (PartitionInfo partition : partitions) {
            topicPartitions.add(new TopicPartition(partition.topic(), partition.partition()));
        }

        // 手动分配分区
        consumer.assign(topicPartitions);

        // 记录未消费消息总数
        int totalBacklog = 0;

        // 遍历每个分区获取其未消费消息数并累加
        for (PartitionInfo partition : partitions) {
            TopicPartition tp = new TopicPartition(partition.topic(), partition.partition());
            // 获取消费者的当前偏移量
            long latestOffset = consumer.position(tp);
            long endOffset = consumer.endOffsets(Collections.singleton(tp)).get(tp);
            int backlog = Math.toIntExact(endOffset - latestOffset);
            totalBacklog += backlog;
        }

        // 返回未消费消息总数
        return totalBacklog;
    }


    public static Map<String, Integer> getAllTopicsBacklog() {
        // Kafka客户端配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "ip:port");
        props.put("group.id", "attack-consumer");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 获取所有主题列表
        Map<String, List<PartitionInfo>> topicMap = consumer.listTopics();

        // 记录每个主题未消费消息总数
        Map<String, Integer> backlogMap = new HashMap<>();

        // 遍历每个主题,计算其未消费消息数
        for (String topic : topicMap.keySet()) {
            // 订阅要查询的主题
            List<PartitionInfo> partitions = consumer.partitionsFor(topic);
            List<TopicPartition> topicPartitions = new ArrayList<>();
            for (PartitionInfo partition : partitions) {
                topicPartitions.add(new TopicPartition(partition.topic(), partition.partition()));
            }

            // 手动分配分区
            consumer.assign(topicPartitions);

            int backlog = 0;

            for (PartitionInfo partition : partitions) {
                TopicPartition tp = new TopicPartition(partition.topic(), partition.partition());
                long latestOffset = consumer.position(tp);
                long endOffset = consumer.endOffsets(Collections.singleton(tp)).get(tp);
                backlog += Math.toIntExact(endOffset - latestOffset);
            }
            backlogMap.put(topic, backlog);
        }
        // 返回每个主题未消费消息总数
        return backlogMap;
    }

    public static void main(String[] args) {
        int backlog = getTotalBacklog("topic-test");
        System.out.println(backlog);

        getAllTopicsBacklog().forEach((topic, backlogCount) -> System.out.println(topic + " - " + backlogCount));
    }
}
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

核对一下,23 。


有2个方法,第二个方法 Map<String, Integer> getAllTopicsBacklog() 虽然会返回所有的Topic 的积压量,但只有 对应的 消费组的数据是准确的。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-05-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 实现
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档