前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >KafkaController分析3-ControllerChannelManager

KafkaController分析3-ControllerChannelManager

作者头像
扫帚的影子
发布2018-09-05 17:10:27
2670
发布2018-09-05 17:10:27
举报
  • KafkaController的作用前面我们已经简单介绍过, 基于此KafkaController需要与其他的broker node通信,发送请求;
  • ControllerChannelManager用来管理与其他所有的broker node的网络连接和请求发送等;

ControllerChannelManager

  • 所在文件: core/src/main/scala/kafka/controller/ControllerChannelManager.scala
  • 创建到各个broker node的连接, 每个连接对应一个新的线程
代码语言:javascript
复制
controllerContext.liveBrokers.foreach(addNewBroker(_))
  • 创建到单个broker node的连接
代码语言:javascript
复制
private def addNewBroker(broker: Broker) {
    ...
    val networkClient = {
      val selector = new Selector(
        ...
      )
      new NetworkClient(
        selector,
        new ManualMetadataUpdater(Seq(brokerNode).asJava),
        config.brokerId.toString,
        1,
        0,
        Selectable.USE_DEFAULT_BUFFER_SIZE,
        Selectable.USE_DEFAULT_BUFFER_SIZE,
        config.requestTimeoutMs,
        time
      )
    }
...
    val requestThread = new RequestSendThread(config.brokerId, controllerContext, messageQueue, networkClient,
      brokerNode, config, time, threadName)
    requestThread.setDaemon(false)
    brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue, requestThread))
  }

使用NetworkClient连接到broker node, 使用selector处理网络IO;

  • 发送线程RequestSendThread, 继承自ShutdownableThread, 需要发送的request会被add到val queue: BlockingQueue[QueueItem]中, 然后在doWork中被不断取出val QueueItem(apiKey, apiVersion, request, callback) = queue.take(), 再通过clientResponse = networkClient.blockingSendAndReceive(clientRequest, socketTimeoutMs)被发送直到clientResponse返回
  • 主要处理下面三种类型的请求:
代码语言:javascript
复制
val response = ApiKeys.forId(clientResponse.request.request.header.apiKey) match {
            case ApiKeys.LEADER_AND_ISR => new LeaderAndIsrResponse(clientResponse.responseBody)
            case ApiKeys.STOP_REPLICA => new StopReplicaResponse(clientResponse.responseBody)
            case ApiKeys.UPDATE_METADATA_KEY => new UpdateMetadataResponse(clientResponse.responseBody)
            case apiKey => throw new KafkaException(s"Unexpected apiKey received: $apiKey")
          }
  • 如果设置了回调, 则
代码语言:javascript
复制
if (callback != null) {
            callback(response)
}

ControllerBrokerRequestBatch

  • 所在文件: core/src/main/scala/kafka/controller/ControllerChannelManager.scala
  • 使用ControllerChannelManagersendRequest方法来批量发送请求到broker node;
  • 主要处理以下三种请求:
代码语言:javascript
复制
 val leaderAndIsrRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionStateInfo]]
  val stopReplicaRequestMap = mutable.Map.empty[Int, Seq[StopReplicaRequestInfo]]
  val updateMetadataRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionStateInfo]]
Kafka源码分析-汇总
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017.01.16 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ControllerChannelManager
  • ControllerBrokerRequestBatch
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档