The `ReplicaStateMachine` object inside the `KafkaController` component registers a `BrokerChangeListener` when it starts. When a broker goes down, its node under `/brokers/ids` in ZooKeeper is deleted automatically, and the `BrokerChangeListener` fires:

```scala
// Broker ids currently registered under /brokers/ids
val curBrokerIds = currentBrokerList.map(_.toInt).toSet
// Brokers that appeared since the controller's last view
val newBrokerIds = curBrokerIds -- controllerContext.liveOrShuttingDownBrokerIds
val newBrokerInfo = newBrokerIds.map(zkUtils.getBrokerInfo(_))
val newBrokers = newBrokerInfo.filter(_.isDefined).map(_.get)
// Brokers that disappeared since the controller's last view
val deadBrokerIds = controllerContext.liveOrShuttingDownBrokerIds -- curBrokerIds
// Refresh the cached set of live brokers
controllerContext.liveBrokers = curBrokerIds.map(zkUtils.getBrokerInfo(_)).filter(_.isDefined).map(_.get)
// Open channels to new brokers, close channels to dead ones
newBrokers.foreach(controllerContext.controllerChannelManager.addBroker(_))
deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker(_))
if (newBrokerIds.size > 0)
  controller.onBrokerStartup(newBrokerIds.toSeq)
if (deadBrokerIds.size > 0)
  controller.onBrokerFailure(deadBrokerIds.toSeq)
```
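
For a broker failure, the relevant steps are:

- `val deadBrokerIds = controllerContext.liveOrShuttingDownBrokerIds -- curBrokerIds` yields the ids of the brokers that have just gone down;
- `KafkaControllerContext.liveBrokers` is updated;
- `KafkaController.onBrokerFailure(deadBrokerIds.toSeq)` is called back.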
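
The new/dead broker computation in the listener is just two set differences. A minimal sketch of that step on plain Scala collections, assuming nothing about Kafka's own classes (`BrokerDiffSketch`, `BrokerDiff`, and `compute` are hypothetical names used only for illustration):

```scala
object BrokerDiffSketch {
  // Hypothetical result type: ids that appeared and ids that disappeared
  final case class BrokerDiff(newBrokerIds: Set[Int], deadBrokerIds: Set[Int])

  // knownIds   : ids the controller currently tracks (live or shutting down)
  // currentIds : child names of /brokers/ids as reported by ZooKeeper
  def compute(knownIds: Set[Int], currentIds: Seq[String]): BrokerDiff = {
    val cur = currentIds.map(_.toInt).toSet
    BrokerDiff(
      newBrokerIds = cur -- knownIds,   // present now, unknown before
      deadBrokerIds = knownIds -- cur   // known before, missing now
    )
  }
}
```

For example, `BrokerDiffSketch.compute(Set(1, 2, 3), Seq("1", "3", "4"))` reports broker 4 as new and broker 2 as dead.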

`KafkaController.onBrokerFailure(deadBrokerIds.toSeq)` does the following:

```scala
val deadBrokersSet = deadBrokers.toSet
// Partitions whose current leader is on a dead broker, skipping topics queued for deletion
val partitionsWithoutLeader = controllerContext.partitionLeadershipInfo.filter(partitionAndLeader =>
  deadBrokersSet.contains(partitionAndLeader._2.leaderAndIsr.leader) &&
    !deleteTopicManager.isTopicQueuedUpForDeletion(partitionAndLeader._1.topic)).keySet
// Mark them offline, then try to elect new leaders
partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
partitionStateMachine.triggerOnlinePartitionStateChange()
// Replicas hosted on the dead brokers, again skipping topics queued for deletion
val allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokersSet)
val activeReplicasOnDeadBrokers = allReplicasOnDeadBrokers.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
replicaStateMachine.handleStateChanges(activeReplicasOnDeadBrokers, OfflineReplica)
```
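
1. From `KafkaControllerContext.partitionLeadershipInfo` (which holds the leader information for every partition of every current topic), select every `TopicAndPartition` whose leader is a broker that has just gone down, and store them in `partitionsWithoutLeader`;
2. Move the partitions in `partitionsWithoutLeader` to the `OfflinePartition` state;
3. `partitionStateMachine.triggerOnlinePartitionStateChange()` re-elects a leader for the partitions that step 2 put into the `OfflinePartition` state (via `PartitionStateMachine.electLeaderForPartition`);
4. `replicaStateMachine.handleStateChanges(activeReplicasOnDeadBrokers, OfflineReplica)` moves the corresponding replicas to the `OfflineReplica` state. This calls `controller.removeReplicaFromIsr(topic, partition, replicaId)` and generates a new LeaderAndIsr request. When a broker has really gone down, I personally feel this call is redundant, because a new LeaderAndIsr request has already been sent in step 3.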
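
Steps 1 and 4 can be mimicked on plain collections to see what data they touch. The following is only a simplified model under stated assumptions: `TopicPartitionKey`, `LeaderIsr`, and both functions are hypothetical stand-ins, not Kafka's classes, and the ISR removal ignores leader changes, epochs, ZooKeeper versions, and the single-replica ISR case.

```scala
object BrokerFailureSketch {
  // Hypothetical stand-ins for Kafka's TopicAndPartition and LeaderAndIsr
  final case class TopicPartitionKey(topic: String, partition: Int)
  final case class LeaderIsr(leader: Int, isr: List[Int])

  // Step 1: partitions whose leader sits on a dead broker,
  // skipping topics that are queued for deletion.
  def partitionsWithoutLeader(
      leadership: Map[TopicPartitionKey, LeaderIsr],
      deadBrokers: Set[Int],
      topicsQueuedForDeletion: Set[String]): Set[TopicPartitionKey] =
    leadership.filter { case (tp, li) =>
      deadBrokers.contains(li.leader) && !topicsQueuedForDeletion.contains(tp.topic)
    }.keySet

  // Step 4, simplified: drop a replica from the ISR if it is present.
  // Assumption: the leader field is left untouched in this model.
  def removeFromIsr(li: LeaderIsr, replicaId: Int): LeaderIsr =
    li.copy(isr = li.isr.filterNot(_ == replicaId))
}
```

With broker 2 dead and topic `gone` queued for deletion, a partition of `t` led by broker 2 is selected, while a partition of `gone` led by the same broker is skipped.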