ReplicaManager源码解析2-LeaderAndIsr 请求响应

  • 消息从客户端产生后,发送到哪个broker;
  • 发送到broker后,broker如何接收,如何存储;

KafkaApis中响应LeaderAndIsr Request

    val correlationId = request.header.correlationId
    val leaderAndIsrRequest = request.body.asInstanceOf[LeaderAndIsrRequest]

      def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) {
        // for each new leader or follower, call coordinator to handle consumer group migration.
        // this callback is invoked under the replica state change lock to ensure proper order of
        // leadership changes
        updatedLeaders.foreach { partition =>
          if (partition.topic == GroupCoordinator.GroupMetadataTopicName)
            coordinator.handleGroupImmigration(partition.partitionId)
        }
        updatedFollowers.foreach { partition =>
          if (partition.topic == GroupCoordinator.GroupMetadataTopicName)
            coordinator.handleGroupEmigration(partition.partitionId)
        }
      }

      val responseHeader = new ResponseHeader(correlationId)
      val leaderAndIsrResponse=
        if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
          val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange)
          new LeaderAndIsrResponse(result.errorCode, result.responseMap.mapValues(new JShort(_)).asJava)
        } else {
          val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, new JShort(Errors.CLUSTER_AUTHORIZATION_FAILED.code))).toMap
          new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code, result.asJava)
        }

      requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, responseHeader, leaderAndIsrResponse)))

其中最主要的操作调用ReplicaManager.becomeLeaderOrFollower来初始化Partition

val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange)
  • ReplicaManager.becomeLeaderOrFollower
  1. 判断LeaderAndIsr请求中的controllerEpoch和ReplicaManager保存的controllerEpoch(在处理UpdateMetadata Request时更新, 参见Kafka集群Metadata管理), 如果本地存的controllerEpoch大,则忽略当前的LeaderAndIsr请求, 产生BecomeLeaderOrFollowerResult(responseMap, ErrorMapping.StaleControllerEpochCode)
  2. 处理leaderAndISRRequest.partitionStates中的第个partition state; 2.1 创建Partition对象,这个我们后面会讲到;
allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId, time, this))

2.2 如果partitionStateInfo中的leaderEpoch更新,则存储它在val partitionState = new mutable.HashMap[Partition, LeaderAndIsrRequest.PartitionState]()

if (partitionLeaderEpoch < stateInfo.leaderEpoch) {
          if(stateInfo.replicas.contains(config.brokerId))
                 partitionState.put(partition, stateInfo)
}

2.3 分离出转换成leader和follower的partitions;

 val partitionsTobeLeader = partitionState.filter { case (partition, stateInfo) =>
          stateInfo.leader == config.brokerId
        }
  val partitionsToBeFollower = (partitionState -- partitionsTobeLeader.keys)

2.4 处理转换成leader

makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap)

实现上干两件事: 停止从leader来同步消息: replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_))),参见ReplicaManager源码解析1-消息同步线程管理 调用Partition的makeLeader方法:partition.makeLeader(controllerId, partitionStateInfo, correlationId)来作leader的转换 2.5 处理转换成follower

makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.correlationId, responseMap, metadataCache)

2.6 启动HighWaterMarkCheckPointThread, 具体后面章节会讲到,

if (!hwThreadInitialized) {
          startHighWaterMarksCheckPointThread()
          hwThreadInitialized = true
}

2.7 回调KafkaApis.handleLeaderAndIsrRequest.onLeadershipChange

  • 针对makeLeadersmakeFollowers的分析我们等分析完Parition, ReplicaFetcherManager后一并分析.
  • LeaderAndIsr 请求响应流程图:

LeaderAndIsr 请求响应.png

Kafka源码分析-汇总

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏恰童鞋骚年

Hadoop学习笔记—6.Hadoop Eclipse插件的使用

开篇:Hadoop是一个强大的并行软件开发框架,它可以让任务在分布式集群上并行处理,从而提高执行效率。但是,它也有一些缺点,如编码、调试Hadoop程序的难度较...

61310
来自专栏向治洪

butternife Zelezny自动注入插件

插件地址:http://plugins.jetbrains.com/plugin/7369 Products: IntelliJ IDEA, RubyMine...

19480
来自专栏比原链

Derek解读Bytom源码-P2P网络 地址簿

Gitee地址:https://gitee.com/BytomBlockchain/bytom

11830
来自专栏pydata

hadoop 2.4.1 上安装spark 1.1.0

进入到http://localhost:port访问Ipython Notebook

12620
来自专栏Android 研究

APK安装流程详解8——PackageManagerService的启动流程(下)

那我们就来看下scanPackageLI(PackageParser.Package, int, int, long, UserHandle)方法

47810
来自专栏技术博文

excel导入与导出

基本上导出的文件分为两种: 1:类Excel格式,这个其实不是传统意义上的Excel文件,只是因为Excel的兼容能力强,能够正确打开而已。修改这种文件后再保存...

32260
来自专栏Hadoop实操

如何使用StreamSets实时采集Kafka并入库Kudu

47250
来自专栏数据之美

Spark 伪分布式 & 全分布式 安装指南

0、前言 3月31日是 Spark 五周年纪念日,从第一个公开发布的版本开始,Spark走过了不平凡的5年:从刚开始的默默无闻,到13年的鹊起,14年的大爆发...

62250
来自专栏比原链

Derek解读Bytom源码-P2P网络 地址簿

Gitee地址:https://gitee.com/BytomBlockchain/bytom

9410
来自专栏Google Dart

Flutter 构建完整应用手册-联网 顶

从大多数应用程序获取互联网上的数据是必要的。 幸运的是,Dart和Flutter为这类工作提供了工具!

14820

扫码关注云+社区

领取腾讯云代金券