Replica有7种状态:
ReplicaStateMachine.png
startup
: 启动ReplicaStateMachine
initializeReplicaState
: 初始化每个replica的状态: 如果replica所在的broker是live状态, 则此replica的状态为OnlineReplica, 否则为ReplicaDeletionIneligible
// Seed the state of every assigned replica from the controller's cached assignment:
// a replica hosted on a live broker starts as OnlineReplica, any other replica
// starts as ReplicaDeletionIneligible.
controllerContext.partitionReplicaAssignment.foreach { case (topicPartition, assignedReplicas) =>
  val topic = topicPartition.topic
  val partition = topicPartition.partition
  assignedReplicas.foreach { replicaId =>
    val partitionAndReplica = PartitionAndReplica(topic, partition, replicaId)
    // Liveness is judged by the replica's broker id being in liveBrokerIds.
    if (controllerContext.liveBrokerIds.contains(replicaId))
      replicaState.put(partitionAndReplica, OnlineReplica)
    else
      replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
  }
}
将所有live的Replica转换为Online
状态, 并且发送LeaderAndIsrRequest
到各broker nodes: handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)
// Handles a transition of `partitionAndReplica` to the OnlineReplica target state.
// The permitted previous states are passed to assertValidPreviousStates:
// NewReplica, OnlineReplica, OfflineReplica and ReplicaDeletionIneligible.
case OnlineReplica =>
assertValidPreviousStates(partitionAndReplica,
List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
// Branch on the replica's current (pre-transition) state.
replicaState(partitionAndReplica) match {
case NewReplica =>
// add this replica to the assigned replicas list for its partition
val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
// Append only when absent, so the assignment never lists the same replica twice.
if(!currentAssignedReplicas.contains(replicaId))
controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
case _ =>
// check if the leader for this partition ever existed
controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
case Some(leaderIsrAndControllerEpoch) =>
// Leadership info is cached: add a LeaderAndIsr request for this replica's broker to
// brokerRequestBatch, carrying the cached leader/ISR info and the replica assignment.
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,
replicaAssignment)
replicaState.put(partitionAndReplica, OnlineReplica)
case None => // that means the partition was never in OnlinePartition state, this means the broker never
// NOTE(review): the comment above is cut off in this excerpt; no request is added in this
// branch. Presumably the broker has no log for the partition yet - confirm against upstream.
}
}
// Runs after every branch above, so the replica is always recorded as OnlineReplica here.
// NOTE(review): this makes the put inside the `Some` branch redundant.
replicaState.put(partitionAndReplica, OnlineReplica)
/**
 * Subscribes `brokerChangeListener` to child changes of the ZooKeeper path
 * `ZkUtils.BrokerIdsPath`, using the ZkClient held by `zkUtils`.
 *
 * The result of `subscribeChildChanges` is returned as-is (the return type is inferred).
 */
private def registerBrokerChangeListener() = {
zkUtils.zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
}
BrokerChangeListener()
:
针对broker的上下线,分别回调controller.onBrokerStartup
或controller.onBrokerFailure
// Broker ids currently listed by ZooKeeper (child names parsed as ints).
val curBrokerIds = currentBrokerList.map(_.toInt).toSet
// The controller's cached view, read before liveBrokers is overwritten below.
val knownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds

// Ids present in ZooKeeper but not yet known to the controller, and their broker info.
// Ids for which getBrokerInfo returns None are dropped.
val newBrokerIds = curBrokerIds -- knownBrokerIds
val newBrokers = newBrokerIds
  .map(id => zkUtils.getBrokerInfo(id))
  .collect { case Some(broker) => broker }

// Ids known to the controller that are no longer listed in ZooKeeper.
val deadBrokerIds = knownBrokerIds -- curBrokerIds

// Refresh the cached live-broker set from the current ZooKeeper listing.
controllerContext.liveBrokers = curBrokerIds
  .map(id => zkUtils.getBrokerInfo(id))
  .collect { case Some(broker) => broker }

// Register new brokers with the channel manager and unregister dead ones.
newBrokers.foreach(broker => controllerContext.controllerChannelManager.addBroker(broker))
deadBrokerIds.foreach(id => controllerContext.controllerChannelManager.removeBroker(id))

// Notify the controller; the startup callback runs before the failure callback.
if (newBrokerIds.nonEmpty) {
  controller.onBrokerStartup(newBrokerIds.toSeq)
}
if (deadBrokerIds.nonEmpty) {
  controller.onBrokerFailure(deadBrokerIds.toSeq)
}
610532728.jpg