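The KafkaController component starts. Since B1 is the only broker at this point, it is elected Controller of the Kafka cluster (for the election process, see "KafkaController分析1-选主和Failover", i.e. "KafkaController Analysis 1: Election and Failover"). As Controller it then:
a. updates the controller epoch in ZooKeeper;
b. registers for change notifications on the broker and topic nodes in ZooKeeper;
c. initializes ControllerContext, mainly by reading brokers, topics, partitions, ISRs, partition leaders, replicas and related information from ZooKeeper;
d. starts ReplicaStateMachine;
e. starts PartitionStateMachine;
f. sends all partition information (leader, ISR, replicas, epoch, etc.) to all live brokers;
g. starts AutoRebalanceScheduler, if automatic leader rebalancing is enabled.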
KafkaHealthcheck starts and registers the broker's information under /brokers in ZooKeeper (at /brokers/ids/[broker id]), for example:
{"jmx_port":-1,"timestamp":"1477624160337","endpoints":["PLAINTEXT://10.123.81.11:9092"],"host":"10.123.81.11","version":3,"port":9092}
The ReplicaStateMachine inside KafkaController has already started and registered BrokerChangeListener, so once KafkaHealthcheck finishes starting, BrokerChangeListener is triggered:

    def handleChildChange(parentPath: String, currentBrokerList: java.util.List[String]) {
      inLock(controllerContext.controllerLock) {
        if (hasStarted.get) {
          ControllerStats.leaderElectionTimer.time {
            try {
              // broker ids currently registered in ZooKeeper
              val curBrokerIds = currentBrokerList.map(_.toInt).toSet
              // brokers the controller has not seen before
              val newBrokerIds = curBrokerIds -- controllerContext.liveOrShuttingDownBrokerIds
              val newBrokerInfo = newBrokerIds.map(zkUtils.getBrokerInfo(_))
              val newBrokers = newBrokerInfo.filter(_.isDefined).map(_.get)
              // brokers the controller knew about that are no longer registered
              val deadBrokerIds = controllerContext.liveOrShuttingDownBrokerIds -- curBrokerIds
              controllerContext.liveBrokers = curBrokerIds.map(zkUtils.getBrokerInfo(_)).filter(_.isDefined).map(_.get)
              // open channels to new brokers, close channels to dead ones
              newBrokers.foreach(controllerContext.controllerChannelManager.addBroker(_))
              deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker(_))
              if (newBrokerIds.size > 0)
                controller.onBrokerStartup(newBrokerIds.toSeq)
              if (deadBrokerIds.size > 0)
                controller.onBrokerFailure(deadBrokerIds.toSeq)
            } catch {
              case e: Throwable => error("Error while handling broker changes", e)
            }
          }
        }
      }
    }
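The core of this listener is two set differences: ids now in ZooKeeper minus ids the controller already knows, and the reverse. The following is a minimal, self-contained sketch of just that arithmetic. `BrokerChangeSketch` and `diff` are hypothetical names used for illustration and are not part of Kafka.

```scala
object BrokerChangeSketch {
  // Hypothetical helper: returns (newBrokerIds, deadBrokerIds).
  // `current` = ids currently registered in ZooKeeper,
  // `known`   = ids the controller already tracks.
  def diff(current: Set[Int], known: Set[Int]): (Set[Int], Set[Int]) =
    (current -- known, known -- current)

  def main(args: Array[String]): Unit = {
    val known = Set(1, 3)    // controller's view before the notification
    val current = Set(1, 2)  // children of the broker ids node after it
    val (added, dead) = diff(current, known)
    println(s"new brokers: $added, dead brokers: $dead")
    // prints: new brokers: Set(2), dead brokers: Set(3)
  }
}
```

In the single-broker scenario described here, `known` is empty and `current` contains only B1, so only the "new brokers" side is non-empty.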
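It does three things:
1. updates ControllerContext.liveBrokers;
2. calls back KafkaController.onBrokerStartup(newBrokerIds.toSeq);
3. calls back KafkaController.onBrokerFailure(deadBrokerIds.toSeq).
KafkaController.onBrokerStartup handles newly added brokers. Since there is only one broker and no topics yet, it does essentially nothing here.

A topic can be created in two ways:
1. If automatic topic creation is enabled (auto.create.topics.enable=true), sending a message to a topic that does not exist creates it automatically.
2. When a topic is created with the bin/kafka-topics.sh script, the topic's config is written to /config/topics/[topic] in ZooKeeper, and its partition assignment is written to /brokers/topics/[topic].
The partition assignment can be specified by the user or generated automatically by the kafka-topics.sh script, according to the following rule:
If no start position is specified, a random position is chosen. The first replica of each partition is placed by round-robin starting from that position, and the remaining replicas of each partition are placed on the brokers immediately following its first replica.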
For example, in a cluster of 5 brokers, a topic with 10 partitions and 3 replicas per partition is assigned as shown in the figure: 1553745402.jpg
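The rule above can be sketched in a few lines. This is a simplified illustration of the textual description only. `ReplicaAssignmentSketch`, `assign` and `startIndex` are hypothetical names, and Kafka's actual implementation may differ in details, such as how the follower replicas are offset.

```scala
object ReplicaAssignmentSketch {
  // Hypothetical helper, not Kafka's API.
  // The first replica of partition p goes to brokers((startIndex + p) % n);
  // the remaining replicas go to the brokers immediately after it.
  def assign(brokers: IndexedSeq[Int],
             nPartitions: Int,
             replicationFactor: Int,
             startIndex: Int): Map[Int, Seq[Int]] = {
    require(brokers.nonEmpty, "need at least one broker")
    require(replicationFactor <= brokers.size, "replication factor exceeds broker count")
    (0 until nPartitions).map { p =>
      val first = (startIndex + p) % brokers.size
      p -> (0 until replicationFactor).map(r => brokers((first + r) % brokers.size))
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    // 5 brokers, 10 partitions, 3 replicas each, start position fixed at 0
    // (the script would pick the start position at random).
    val assignment = assign(Vector(0, 1, 2, 3, 4), nPartitions = 10, replicationFactor = 3, startIndex = 0)
    assignment.toSeq.sortBy(_._1).foreach { case (p, replicas) =>
      println(s"partition $p -> ${replicas.mkString(",")}")
    }
  }
}
```

With a start position of 0, partition 0 lands on brokers 0,1,2 and partition 4 wraps around to 4,0,1. After 5 partitions the pattern repeats.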
The PartitionStateMachine component inside KafkaController registered TopicChangeListener (which watches /brokers/topics) when it started, and that listener is now triggered:

    val currentChildren = {
      import JavaConversions._
      (children: Buffer[String]).toSet
    }
    // topics that appeared or disappeared since the last notification
    val newTopics = currentChildren -- controllerContext.allTopics
    val deletedTopics = controllerContext.allTopics -- currentChildren
    controllerContext.allTopics = currentChildren
    // read the replica assignment of the new topics from ZooKeeper
    val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)
    // drop cached assignments of deleted topics, then add those of new topics
    controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
      !deletedTopics.contains(p._1.topic))
    controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
    if (newTopics.size > 0)
      controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)
It does three things:
1. updates ControllerContext.allTopics;
2. updates ControllerContext.partitionReplicaAssignment;
3. calls back KafkaController.onNewTopicCreation.
KafkaController.onNewTopicCreation handles the creation of new topics:

    partitionStateMachine.registerPartitionChangeListener(topic)
    onNewPartitionCreation(newPartitions)

The processing logic of onNewPartitionCreation:

    partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
    partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)

1. Register PartitionChangeListener to watch for changes in the topic's partition count.
2. partitionStateMachine.handleStateChanges(newPartitions, NewPartition): sets the partition state to NewPartition.
3. replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica): sets the replica state to NewReplica. Since the partitions have not elected a leader yet, no other action is triggered.
4. partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector):
   4.1 Moves the partition state from NewPartition to OnlinePartition.
   4.2 Picks the head of the replica list as leader and writes the leader and ISR information to /brokers/topics/[topic]/partitions/[partition id]/state in ZooKeeper.
   4.3 BrokerRequestBatch.addLeaderAndIsrRequestForBrokers: builds a LeaderAndIsr request and sends it to each live broker. The request is handled by the broker's internal ReplicaManager component, which a later chapter analyzes in detail.
   4.4 replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica): moves the replica state from NewReplica to OnlineReplica.
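The partition side of these steps can be pictured as a small state machine plus an initial leader choice. The sketch below is a simplified illustration, not Kafka's code. It models only the two transitions used above, and it assumes the initial ISR is the full replica list, which the text does not state. All names in it are hypothetical stand-ins.

```scala
object PartitionStateSketch {
  sealed trait PartitionState
  case object NonExistentPartition extends PartitionState
  case object NewPartition extends PartitionState
  case object OnlinePartition extends PartitionState

  // Simplified stand-in for what gets written to the partition's state node.
  final case class LeaderAndIsr(leader: Int, isr: List[Int])

  // Only the transitions exercised when a new partition is created.
  def isValid(from: PartitionState, to: PartitionState): Boolean = (from, to) match {
    case (NonExistentPartition, NewPartition) => true
    case (NewPartition, OnlinePartition)      => true
    case _                                    => false
  }

  // Step 4.2: the head of the replica list becomes the leader.
  // Assumption: the ISR starts out as the whole replica list.
  def initialLeaderAndIsr(replicas: List[Int]): LeaderAndIsr = {
    require(replicas.nonEmpty, "a partition needs at least one replica")
    LeaderAndIsr(replicas.head, replicas)
  }

  def main(args: Array[String]): Unit = {
    val replicas = List(2, 3, 4)
    println(isValid(NonExistentPartition, NewPartition))    // true
    println(isValid(NonExistentPartition, OnlinePartition)) // false: must pass through NewPartition
    println(initialLeaderAndIsr(replicas))                  // LeaderAndIsr(2,List(2, 3, 4))
  }
}
```

Modeling the transitions as an explicit check makes it clear why onNewPartitionCreation calls handleStateChanges twice on the partition state machine: a partition cannot jump straight to OnlinePartition.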
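1. The KafkaController component still starts. During election it finds that a controller already exists and does not continue with the election, but it still listens for LeaderChangeListener events.
2. KafkaHealthcheck starts and registers the broker's information under /brokers in ZooKeeper (at /brokers/ids/[broker id]).
3. BrokerChangeListener is triggered, obtains the list of newly added brokers, and calls back KafkaController.onBrokerStartup(newBrokerIds.toSeq):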
a. sendUpdateMetadataRequest: sends the partitionStateInfos of all topics to every broker;
b. replicaStateMachine.handleStateChanges moves the replicas on the new brokers to OnlineReplica:

    val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
    replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)

c. partitionStateMachine.triggerOnlinePartitionStateChange(): runs leader election for new and offline partitions.

Running the alter command of the kafka-topics.sh script updates /brokers/topics/[topic] in ZooKeeper with the replica information of the added partitions.
The AddPartitionsListener that the PartitionStateMachine component in KafkaController listens on is then triggered:

    val partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic))
    // keep only the partitions the controller does not know about yet
    val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
      !controllerContext.partitionReplicaAssignment.contains(p._1))
    if (controller.deleteTopicManager.isTopicQueuedUpForDeletion(topic))
      error("Skipping adding partitions %s for topic %s since it is currently being deleted"
        .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
    else {
      if (partitionsToBeAdded.size > 0) {
        info("New partitions to be added %s".format(partitionsToBeAdded))
        controllerContext.partitionReplicaAssignment.++=(partitionsToBeAdded)
        controller.onNewPartitionCreation(partitionsToBeAdded.keySet.toSet)
      }
    }
It does the following:
1. updates ControllerContext.partitionReplicaAssignment;
2. calls back KafkaController.onNewPartitionCreation.
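The listener finds the added partitions by filtering the assignment read from ZooKeeper against the assignment the controller has cached. A minimal sketch of that filter follows. The `TopicAndPartition` case class here is a simplified stand-in for Kafka's own type, and `AddPartitionsSketch` and the sample data are hypothetical.

```scala
object AddPartitionsSketch {
  // Simplified stand-in for Kafka's TopicAndPartition.
  final case class TopicAndPartition(topic: String, partition: Int)

  // Keeps only the partitions present in ZooKeeper but missing from the controller's cache.
  def partitionsToBeAdded(inZk: Map[TopicAndPartition, Seq[Int]],
                          cached: Map[TopicAndPartition, Seq[Int]]): Map[TopicAndPartition, Seq[Int]] =
    inZk.filter { case (tp, _) => !cached.contains(tp) }

  def main(args: Array[String]): Unit = {
    // The controller knows partition 0 of topic "t"; the alter command added partition 1.
    val cached = Map(TopicAndPartition("t", 0) -> Seq(1, 2))
    val inZk = cached + (TopicAndPartition("t", 1) -> Seq(2, 1))
    println(partitionsToBeAdded(inZk, cached))
  }
}
```

Only the entry for partition 1 survives the filter. That entry is what gets merged into the cached assignment and handed to onNewPartitionCreation.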
The onNewPartitionCreation callback:

    def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {
      info("New partition creation callback for %s".format(newPartitions.mkString(",")))
      partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
      replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
      partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
      replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
    }