KafkaController Analysis 8: Broker Failure

Author: 扫帚的影子
Published 2018-09-05 17:08:27
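  • In practice, a broker can go down because of problems with the machine, the hardware, the network, or the broker process itself;
  • In this chapter we look at what happens to the whole Kafka cluster after a broker goes down.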

The failed broker is not the cluster's Controller
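  • As covered in "Kafka Cluster Setup Process Analysis" and "KafkaController Analysis 6: Replica State Machine", the `ReplicaStateMachine` object inside the KafkaController component registers a `BrokerChangeListener` (a child listener on `/brokers/ids` in ZooKeeper) at startup;
  • When a broker goes down, its node under `/brokers/ids` in ZooKeeper is deleted automatically, because it is an ephemeral znode that disappears once the broker's ZooKeeper session ends;
  • This fires the `BrokerChangeListener` of `ReplicaStateMachine`, which runs the following (excerpt):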
```scala
// Excerpt from ReplicaStateMachine.BrokerChangeListener.handleChildChange;
// currentBrokerList holds the children of /brokers/ids as returned by ZooKeeper.
val curBrokerIds = currentBrokerList.map(_.toInt).toSet
// Registered in ZooKeeper but not yet known to the controller: newly started brokers.
val newBrokerIds = curBrokerIds -- controllerContext.liveOrShuttingDownBrokerIds
val newBrokerInfo = newBrokerIds.map(zkUtils.getBrokerInfo(_))
val newBrokers = newBrokerInfo.filter(_.isDefined).map(_.get)
// Known to the controller but their znode is gone: dead brokers.
val deadBrokerIds = controllerContext.liveOrShuttingDownBrokerIds -- curBrokerIds
controllerContext.liveBrokers = curBrokerIds.map(zkUtils.getBrokerInfo(_)).filter(_.isDefined).map(_.get)
// Open or close the controller's request channels to those brokers.
newBrokers.foreach(controllerContext.controllerChannelManager.addBroker(_))
deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker(_))
if (newBrokerIds.size > 0)
  controller.onBrokerStartup(newBrokerIds.toSeq)
if (deadBrokerIds.size > 0)
  controller.onBrokerFailure(deadBrokerIds.toSeq)
```
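The set arithmetic at the core of this listener can be pulled out into a pure function and tried without a running cluster. The sketch below is hypothetical: `BrokerDiff`, `Result` and `diff` are illustrative names, not Kafka APIs. It only mirrors the two `--` set differences from the excerpt above, assuming broker ids are the znode names under `/brokers/ids`.

```scala
// Hypothetical helper (not part of Kafka): the set differences BrokerChangeListener computes.
object BrokerDiff {
  final case class Result(newBrokerIds: Set[Int], deadBrokerIds: Set[Int])

  // zkChildren: children of /brokers/ids (broker ids as strings).
  // knownIds:   ids the controller currently treats as live or shutting down.
  def diff(zkChildren: Seq[String], knownIds: Set[Int]): Result = {
    val curBrokerIds = zkChildren.map(_.toInt).toSet
    Result(
      newBrokerIds = curBrokerIds -- knownIds, // registered in ZooKeeper, unknown to the controller
      deadBrokerIds = knownIds -- curBrokerIds // known to the controller, znode gone
    )
  }
}

// Usage: broker 2's ephemeral znode has disappeared and broker 4 has just registered.
val r = BrokerDiff.diff(Seq("1", "3", "4"), knownIds = Set(1, 2, 3))
println(r) // Result(Set(4),Set(2))
```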
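  1. ZooKeeper supplies the current broker list (the children of `/brokers/ids`, passed in as `currentBrokerList`);
  2. `val deadBrokerIds = controllerContext.liveOrShuttingDownBrokerIds -- curBrokerIds` yields the ids of the brokers that have gone down;
  3. `controllerContext.liveBrokers` is updated, and the dead brokers are removed from `controllerChannelManager`;
  4. `KafkaController.onBrokerFailure(deadBrokerIds.toSeq)` is called back;
  5. The broker-failure handling itself, `KafkaController.onBrokerFailure(deadBrokerIds.toSeq)`: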
```scala
// Excerpt from KafkaController.onBrokerFailure(deadBrokers: Seq[Int])
val deadBrokersSet = deadBrokers.toSet
// Partitions whose current leader is on a dead broker (topics queued for deletion are skipped).
val partitionsWithoutLeader = controllerContext.partitionLeadershipInfo.filter(partitionAndLeader =>
  deadBrokersSet.contains(partitionAndLeader._2.leaderAndIsr.leader) &&
    !deleteTopicManager.isTopicQueuedUpForDeletion(partitionAndLeader._1.topic)).keySet
// Mark them OfflinePartition, then try to bring New/Offline partitions back online (leader election).
partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
partitionStateMachine.triggerOnlinePartitionStateChange()
// Every replica hosted on a dead broker, again skipping topics queued for deletion.
val allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokersSet)
val activeReplicasOnDeadBrokers = allReplicasOnDeadBrokers.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
replicaStateMachine.handleStateChanges(activeReplicasOnDeadBrokers, OfflineReplica)
```
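  1. First, scan `controllerContext.partitionLeadershipInfo`, which holds the leader information for every partition of every topic, and collect into `partitionsWithoutLeader` every `TopicAndPartition` whose leader is on a dead broker;
  2. Move the partitions in `partitionsWithoutLeader` to the `OfflinePartition` state;
  3. `partitionStateMachine.triggerOnlinePartitionStateChange()` elects a new leader (`PartitionStateMachine.electLeaderForPartition`) for the partitions put into `OfflinePartition` in step 2;
  4. New `LeaderAndIsr` requests are generated and sent to the brokers hosting the affected partitions' replicas;
  5. New `UpdateMetadata` requests are generated and sent to the brokers;
  6. `replicaStateMachine.handleStateChanges(activeReplicasOnDeadBrokers, OfflineReplica)` moves the corresponding replicas to the `OfflineReplica` state;
  7. During the transition in step 6, `controller.removeReplicaFromIsr(topic, partition, replicaId)` is called and produces new `LeaderAndIsr` requests. In my view this call is redundant when a broker has genuinely died, because step 3 already sent new `LeaderAndIsr` requests. That reasoning only covers partitions whose leader was on the dead broker, though: partitions where the dead broker held just a follower replica are not touched by step 3, and for them this call is what drops the dead replica from the ISR.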
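To make steps 1, 3 and 7 concrete, here is a simplified in-memory model. It is a sketch under assumptions, not Kafka code: `BrokerFailureModel`, `PartitionState` and its helpers are hypothetical names. Leader selection is modeled on my reading of Kafka 0.10's `OfflinePartitionLeaderSelector` (the first assigned replica that is both alive and in the ISR becomes leader, and the new ISR keeps only live ISR members); unclean leader election, ZooKeeper writes and zkVersion bookkeeping are left out.

```scala
// Hypothetical, simplified model of onBrokerFailure's effect on partition state; not Kafka code.
// Assumption: leader selection follows (as I read it) Kafka 0.10's OfflinePartitionLeaderSelector;
// unclean leader election, ZooKeeper writes and zkVersion bookkeeping are not modeled.
object BrokerFailureModel {
  final case class PartitionState(assigned: List[Int], leader: Int, leaderEpoch: Int, isr: List[Int])

  // Step 1: partitions whose leader sits on a dead broker.
  def partitionsWithoutLeader(parts: Map[String, PartitionState], dead: Set[Int]): Set[String] =
    parts.collect { case (tp, s) if dead.contains(s.leader) => tp }.toSet

  // Step 3: the first assigned replica that is alive and in the ISR becomes leader;
  // the new ISR keeps only live ISR members. None means no live ISR member is left.
  def electLeader(s: PartitionState, live: Set[Int]): Option[PartitionState] = {
    val liveIsr = s.isr.filter(live.contains)
    s.assigned.find(liveIsr.contains(_)).map { newLeader =>
      s.copy(leader = newLeader, leaderEpoch = s.leaderEpoch + 1, isr = liveIsr)
    }
  }

  // Step 7: drop a dead follower from the ISR (removeReplicaFromIsr-style); epoch left untouched here.
  def shrinkIsr(s: PartitionState, deadReplica: Int): PartitionState =
    s.copy(isr = s.isr.filterNot(_ == deadReplica))
}

// Usage: broker 2 dies while brokers 1 and 3 stay up.
val parts = Map(
  "t-0" -> BrokerFailureModel.PartitionState(List(2, 1, 3), leader = 2, leaderEpoch = 5, isr = List(2, 1, 3)),
  "t-1" -> BrokerFailureModel.PartitionState(List(1, 2, 3), leader = 1, leaderEpoch = 3, isr = List(1, 2, 3))
)
val offline = BrokerFailureModel.partitionsWithoutLeader(parts, dead = Set(2)) // Set(t-0)
val reElected = BrokerFailureModel.electLeader(parts("t-0"), live = Set(1, 3)) // leader 1, epoch 6, ISR List(1, 3)
val shrunk = BrokerFailureModel.shrinkIsr(parts("t-1"), deadReplica = 2) // leader 1, ISR List(1, 3)
```

In the usage lines, partition `t-0` loses its leader and gets broker 1 with a bumped leader epoch, while `t-1` keeps leader 1 and only has broker 2 dropped from its ISR, which is the follower case that leader election alone does not handle.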

Kafka Source Code Analysis: Index

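This article is part of the Tencent Cloud self-media sharing program and is shared from the author's personal site/blog.
Originally published 2017.02.06. For infringement concerns, contact cloudcommunity@tencent.com for removal.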
