registerSessionExpirationListener: when the ZooKeeper session expires and a new session has been established, the controller is re-elected:

    def handleNewSession() {
      info("ZK expired; shut down all controller components and try to re-elect")
      // hold the controller lock while resigning and re-running the election
      inLock(controllerContext.controllerLock) {
        onControllerResignation()
        controllerElector.elect
      }
    }
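
The resign-then-elect pattern in handleNewSession can be tried out without a ZooKeeper cluster. The following is only a minimal sketch under stated assumptions: FakeControllerNode and ToyController are hypothetical names that do not exist in Kafka, and the ephemeral controller znode is replaced by a single in-memory field.

```scala
import java.util.concurrent.locks.ReentrantLock

// Hypothetical in-memory stand-in for the ephemeral controller znode.
class FakeControllerNode {
  private var owner: Option[Int] = None

  // Create-if-absent: returns the broker id that holds the node afterwards.
  def createIfAbsent(brokerId: Int): Int = synchronized {
    if (owner.isEmpty) owner = Some(brokerId)
    owner.get
  }

  // A session expiry removes the node if it was owned by that broker.
  def expireSession(brokerId: Int): Unit = synchronized {
    if (owner.contains(brokerId)) owner = None
  }

  def current: Option[Int] = synchronized(owner)
}

// Hypothetical, heavily simplified controller (not Kafka's KafkaController).
class ToyController(val brokerId: Int, node: FakeControllerNode) {
  private val controllerLock = new ReentrantLock()
  @volatile var isActive = false
  var resignations = 0

  private def inLock[T](lock: ReentrantLock)(body: => T): T = {
    lock.lock()
    try body finally lock.unlock()
  }

  // Drop controller duties; count only real resignations.
  def onControllerResignation(): Unit = {
    if (isActive) resignations += 1
    isActive = false
  }

  // Try to claim the node; this broker is active only if it owns the node.
  def elect(): Boolean = {
    isActive = node.createIfAbsent(brokerId) == brokerId
    isActive
  }

  // Mirrors the shape of handleNewSession: resign first, then re-elect,
  // both under the controller lock. Returns whether this broker won.
  def handleNewSession(): Boolean = inLock(controllerLock) {
    onControllerResignation()
    elect()
  }
}
```

In this sketch, if another broker claims the node while the old controller's session is expired, the old controller's handleNewSession call resigns and then loses the election, so it comes back as an ordinary broker.
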
controllerElector.startup: if the current broker is successfully elected Controller, the onControllerFailover callback is triggered:

    // read the controller epoch from ZooKeeper, then increment it
    readControllerEpochFromZookeeper()
    incrementControllerEpoch(zkUtils.zkClient)
    // register the listeners before reading the current state from ZooKeeper
    registerReassignedPartitionsListener()
    registerIsrChangeNotificationListener()
    registerPreferredReplicaElectionListener()
    partitionStateMachine.registerListeners()
    replicaStateMachine.registerListeners()
    initializeControllerContext()
    replicaStateMachine.startup()
    partitionStateMachine.startup()
    brokerState.newState(RunningAsController)
    maybeTriggerPartitionReassignment()
    maybeTriggerPreferredReplicaElection()
    // send partition leadership info to all live brokers
    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
    if (config.autoLeaderRebalanceEnable) {
      // periodic leader-imbalance check: first run after 5 seconds
      autoRebalanceScheduler.startup()
      autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
        5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
    }
    deleteTopicManager.start()
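
The schedule call above takes a name, a task, an initial delay, a period, and a time unit. A rough analogue can be built on the JDK's ScheduledExecutorService. This is only a sketch: ToyScheduler is a hypothetical class, not Kafka's scheduler, and the name argument is used here only for logging.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledFuture, TimeUnit}

// Hypothetical helper with the same (name, task, delay, period, unit) shape.
class ToyScheduler {
  private val executor: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor()

  def schedule(name: String, task: () => Unit,
               delay: Long, period: Long, unit: TimeUnit): ScheduledFuture[_] = {
    val runnable = new Runnable {
      def run(): Unit = {
        // scheduleAtFixedRate suppresses later runs once a run throws,
        // so exceptions are caught and reported here instead.
        try task()
        catch { case e: Exception => println(s"$name failed: ${e.getMessage}") }
      }
    }
    // First run after `delay`, then one run every `period`.
    executor.scheduleAtFixedRate(runnable, delay, period, unit)
  }

  def shutdown(): Unit = executor.shutdownNow()
}
```

With this helper, a call such as `schedule("partition-rebalance-thread", () => check(), 5, interval, TimeUnit.SECONDS)` would run a hypothetical `check` function 5 seconds after startup and then once per `interval` seconds, which matches the timing arguments passed in onControllerFailover.
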
[Figure: KafkaController.png]