kafka管控平台推荐使用 滴滴开源 的 Kafka运维管控平台(戳我呀) 更符合国人的操作习惯 、更强大的管控能力 、更高效的问题定位能力 、更便捷的集群运维能力 、更专业的资源治理 、更友好的运维生态 、
BliBli视频: 石臻臻的杂货铺
kafka的动态配置
Hello~~ 大家好,我是石臻臻
今天这篇文章,给大家分享一下最近看kafka中的动态配置,不需要重启Broker,即时生效的配置 欢迎留言一起探讨!
kafka中的配置
.properties
文件全局 default
配置优先级从底到高
不想看过程,可以直接看最后的源码总结部分
KafkaServer.startup
1. 动态配置初始化
config.dynamicConfig.initialize(zkClient)
currentConfig
, 然后从zk中获取节点 /config/brokers/
信息,然后更新配置updateDefaultConfig
; (动态默认配置覆盖静态配置)/config/brokers/{当前BrokerId}
获取配置, 如果配置中有ConfigType=PASSWORD
的配置(例如ssl.keystore.password
)存在,接着判断 是否存在password.encoder.old.secret
配置,(这个配置是用来加解密ConfigType=PASSWORD
的旧的秘钥),尝试用旧秘钥解密秘钥; 然后将这些配置重新加密回写入/config/brokers/{当前BrokerId}
; 然后返回配置 (这里主要是动态配置里面有密码类型配置的时候需要做一次解密加密处理)2. 注册可变更配置监听器
如果有对应的配置变更了,那么相应的监听器就会收到通知去修改自己相应的配置;
config.dynamicConfig.addReconfigurables(this)
DynamicBrokerConfig.addReconfigurables
// .........
def addReconfigurables(kafkaServer: KafkaServer): Unit = {
kafkaServer.authorizer match {
case Some(authz: Reconfigurable) => addReconfigurable(authz)
case _ =>
}
addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer))
addReconfigurable(new DynamicClientQuotaCallback(kafkaConfig.brokerId, kafkaServer))
addBrokerReconfigurable(new DynamicThreadPool(kafkaServer))
if (kafkaServer.logManager.cleaner != null)
addBrokerReconfigurable(kafkaServer.logManager.cleaner)
addBrokerReconfigurable(new DynamicLogConfig(kafkaServer.logManager, kafkaServer))
addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
addBrokerReconfigurable(kafkaServer.socketServer)
}
3. 动态配置启动监听
// Create the config manager. start listening to notifications
dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
dynamicConfigManager.startup()
change-notification-/config/changes
= stateChangeHandler/config/changes
= zNodeChildChangeHandler/config/changes
所有子节点看看有哪些变更lastExecutedChange(启动的时候是-1)
lastExecutedChange
15 * 60 * 1000(15分钟)
就是删除/config/changes /
下面的过期节点TopicConfigHandler.processConfigChanges
data
数据, 如果获取到了则执行通知流程notificationHandler.processNotification(d)
,处理器是ConfigChangedNotificationHandler
; 它先解析节点的json数据,根据版本信息不同调用不同的处理方法; 下面是version=2的处理方式;entityType
和entityName
; 那么久可以去对应的zk数据里面getData获取数据; 并且将获取到的数据Decode成Properties
对象entityConfig
;TopicConfigHandler.processConfigChanges
来进行处理,方法里面再看看流程 ->
entityConfig
里面获取message.format.version
配置消息格式版本号; 如果当前Broker的版本inter.broker.protocol.version
小于message.format.version
配置; 则将message.format.version
配置 排除掉TopicConfigHandler.updateLogConfig
来更新指定Topic的所有TopicPartition的配置,其实是将TP正在加载或初始化的状态标记为没有完成初始化,这将会在后续过程中促成TP重新加载并初始化initializeLeaderEpochCache
; (需要注意的是:这里的动态配置不是支持所有的配置参数,请看【kafka运维】Kafka全网最全最详细运维命令合集(精品强烈建议收藏!!!)的附件部分)leader.replication.throttled.replicas
,follower.replication.throttled.replicas
这两个限流相关;解析配置之后,然后通过quotaManager.markThrottled/quotaManager.removeThrottle
更新/移除对应的限流分区集合unclean.leader.election.enable=true(允许非同步副本选主 )
;那么就会执行TopicUncleanLeaderElectionEnable
方法来让它改变选举策略(前提是当前Broker是Controller角色)BrokerConfigHandler.processConfigChanges
假设我们配置了默认配置; zk里面的节点是
sh bin/kafka-configs.sh --bootstrap-server xxxxx:9090 --alter --entity-type brokers
--entity-default
--add-config log.segment.bytes=88888888
/config/changes
里面获取变更节点的json数据.然后去对应的 /config/{entityType}/{entituName}获取对应的数据BrokerReconfigurable
; 这个就是上面启动时候 1.1 启动加载动态配置总流程的第2步骤 (注册可变更配置监听器) 注册的;leader.replication.throttled.rate
、follower.replication.throttled.rate
、replica.alter.log.dirs.io.max.bytes.per.second
都会被更新一下quotaManagers.leader/leader/alterLogDirs.updateQuota
;如果这些配置没有配置的话,则用 Long.MaxValue(相当于是不限流)
来更新--describe
entities
; type是topics
就获取所有topic; type是broker|broker-loggers
则查询所有Broker节点entities
获取配置 ;做些简单校验;然后想Broker发起describeConfigs
请求; 节点策略是LeastLoadedNodeProvider
节点调用方法 KafkaApis.handleDescribeConfigsRequest
1. 未经授权配置不查询
2. 经过授权的配置开始查询 ;
3. 当查询的是`topics`时, 去zk节点`/confgi/类型/类型名` ,获取到动态配置数据之后, 然后将其覆盖本地跟Log相关的静态配置, 完事之后组装一下返回;(1.数据为空过滤2.敏感数据设置value=null; ConfigType=PASSWORD和不知道类型是啥的都是敏感数据 3. 组装所有的同义配置(`静态默认配置、本地静态、默认动态配置、指定动态配置、等等多个配置`)) 返回的数据类型如下:
broker|broker-loggers
节点, 则在 获取到数据之后 然后指定nodeId节点发起 describeBrokerConfigs
请求1. 如果查询的是`brokers`
1. 如果查询的是 `broker-loggers`
1. 发起请求
--describe
流程是一样的delete-config
配置, 需要校验一下当前配置有没有;如果没有抛出异常;incrementalAlterConfigs
;如果请求类型是 brokers/broker-loggers
则发起请求的接收方是 指定的Broker 节点; 否则就是LeastLoadedNodeProvider
(当前负载最少的节点)2. incrementalAlterConfigs 增量修改配置
KafkaApis.handleIncrementalAlterConfigsRequest
configs
;alter.config.policy.class.name=
配置(默认null)的话,则会实例化指定的类(需要继承 AlterConfigPolicy
类);并调用他的 validate
方法来校验;/config/topics/{topicName}
中;/config/changes/config_change_序列号
中; 这个节点主要是让Broker们来监听这个节点的来了解到哪个配置有变更的;省略
在 1. Broker启动加载动态配置 中我们了解到有对节点/config/change
注册一个子节点变更的监听处理器
那么对动态配置做出修改之后, 这个节点就会新增一条数据,那么所有的Broker都会收到这个通知;
所以我们就要来看一看收到通知之后又做了哪些事情
这个流程是又回到了上面的 1. 2 加载Topics/Brokers动态配置 的流程中了;
原理部分讲解比较详细的可以看 : Kafka动态配置实现原理解析 - 李志涛 - 博客园
/config/change
节点的变化不可以,因为Broker是监听
/config/changes/
里面的Broker节点,来实时得知有数据变更;
/config/
下面的配置?没有必要,这样监听的数据数据太多了,而且 你不知道具体是改了哪个配置,所以每次都要全部更新一遍,无缘无故的加重负担了, 用
/config/change
节点来得知哪个类型的数据变更, 只变更这个相关数据就可以了