package com.artisan.bootkafka.controller;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;
/**
 * Utility for computing the backlog (unconsumed message count) of Kafka topics
 * as seen by a fixed consumer group.
 *
 * <p>The backlog of a partition is computed as {@code endOffset - position}, where
 * {@code position} is what {@link KafkaConsumer#position(TopicPartition)} reports for a
 * manually assigned partition. Per the Kafka client documentation this is derived from the
 * group's committed offset, or from the {@code auto.offset.reset} policy when the group has
 * no committed offset for that partition.
 */
public class TopicBacklog {

    /** Broker address list; placeholder value that must be replaced with a real host:port. */
    private static final String BOOTSTRAP_SERVERS = "ip:port";

    /** Consumer group whose offsets are used as the "consumed" position. */
    private static final String GROUP_ID = "attack-consumer";

    /**
     * Returns the total number of unconsumed messages across all partitions of a topic.
     *
     * @param topic name of the topic to inspect
     * @return the summed backlog of every partition; {@code 0} if the topic has no partitions
     * @throws ArithmeticException if the total backlog does not fit in an {@code int}
     */
    public static int getTotalBacklog(String topic) {
        // try-with-resources guarantees the consumer (and its network resources) is released.
        try (KafkaConsumer<String, String> consumer = createConsumer()) {
            return computeBacklog(consumer, consumer.partitionsFor(topic));
        }
    }

    /**
     * Returns the backlog of every topic visible to the consumer.
     *
     * @return a map from topic name to its total backlog
     * @throws ArithmeticException if the backlog of any topic does not fit in an {@code int}
     */
    public static Map<String, Integer> getAllTopicsBacklog() {
        try (KafkaConsumer<String, String> consumer = createConsumer()) {
            // listTopics() already carries each topic's partition list, so no extra
            // partitionsFor() round trip is needed per topic.
            Map<String, List<PartitionInfo>> topicMap = consumer.listTopics();
            Map<String, Integer> backlogMap = new HashMap<>();
            for (Map.Entry<String, List<PartitionInfo>> entry : topicMap.entrySet()) {
                backlogMap.put(entry.getKey(), computeBacklog(consumer, entry.getValue()));
            }
            return backlogMap;
        }
    }

    /** Builds a String/String consumer configured for the fixed broker list and group. */
    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("group.id", GROUP_ID);
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        return new KafkaConsumer<>(props);
    }

    /** Assigns the given partitions to the consumer and sums {@code endOffset - position} over them. */
    private static int computeBacklog(KafkaConsumer<String, String> consumer,
                                      List<PartitionInfo> partitions) {
        // Unknown topic (null on older clients) or a topic without partitions has no backlog.
        if (partitions == null || partitions.isEmpty()) {
            return 0;
        }

        List<TopicPartition> topicPartitions = new ArrayList<>(partitions.size());
        for (PartitionInfo partition : partitions) {
            topicPartitions.add(new TopicPartition(partition.topic(), partition.partition()));
        }

        // Manual assignment: position() is only valid for partitions assigned to this consumer.
        consumer.assign(topicPartitions);

        // Resolve every position before asking for end offsets, so that messages produced
        // in between can only increase the reported backlog, never make it negative.
        Map<TopicPartition, Long> positions = new HashMap<>();
        for (TopicPartition tp : topicPartitions) {
            positions.put(tp, consumer.position(tp));
        }

        // A single batched request instead of one request per partition.
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);

        // Accumulate in long and check once at the end, so an oversized total raises
        // ArithmeticException instead of silently wrapping around.
        long total = 0L;
        for (TopicPartition tp : topicPartitions) {
            total = Math.addExact(total, endOffsets.get(tp) - positions.get(tp));
        }
        return Math.toIntExact(total);
    }

    /** Demo entry point: prints the backlog of {@code topic-test}, then of every topic. */
    public static void main(String[] args) {
        int backlog = getTotalBacklog("topic-test");
        System.out.println(backlog);
        getAllTopicsBacklog().forEach((topic, backlogCount) -> System.out.println(topic + " - " + backlogCount));
    }
}
核对一下,23 。
共有 2 个方法。第二个方法 Map<String, Integer> getAllTopicsBacklog()
虽然会返回所有 Topic 的积压量,但只有对应消费组(即配置的 group.id)所消费的 Topic 的数据才是准确的。