Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Kafka 删除主题流程分析

Kafka 删除主题流程分析

作者头像
张乘辉
发布于 2019-11-11 10:30:41
发布于 2019-11-11 10:30:41
1.2K00
代码可运行
举报
文章被收录于专栏:后端进阶后端进阶
运行总次数:0
代码可运行

之前有个 Kafka 集群的每个节点的挂载磁盘多达 20+ 个,平均每个磁盘约 1T,每个节点的分区日志被平均分配到这些磁盘中,但由于每个分区的数据不一致,而集群节点 log.retention.bytes 这个参数的默认值是 -1,也就是没有任何限制,因此 Kafka 的日志删除日志依赖 log.retention.hours 参数来删除,因此会出现日志未过期,磁盘写满的情况。

针对该集群双十一会遇到某些挂载磁盘被写满的情况,需要手动对主题进行删除以清空磁盘的操作,现在分析删除主题对集群以及客户端会有什么影响,以及 Kafka 都做了哪些动作。

图解删除过程

1. 删除主题

删除主题有多种方法,可通过 kafka-topic.sh 脚本并执行 --delete 命令,或者用暴力方式直接在 zk 删除对应主题节点,其实删除主题无非就是令 zk 节点删除,以触发 controller 对应监听器,然后再通过监听器通知到所有 broker,具体流程如下:

删除主题执行后,controller 监听到 zk 主题节点被删除,通知到所有 broker 删除主题对应的副本,这里会分成两个步骤,第一个步骤先将下线主题对应的副本,最后才执行真正的删除操作,注意,这里也并为真正的将主题从磁盘中删除,此时仅仅只会将要删除的副本所在的目录重命名,以免之后创建主题时目录有冲突,每个 broker 都会有一个定时线程,定时清除已重命名为删除状态的日志文件,具体如下:

2. 自动创建主题

自动创建主题的前提是 broker 配置参数 auto.create.topic.enble=true,删除主题后,当 Producer 发送时会对发送进行重试,期间会发送 MetadataRquest 命令到 broker 请求获取最新的元数据,在获取元数据的同时,会判断是否需要自动创建主题,如果需要,则调用 zk 客户端创建主题节点,controller 监听到有新主题创建,就会触发 controller 相关状态机工作创建主题。

刚刚也说过,kafka 重命名要删除的主题后,并不会立马就会删除,而是等待异步线程去删除,如下图所示,重命名后与重新创建的分区不冲突,可以证明删除是异步执行的了,且不影响生产发送,但是被重命名后的日志就不能消费了,即丢失了。

如下图可看出,在一分钟后,重命名后的副本被删除。

相关日志分析

1、controller.log

触发删除主题监听器:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[2019-11-07 19:24:11,121] DEBUG [Controller id=0] Delete topics listener fired for topics test-topic to be deleted (kafka.controller.KafkaController)

开始删除主题操作:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[2019-11-07 19:24:11,121] INFO [Topic Deletion Manager 0] Handling deletion for topics test-topic (kafka.controller.TopicDeletionManager)

开始停止主题,但此时并未删除:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[2019-11-07 19:24:11,143] DEBUG The stop replica request (delete = false) sent to broker 2 is StopReplicaRequestInfo([Topic=test-topic,Partition=1,Replica=2],false),StopReplicaRequestInfo([Topic=test-topic,Partition=0,Replica=2],false),StopReplicaRequestInfo([Topic=test-topic,Partition=2,Replica=2],false) (kafka.controller.ControllerBrokerRequestBatch)

开始执行真正的删除动作:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[2019-11-07 19:24:11,145] DEBUG [Topic Deletion Manager 0] Deletion started for replicas

[2019-11-07 19:24:11,147] DEBUG The stop replica request (delete = true) sent to broker 2 is StopReplicaRequestInfo([Topic=test-topic,Partition=1,Replica=2],true),StopReplicaRequestInfo([Topic=test-topic,Partition=0,Replica=2],true),StopReplicaRequestInfo([Topic=test-topic,Partition=2,Replica=2],true) (kafka.controller.ControllerBrokerRequestBatch)

收到 broker 删除的回调:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[2019-11-07 19:24:11,170] DEBUG [Controller id=0] Delete topic callback invoked on StopReplica response received from broker 2: request error = NONE, partition errors = Map(test-topic-2 -> NONE, test-topic-0 -> NONE, test-topic-1 -> NONE) (kafka.controller.KafkaController)

[2019-11-07 19:24:11,170] DEBUG [Topic Deletion Manager 0] Deletion successfully completed for replicas

已经成功全部删除:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[2019-11-07 19:24:11,202] INFO [Topic Deletion Manager 0] Deletion of topic test-topic successfully completed (kafka.controller.TopicDeletionManager)

如果此时有新的消息写入,会自动创建主题:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[2019-11-07 19:24:11,203] INFO [Controller id=0] New topics: [Set()], deleted topics: [Set()], new partition replica assignment [Map()] (kafka.controller.KafkaController)

[2019-11-07 19:24:11,267] INFO [Controller id=0] New topics: [Set(test-topic)], deleted topics: [Set()], new partition replica assignment [Map(test-topic-2 -> Vector(1, 2, 0), test-topic-1 -> Vector(0, 1, 2), test-topic-0 -> Vector(2, 0, 1))] (kafka.controller.KafkaController)

[2019-11-07 19:24:11,267] INFO [Controller id=0] New partition creation callback for test-topic-2,test-topic-1,test-topic-0 (kafka.controller.KafkaController)

2、server.log

broker 收到删除主题通通知(此时并没有删除):

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[2019-11-07 19:24:11,144] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions Set(test-topic-2, test-topic-0, test-topic-1) (kafka.server.ReplicaFetcherManager)

停止分区 fetch 线程:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[2019-11-07 19:24:11,145] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)

[2019-11-07 19:24:11,146] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error sending fetch request (sessionId=293639440, epoch=1824) to node 1: java.io.IOException: Client was shutdown before response was read. (org.apache.kafka.clients.FetchSessionHandler)

[2019-11-07 19:24:11,146] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)

[2019-11-07 19:24:11,147] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Shutdown completed (kafka.server.ReplicaFetcherThread)

接收到真正删除主题指令后,会重命名分区日志目录,此时还未删除,会等待异步线程执行:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[2019-11-07 19:24:11,157] INFO Log for partition test-topic-2 is renamed to /tmp/kafka-logs/kafka_3/test-topic-2.93ed68ff29d64a01a3f15937859124f7-delete and is scheduled for deletion (kafka.log.LogManager)

如果此时有新的消息写入,会自动创建主题:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[2019-11-08 15:39:39,343] INFO Creating topic test-topic with configuration {} and initial partition assignment Map(2 -> ArrayBuffer(1, 0, 2), 1 -> ArrayBuffer(0, 2, 1), 0 -> ArrayBuffer(2, 1, 0)) (kafka.zk.AdminZkClient)

[2019-11-08 15:39:39,369] INFO [KafkaApi-1] Auto creation of topic test-topic with 3 partitions and replication factor 3 is successful (kafka.server.KafkaApis)

[2019-11-07 19:24:11,286] INFO Created log for partition test-topic-0 in /tmp/kafka-logs/kafka_3 with properties {...}

异步线程删除重命名后的主题:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[2019-11-07 19:25:11,161] INFO Deleted log /tmp/kafka-logs/kafka_3/test-topic-2.93ed68ff29d64a01a3f15937859124f7-delete/00000000000000000000.log. (kafka.log.LogSegment)

[2019-11-07 19:25:11,163] INFO Deleted offset index /tmp/kafka-logs/kafka_3/test-topic-2.93ed68ff29d64a01a3f15937859124f7-delete/00000000000000000000.index. (kafka.log.LogSegment)

[2019-11-07 19:25:11,164] INFO Deleted time index /tmp/kafka-logs/kafka_3/test-topic-2.93ed68ff29d64a01a3f15937859124f7-delete/00000000000000000000.timeindex. (kafka.log.LogSegment)

[2019-11-07 19:25:11,165] INFO Deleted log for partition test-topic-2 in /tmp/kafka-logs/kafka_3/test-topic-2.93ed68ff29d64a01a3f15937859124f7-delete. (kafka.log.LogManager)
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-11-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端进阶 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Kafka源码系列之如何删除topic
本文依然是以kafka0.8.2.2为例讲解 一,如何删除一个topic 删除一个topic有两个关键点: 1,配置删除参数 delete.topic.enable这个Broker参数配置为True。 2,执行 bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name 假如不配置删除参数为true的话,topic其实并没有被清除,只是被标记为删除。此时,估计一般人的做法是删除topic在Zookeep
Spark学习技巧
2018/01/30
1.5K0
万字长文解析删除Topic流程领导再也不用担心我排查生产环境问题了(附教学视频,建议收藏!!!)
支持正则表达式匹配Topic来进行删除,只需要将topic 用双引号包裹起来 例如: 删除以create_topic_byhand_zk为开头的topic;
石臻臻的杂货铺[同名公众号]
2021/08/18
6980
万字长文解析删除Topic流程领导再也不用担心我排查生产环境问题了(附教学视频,建议收藏!!!)
【kafka源码】TopicCommand之alter源码解析(分区扩容)
PS: 当某个Topic的分区少于指定的分区数时候,他会抛出异常;但是不会影响其他Topic正常进行;
石臻臻的杂货铺[同名公众号]
2022/09/19
5280
【kafka源码】ReassignPartitionsCommand分区副本重分配源码原理分析(附配套教学视频)
(后续的视频会在 公众号[全套视频首发]、CSDN、B站等各平台同名号[石臻臻的杂货铺]上上传 )
石臻臻的杂货铺[同名公众号]
2022/11/30
6380
【kafka源码】ReassignPartitionsCommand分区副本重分配源码原理分析(附配套教学视频)
Apache Kafka-初体验Kafka(02)-Centos7下搭建单节点kafka_配置参数详解_基本命令实操
由于Kafka是用Scala语言开发的,运行在JVM上,因此在安装Kafka之前需要先安装JDK.
小小工匠
2021/08/17
5340
Kafka集群建立过程分析
从本章开始我们来介绍一个kafka集群逐步建立的过程; 集群中只有一台broker; topic的创建; 增加多台broker; 扩展已存在topic的partition; ---- 第一个broker(我们叫它B1)启动 broker启动流程,请参考Kafka初始化流程与请求处理; broker在启动过程中, 会先启动KafkaController, 因为此时只有一台broker B1, 它将被选为当前kafka集群的Controller, 过程可参考KafkaController分析1-选主和Fail
扫帚的影子
2018/09/05
4850
Kafka集群建立过程分析
10分钟带你玩转Kafka基于Controller的领导选举!
导语 | Controller作为Apache Kafka的核心组件,本文将从背景、原理以及源码与监控等方面来深入剖析Kafka Controller,希望带领大家去了解Controller在整个Kafka集群中的作用。 一、背景 Controller,是Apache Kafka的核心组件非常重要。它的主要作用是在Apache Zookeeper的帮助下管理和协调控制整个Kafka集群。 在整个Kafka集群中,如果Controller故障异常,有可能会影响到生产和消费。所以,我们需要对其状态、选
腾讯云开发者
2021/10/09
1.1K0
Kafka原理和实践
本文从Kafka的基本概念、特点、部署和配置、监控和管理等方面阐述 Kafka 的实践过程。
杨振涛
2019/08/08
1.4K0
Kafka集群安装
①.kafka需要依赖zk管理,在搭建kafka集群之前需要先搭建zk集群: https://my.oschina.net/u/2486137/blog/1537389 ②.从apache kafka官网下载kafka( 二进制版本)        注意下载的版本否则会在启动时报错:找不到主类Kafka.kafka. 我这里使用的是2.10版本. ③.配置config/server.properties文件: # Licensed to the Apache Software Foundation (ASF
用户1215919
2018/02/27
1.2K0
kafka删除主题_kafka从头消费topic数据
转自https://www.cnblogs.com/xiaodf/p/10710136.html
全栈程序员站长
2022/11/03
6250
kafka数据存储目录间迁移
生产环境kafka集群,在数据量大的情况下,经常会出现单机各个磁盘间的占用不均匀情况。
一条老狗
2019/12/26
4.3K1
kafka数据存储目录间迁移
悄悄掌握 Kafka 常用命令,再也不用全网搜索了(建议收藏)
前言 对于从事大数据相关职位的朋友们来说,使用 kafka 的频率应该不会少。为了解决各位在操作 kafka 时记不住命令参数的痛点,所以我整理了一下在我工作中经常用到的 kafka 实操命令,希望各位看官能够喜欢。 kafka版本:2.11-1.1.0
create17
2020/12/16
9.2K0
悄悄掌握 Kafka 常用命令,再也不用全网搜索了(建议收藏)
【源码分析】Kafka分区重分配/迁移(kafka-reassign-partitions.sh)
cd kafka_home/bin cat kafka-reassign-partitions.sh
全栈程序员站长
2022/09/13
1.3K0
Kafka基本架构介绍
该文介绍了如何使用Kafka进行分布式消息处理系统。文章首先介绍了Kafka的基本概念,然后详细描述了Kafka的架构和组件。接着,文章深入探讨了Kafka的复制和分布式协调功能,以及如何使用Kafka进行消息处理。最后,文章介绍了Kafka的性能优化和常见问题解决方案。
程裕强
2018/01/02
3.5K0
Kafka基本架构介绍
Kafka源码系列之源码分析zookeeper在kafka的作用
浪尖的kafka源码系列以kafka0.8.2.2源码为例给大家进行讲解的。纯属个人爱好,希望大家对不足之处批评指正。 一,zookeeper在分布式集群的作用 1,数据发布与订阅(配置中心) 发布与订阅模型,即所谓的配置中心,顾名思义就是讲发布者将数据发布到zk节点上,共订阅者动态获取数据,实现配置的集中式管理和动态更新。例如,全局的配置信息,服务服务框架的地址列表就非常适合使用。 2,负载均衡 即软件负载均衡。最典型的是消息中间件的生产、消费者负载均衡。 3,命名服务(Naming Service)
Spark学习技巧
2018/01/30
1.2K0
Kafka源码系列之源码分析zookeeper在kafka的作用
❤️3万字长文呕心沥血教你彻底搞懂数据迁移原理❤️(附配套教学视频)
(后续的视频会在 公众号[全套视频首发]、CSDN、B站等各平台同名号[石臻臻的杂货铺]上上传 )
石臻臻的杂货铺[同名公众号]
2021/08/03
4880
❤️3万字长文呕心沥血教你彻底搞懂数据迁移原理❤️(附配套教学视频)
KafkaController分析8-broker挂掉Kafka源码分析-汇总
在实际应用中broker可能因为机器,硬件,网络,进程自身等原因挂掉; 本章我们来看下一个broker挂掉后整个kafka集群会发生什么事情。 ---- 挂掉的broker不是集群的Controller 在Kafka集群建立过程分析和KafkaController分析6-Replica状态机我们讲过,KafkaController组件中的ReplicaStateMachine对象在启动时会注册监听BrokerChangeListener事件; 当一个broker挂掉后,其在zk的/brokers/ids下面
扫帚的影子
2018/09/05
7250
kafka实战教程(python操作kafka),kafka配置文件详解
应用往Kafka写数据的原因有很多:用户行为分析、日志存储、异步通信等。多样化的使用场景带来了多样化的需求:消息是否能丢失?是否容忍重复?消息的吞吐量?消息的延迟?
全栈程序员站长
2022/08/12
2.9K0
kafka实战教程(python操作kafka),kafka配置文件详解
分区副本限流机制三部曲(源码篇)
这一篇我们主要来看看分区副本重分配限流是如何实现的,此源码分析基于kafka2.5版本。在开始之前我们先来回顾一下分区重分配的流程,如图一所示。
石臻臻的杂货铺[同名公众号]
2021/11/19
2910
分区副本限流机制三部曲(源码篇)
Kafka学习笔记之Kafka High Availability(下)
  本文在上篇文章基础上,更加深入讲解了Kafka的HA机制,主要阐述了HA相关各种场景,如Broker failover,Controller failover,Topic创建/删除,Broker启动,Follower从Leader fetch数据等详细处理过程。同时介绍了Kafka提供的与Replication相关的工具,如重新分配Partition等。
Jetpropelledsnake21
2019/09/30
6160
Kafka学习笔记之Kafka High Availability(下)
推荐阅读
相关推荐
Kafka源码系列之如何删除topic
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验