专栏首页后端进阶Kafka 删除主题流程分析

Kafka 删除主题流程分析

之前有个 Kafka 集群的每个节点的挂载磁盘多达 20+ 个,平均每个磁盘约 1T,每个节点的分区日志被平均分配到这些磁盘中,但由于每个分区的数据不一致,而集群节点 log.retention.bytes 这个参数的默认值是 -1,也就是没有任何限制,因此 Kafka 的日志删除日志依赖 log.retention.hours 参数来删除,因此会出现日志未过期,磁盘写满的情况。

针对该集群双十一会遇到某些挂载磁盘被写满的情况,需要手动对主题进行删除以清空磁盘的操作,现在分析删除主题对集群以及客户端会有什么影响,以及 Kafka 都做了哪些动作。

图解删除过程

1. 删除主题

删除主题有多种方法,可通过 kafka-topic.sh 脚本并执行 --delete 命令,或者用暴力方式直接在 zk 删除对应主题节点,其实删除主题无非就是令 zk 节点删除,以触发 controller 对应监听器,然后再通过监听器通知到所有 broker,具体流程如下:

删除主题执行后,controller 监听到 zk 主题节点被删除,通知到所有 broker 删除主题对应的副本,这里会分成两个步骤,第一个步骤先将下线主题对应的副本,最后才执行真正的删除操作,注意,这里也并为真正的将主题从磁盘中删除,此时仅仅只会将要删除的副本所在的目录重命名,以免之后创建主题时目录有冲突,每个 broker 都会有一个定时线程,定时清除已重命名为删除状态的日志文件,具体如下:

2. 自动创建主题

自动创建主题的前提是 broker 配置参数 auto.create.topic.enble=true,删除主题后,当 Producer 发送时会对发送进行重试,期间会发送 MetadataRquest 命令到 broker 请求获取最新的元数据,在获取元数据的同时,会判断是否需要自动创建主题,如果需要,则调用 zk 客户端创建主题节点,controller 监听到有新主题创建,就会触发 controller 相关状态机工作创建主题。

刚刚也说过,kafka 重命名要删除的主题后,并不会立马就会删除,而是等待异步线程去删除,如下图所示,重命名后与重新创建的分区不冲突,可以证明删除是异步执行的了,且不影响生产发送,但是被重命名后的日志就不能消费了,即丢失了。

如下图可看出,在一分钟后,重命名后的副本被删除。

相关日志分析

1、controller.log

触发删除主题监听器:

[2019-11-07 19:24:11,121] DEBUG [Controller id=0] Delete topics listener fired for topics test-topic to be deleted (kafka.controller.KafkaController)

开始删除主题操作:

[2019-11-07 19:24:11,121] INFO [Topic Deletion Manager 0] Handling deletion for topics test-topic (kafka.controller.TopicDeletionManager)

开始停止主题,但此时并未删除:

[2019-11-07 19:24:11,143] DEBUG The stop replica request (delete = false) sent to broker 2 is StopReplicaRequestInfo([Topic=test-topic,Partition=1,Replica=2],false),StopReplicaRequestInfo([Topic=test-topic,Partition=0,Replica=2],false),StopReplicaRequestInfo([Topic=test-topic,Partition=2,Replica=2],false) (kafka.controller.ControllerBrokerRequestBatch)

开始执行真正的删除动作:

[2019-11-07 19:24:11,145] DEBUG [Topic Deletion Manager 0] Deletion started for replicas

[2019-11-07 19:24:11,147] DEBUG The stop replica request (delete = true) sent to broker 2 is StopReplicaRequestInfo([Topic=test-topic,Partition=1,Replica=2],true),StopReplicaRequestInfo([Topic=test-topic,Partition=0,Replica=2],true),StopReplicaRequestInfo([Topic=test-topic,Partition=2,Replica=2],true) (kafka.controller.ControllerBrokerRequestBatch)

收到 broker 删除的回调:

[2019-11-07 19:24:11,170] DEBUG [Controller id=0] Delete topic callback invoked on StopReplica response received from broker 2: request error = NONE, partition errors = Map(test-topic-2 -> NONE, test-topic-0 -> NONE, test-topic-1 -> NONE) (kafka.controller.KafkaController)

[2019-11-07 19:24:11,170] DEBUG [Topic Deletion Manager 0] Deletion successfully completed for replicas

已经成功全部删除:

[2019-11-07 19:24:11,202] INFO [Topic Deletion Manager 0] Deletion of topic test-topic successfully completed (kafka.controller.TopicDeletionManager)

如果此时有新的消息写入,会自动创建主题:

[2019-11-07 19:24:11,203] INFO [Controller id=0] New topics: [Set()], deleted topics: [Set()], new partition replica assignment [Map()] (kafka.controller.KafkaController)

[2019-11-07 19:24:11,267] INFO [Controller id=0] New topics: [Set(test-topic)], deleted topics: [Set()], new partition replica assignment [Map(test-topic-2 -> Vector(1, 2, 0), test-topic-1 -> Vector(0, 1, 2), test-topic-0 -> Vector(2, 0, 1))] (kafka.controller.KafkaController)

[2019-11-07 19:24:11,267] INFO [Controller id=0] New partition creation callback for test-topic-2,test-topic-1,test-topic-0 (kafka.controller.KafkaController)

2、server.log

broker 收到删除主题通通知(此时并没有删除):

[2019-11-07 19:24:11,144] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions Set(test-topic-2, test-topic-0, test-topic-1) (kafka.server.ReplicaFetcherManager)

停止分区 fetch 线程:

[2019-11-07 19:24:11,145] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)

[2019-11-07 19:24:11,146] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error sending fetch request (sessionId=293639440, epoch=1824) to node 1: java.io.IOException: Client was shutdown before response was read. (org.apache.kafka.clients.FetchSessionHandler)

[2019-11-07 19:24:11,146] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)

[2019-11-07 19:24:11,147] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Shutdown completed (kafka.server.ReplicaFetcherThread)

接收到真正删除主题指令后,会重命名分区日志目录,此时还未删除,会等待异步线程执行:

[2019-11-07 19:24:11,157] INFO Log for partition test-topic-2 is renamed to /tmp/kafka-logs/kafka_3/test-topic-2.93ed68ff29d64a01a3f15937859124f7-delete and is scheduled for deletion (kafka.log.LogManager)

如果此时有新的消息写入,会自动创建主题:

[2019-11-08 15:39:39,343] INFO Creating topic test-topic with configuration {} and initial partition assignment Map(2 -> ArrayBuffer(1, 0, 2), 1 -> ArrayBuffer(0, 2, 1), 0 -> ArrayBuffer(2, 1, 0)) (kafka.zk.AdminZkClient)

[2019-11-08 15:39:39,369] INFO [KafkaApi-1] Auto creation of topic test-topic with 3 partitions and replication factor 3 is successful (kafka.server.KafkaApis)

[2019-11-07 19:24:11,286] INFO Created log for partition test-topic-0 in /tmp/kafka-logs/kafka_3 with properties {...}

异步线程删除重命名后的主题:

[2019-11-07 19:25:11,161] INFO Deleted log /tmp/kafka-logs/kafka_3/test-topic-2.93ed68ff29d64a01a3f15937859124f7-delete/00000000000000000000.log. (kafka.log.LogSegment)

[2019-11-07 19:25:11,163] INFO Deleted offset index /tmp/kafka-logs/kafka_3/test-topic-2.93ed68ff29d64a01a3f15937859124f7-delete/00000000000000000000.index. (kafka.log.LogSegment)

[2019-11-07 19:25:11,164] INFO Deleted time index /tmp/kafka-logs/kafka_3/test-topic-2.93ed68ff29d64a01a3f15937859124f7-delete/00000000000000000000.timeindex. (kafka.log.LogSegment)

[2019-11-07 19:25:11,165] INFO Deleted log for partition test-topic-2 in /tmp/kafka-logs/kafka_3/test-topic-2.93ed68ff29d64a01a3f15937859124f7-delete. (kafka.log.LogManager)

本文分享自微信公众号 - 后端进阶(objcoding),作者:张乘辉

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-11-11

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 深度解析RocketMQ Topic的创建机制

    我还记得第一次使用rocketmq的时候,需要去控制台预先创建topic,我当时就想为什么要这么设计,于是我决定撸一波源码,带大家从根源上吃透rocketmq ...

    张乘辉
  • Kafka 常用运维脚本

    张乘辉
  • 当 Kafka 分区不可用且 leader 副本被损坏时,如何尽量减少数据的丢失?

    经过上次 Kafka 日志集群某节点重启失败导致某个主题分区不可用的事故之后,这篇文章专门对分区不可用进行故障重现,并给出我的一些骚操作来尽量减少数据的丢失。

    张乘辉
  • kafka安装与测试

    Producer即生产者,向Kafka集群发送消息,在发送消息之前,会对消息进行分类,即Topic, Topic即主题,通过对消息指定主题可以将消息分类,消费...

    瑾诺学长
  • 微信扫码支付(模式一)微信扫码支付(模式一)

    官方文档地址:https://pay.weixin.qq.com/wiki/doc/api/native.php?chapter=6_4

    Javen
  • 教育平台项目后台管理系统:接口文档

    RendaZhang
  • 原 js页面传值参数打包类

    魂祭心
  • 拱门大集结:魅蓝官网改名蓝拱门?

     还记得当麦当劳改名为金拱门的那天,魅蓝手机也跟进热门称“如果我改名叫蓝拱门手机,你们还会爱我吗?”

    躲在树上的域小名
  • 再来做个小工具,简易计算器,用到了jfinal enjoy,140行代码不到

    用户6167008
  • 手机摄像头的小秘密

    21世纪初,夏普与当时的日本通信运营商J-PHONE发明了夏普J-SH04,夏普J-SH04具有拍照功能。2003年4月24日夏普发售了全球首款百万像素手机J-...

    鲜枣课堂

扫码关注云+社区

领取腾讯云代金券