我试图使用Java Admin Client API的删除记录方法从我的kafka主题中删除消息。以下是我尝试过的步骤
我将20000条记录推送到了TEST-DELETE主题
2.启动控制台使用者并消耗所有消息
3.调用我的java程序删除所有这些20k消息
4.启动另一个具有不同组ID的控制台消费者。此消费者未收到任何已删除的消息
当我检查文件系统时,我仍然可以看到占用磁盘空间的所有20k记录。我的目的是永远从文件系统中删除这些记录。
下面给出了我的主题配置以及server.properties设置
主题:TEST-DELETE PartitionCount:4 ReplicationFactor:1配置:cleanup.policy = delete
主题:TEST-DELETE分区:0领导:0副本:0 Isr:0
主题:TEST-DELETE分区:1个领导者:0个副本:0个Isr:0
主题:TEST-DELETE分区:2个领导者:0个副本:0个Isr:0
主题:TEST-DELETE分区:3个领导者:0个副本:0个Isr:0
log.retention.hours = 24
log.retention.check.interval.ms = 60000
log.cleaner.delete.retention.ms = 60000
file.delete.delay.ms = 60000
delete.retention.ms = 60000
offsets.retention.minutes = 5
offsets.retention.check.interval.ms = 60000
log.cleaner.enable =真
log.cleanup.policy =紧凑,删除
我的删除代码如下
public void deleteRecords(Map<String, Map<Integer, Long>> allTopicPartions) {
Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
allTopicPartions.entrySet().forEach(topicDetails -> {
String topicName = topicDetails.getKey();
Map<Integer, Long> value = topicDetails.getValue();
value.entrySet().forEach(partitionDetails -> {
if (partitionDetails.getValue() != 0) {
recordsToDelete.put(new TopicPartition(topicName, partitionDetails.getKey()),
RecordsToDelete.beforeOffset(partitionDetails.getValue()));
}
});
});
DeleteRecordsResult deleteRecords = this.client.deleteRecords(recordsToDelete);
Map<TopicPartition, KafkaFuture<DeletedRecords>> lowWatermarks = deleteRecords.lowWatermarks();
lowWatermarks.entrySet().forEach(entry -> {
try {
logger.info(entry.getKey().topic() + " " + entry.getKey().partition() + " "
+ entry.getValue().get().lowWatermark());
} catch (Exception ex) {
}
});
}
我的java程序的输出如下
2019-06-25 16:21:15 INFO MyKafkaAdminClient:247 - TEST-DELETE 1 5000
2019-06-25 16:21:15 INFO MyKafkaAdminClient:247 - TEST-DELETE 0 5000
2019-06-25 16:21:15 INFO MyKafkaAdminClient:247 - TEST-DELETE 3 5000
2019-06-25 16:21:15 INFO MyKafkaAdminClient:247 - TEST-DELETE 2 5000
我的目的是从我的kafka经纪人的有限存储空间中删除文件系统中消耗的记录。
我想得到一些帮助,我的疑虑
感谢您的帮助
谢谢
发布于 2019-06-26 11:56:41
处理此问题的推荐方法是为retention.ms
您感兴趣的主题设置相关配置值。这样,您可以定义Kafka将数据存储多长时间,直到删除它为止,确保所有下游消费者都有机会在从Kafk集群中删除数据之前下拉数据。
但是,如果您仍然希望强制Kafka基于字节进行删除,则可以使用log.retention.bytes
和retention.bytes
配置值。第一个是群集范围的设置,第二个是特定于主题的设置,默认情况下会采用第一个设置的设置,但您仍然可以按主题覆盖它。该retention.bytes
数字是按分区强制执行的,因此您应该将其乘以主题分区的总数。
但请注意,如果您有一个失控的生产者突然开始生成大量数据,并且您将其设置为硬字节限制,则可能会消除群集中整天的数据,并且只能是留下最后几分钟的数据,甚至在有效的消费者可以从集群中提取数据之前。这就是为什么它是很多更好地设置您的卡夫卡主题有基于时间的保留,而不是字节为基础的。
您可以在官方Kafka文档中找到配置属性及其说明:https://kafka.apache.org/documentation/
https://stackoverflow.com/questions/-100009103
复制相似问题