KafkaController分析3-ControllerChannelManager

  • KafkaController的作用前面我们已经简单介绍过, 基于此KafkaController需要与其他的broker node通信,发送请求;
  • ControllerChannelManager用来管理与其他所有的broker node的网络连接和请求发送等;

ControllerChannelManager

  • 所在文件: core/src/main/scala/kafka/controller/ControllerChannelManager.scala
  • 创建到各个broker node的连接, 每个连接对应一个新的线程
controllerContext.liveBrokers.foreach(addNewBroker(_))
  • 创建到单个broker node的连接
private def addNewBroker(broker: Broker) {
    ...
    val networkClient = {
      val selector = new Selector(
        ...
      )
      new NetworkClient(
        selector,
        new ManualMetadataUpdater(Seq(brokerNode).asJava),
        config.brokerId.toString,
        1,
        0,
        Selectable.USE_DEFAULT_BUFFER_SIZE,
        Selectable.USE_DEFAULT_BUFFER_SIZE,
        config.requestTimeoutMs,
        time
      )
    }
...
    val requestThread = new RequestSendThread(config.brokerId, controllerContext, messageQueue, networkClient,
      brokerNode, config, time, threadName)
    requestThread.setDaemon(false)
    brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue, requestThread))
  }

使用NetworkClient连接到broker node, 使用selector处理网络IO;

  • 发送线程RequestSendThread, 继承自ShutdownableThread, 需要发送的request会被add到val queue: BlockingQueue[QueueItem]中, 然后在doWork中被不断取出val QueueItem(apiKey, apiVersion, request, callback) = queue.take(), 再通过clientResponse = networkClient.blockingSendAndReceive(clientRequest, socketTimeoutMs)被发送直到clientResponse返回
  • 主要处理下面三种类型的请求:
val response = ApiKeys.forId(clientResponse.request.request.header.apiKey) match {
            case ApiKeys.LEADER_AND_ISR => new LeaderAndIsrResponse(clientResponse.responseBody)
            case ApiKeys.STOP_REPLICA => new StopReplicaResponse(clientResponse.responseBody)
            case ApiKeys.UPDATE_METADATA_KEY => new UpdateMetadataResponse(clientResponse.responseBody)
            case apiKey => throw new KafkaException(s"Unexpected apiKey received: $apiKey")
          }
  • 如果设置了回调, 则
if (callback != null) {
            callback(response)
}

ControllerBrokerRequestBatch

  • 所在文件: core/src/main/scala/kafka/controller/ControllerChannelManager.scala
  • 使用ControllerChannelManagersendRequest方法来批量发送请求到broker node;
  • 主要处理以下三种请求:
 val leaderAndIsrRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionStateInfo]]
  val stopReplicaRequestMap = mutable.Map.empty[Int, Seq[StopReplicaRequestInfo]]
  val updateMetadataRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionStateInfo]]
Kafka源码分析-汇总

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏liulun

ASP.NET Core教程【三】实体字段属性、链接标签、并发数据异常、文件上传及读取

前文索引: ASP.NET Core教程【二】从保存数据看Razor Page的特有属性与服务端验证 ASP.NET Core教程【一】关于Razor Page...

32760
来自专栏Java3y

JDBC【数据库连接池、DbUtils框架、分页】

1.数据库连接池 什么是数据库连接池 简单来说:数据库连接池就是提供连接的。。。 为什么我们要使用数据库连接池 数据库的连接的建立和关闭是非常消耗资源的 频繁地...

39740
来自专栏五毛程序员

java实现文件夹(包括其中的子文件夹、子文件)的复制——递归

43560
来自专栏MelonTeam专栏

What's New in LLVM 9

导语 :这绝不仅仅是一篇 WWDC 2017 Session 411 学习笔记。除了有关 LLVM 9.0 的新特性之外,还有关于静态分析器和 Clang 5 ...

364100
来自专栏SDNLAB

OVS中Action源码分析&自定义Action

前言 在生产或是科研中,OpenFlow定义的Action有时候并不能完全满足需求,那么如何向OVS中添加一个自定义的action,本文对此做详细分析。 我们知...

55290
来自专栏公众号_薛勤的博客

Jsoup模拟登录带验证码的教务系统(原理详解)

在模拟登陆该教务系统时,笔者观察到该教务系统还有一个不需要验证码即可登陆的网址:http://jwxt.qlu.edu.cn/jsxsd/xsxk/xklc_l...

19620
来自专栏小白鼠

Sentinel使用原理sentinel-dashboardDubbo适配

有关于sentinel的使用方法和工作原理,在官方文档中都有详细的介绍,并且源码中也已经给出了一系列的demo,以下是示例:

95630
来自专栏Java 技术分享

JavaWeb 之文件的上传下载

68650
来自专栏比原链

剥开比原看代码13:比原是如何通过/list-balances显示帐户余额的?

Gitee地址:https://gitee.com/BytomBlockchain/bytom

9210
来自专栏猿天地

用aop加redis实现通用接口缓存

系统在高并发场景下,最有用的三个方法是缓存,限流,降级。 缓存就是其中之一,目前缓存基本上是用redis或者memcached。 redis和memcached...

37970

扫码关注云+社区

领取腾讯云代金券