前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >KafkaController分析6-Replica状态机

KafkaController分析6-Replica状态机

作者头像
扫帚的影子
发布2018-09-05 17:13:09
7020
发布2018-09-05 17:13:09
举报

Replica的状态

Replica有7种状态:

  • NewReplica: 在partition reassignment期间KafkaController创建New replica;
  • OnlineReplica: 当一个replica变为一个parition的assingned replicas时, 其状态变为OnlineReplica, 即一个有效的OnlineReplica. Online状态的parition才能转变为leader或isr中的一员;
  • OfflineReplica: 当一个broker down时, 上面的replica也随之die, 其状态转变为Onffline;
  • ReplicaDeletionStarted: 当一个replica的删除操作开始时,其状态转变为ReplicaDeletionStarted;
  • ReplicaDeletionSuccessful: Replica成功删除后,其状态转变为ReplicaDeletionSuccessful;
  • ReplicaDeletionIneligible: Replica成功失败后,其状态转变为ReplicaDeletionIneligible;
  • NonExistentReplica: Replica成功删除后, 从ReplicaDeletionSuccessful状态转变为NonExistentReplica状态.
  • 状态转换图:

ReplicaStateMachine.png

ReplicaStateMachine

  • 所在文件: core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
  • startup: 启动ReplicaStateMachine
  1. initializeReplicaState: 初始化每个replica的状态, 如果replica所在的broker是live状态,则此replica的状态为OnlineReplica
代码语言:javascript
复制
for((topicPartition, assignedReplicas) <- controllerContext.partitionReplicaAssignment) {
      val topic = topicPartition.topic
      val partition = topicPartition.partition
      assignedReplicas.foreach { replicaId =>
        val partitionAndReplica = PartitionAndReplica(topic, partition, replicaId)
        controllerContext.liveBrokerIds.contains(replicaId) match {
          case true => replicaState.put(partitionAndReplica, OnlineReplica)
          case false =>
            replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
        }
      }
    }
  1. 处理可以转换到Online状态的Replica, handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica), 并且发送LeaderAndIsrRequest到各broker nodes: handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)
代码语言:javascript
复制
case OnlineReplica =>
          assertValidPreviousStates(partitionAndReplica,
            List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
          replicaState(partitionAndReplica) match {
            case NewReplica =>
              // add this replica to the assigned replicas list for its partition
              val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
              if(!currentAssignedReplicas.contains(replicaId))
                controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
            case _ =>
              // check if the leader for this partition ever existed
              controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
                case Some(leaderIsrAndControllerEpoch) =>
                  brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,
                    replicaAssignment)
                  replicaState.put(partitionAndReplica, OnlineReplica)
                case None => // that means the partition was never in OnlinePartition state, this means the broker never
              }
          }
          replicaState.put(partitionAndReplica, OnlineReplica)
  • 监听broker改变
代码语言:javascript
复制
 private def registerBrokerChangeListener() = {
    zkUtils.zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
  }
  • 处理borker的改变事件BrokerChangeListener(): 针对broker的上下线,分别回调controller.onBrokerStartupcontroller.onBrokerFailure
代码语言:javascript
复制
             val curBrokerIds = currentBrokerList.map(_.toInt).toSet
              val newBrokerIds = curBrokerIds -- controllerContext.liveOrShuttingDownBrokerIds
              val newBrokerInfo = newBrokerIds.map(zkUtils.getBrokerInfo(_))
              val newBrokers = newBrokerInfo.filter(_.isDefined).map(_.get)
              val deadBrokerIds = controllerContext.liveOrShuttingDownBrokerIds -- curBrokerIds
              controllerContext.liveBrokers = curBrokerIds.map(zkUtils.getBrokerInfo(_)).filter(_.isDefined).map(_.get)
              newBrokers.foreach(controllerContext.controllerChannelManager.addBroker(_))
              deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker(_))
              if(newBrokerIds.size > 0)
                controller.onBrokerStartup(newBrokerIds.toSeq)
              if(deadBrokerIds.size > 0)
                controller.onBrokerFailure(deadBrokerIds.toSeq)

补一张图

610532728.jpg

Kafka源码分析-汇总

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017.01.22 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Replica的状态
  • ReplicaStateMachine
  • 补一张图
  • Kafka源码分析-汇总
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档