controllerContext.liveBrokers.foreach(addNewBroker(_))
private def addNewBroker(broker: Broker) {
...
val networkClient = {
val selector = new Selector(
...
)
new NetworkClient(
selector,
new ManualMetadataUpdater(Seq(brokerNode).asJava),
config.brokerId.toString,
1,
0,
Selectable.USE_DEFAULT_BUFFER_SIZE,
Selectable.USE_DEFAULT_BUFFER_SIZE,
config.requestTimeoutMs,
time
)
}
...
val requestThread = new RequestSendThread(config.brokerId, controllerContext, messageQueue, networkClient,
brokerNode, config, time, threadName)
requestThread.setDaemon(false)
brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue, requestThread))
}
使用NetworkClient连接到broker node, 使用selector处理网络IO;
RequestSendThread
, 继承自ShutdownableThread
, 需要发送的request会被add到val queue: BlockingQueue[QueueItem]
中, 然后在doWork
中被不断取出val QueueItem(apiKey, apiVersion, request, callback) = queue.take()
, 再通过clientResponse = networkClient.blockingSendAndReceive(clientRequest, socketTimeoutMs)
被发送直到clientResponse
返回val response = ApiKeys.forId(clientResponse.request.request.header.apiKey) match {
case ApiKeys.LEADER_AND_ISR => new LeaderAndIsrResponse(clientResponse.responseBody)
case ApiKeys.STOP_REPLICA => new StopReplicaResponse(clientResponse.responseBody)
case ApiKeys.UPDATE_METADATA_KEY => new UpdateMetadataResponse(clientResponse.responseBody)
case apiKey => throw new KafkaException(s"Unexpected apiKey received: $apiKey")
}
if (callback != null) {
callback(response)
}
ControllerChannelManager
的sendRequest
方法来批量发送请求到broker node; val leaderAndIsrRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionStateInfo]]
val stopReplicaRequestMap = mutable.Map.empty[Int, Seq[StopReplicaRequestInfo]]
val updateMetadataRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionStateInfo]]