专栏首页后端技术kafka 网络模型1 请求响应流程

kafka 网络模型1 请求响应流程

参考

回顾

kafka 启动1 入口函数中,我们阅读了KafkaServer的注释,这里直接总结一下:

  • KafkaServer有两种请求层, data层或control层
    • data层处理来自客户端和集群中其它broker的请求, control层处理来自controller的请求.
    • 两种层都有Reactor线程(负责新连接)、Processor线程(各自维护一个Selector,并从Socket读取请求)、Handler线程(处理请求、生成响应返回给Processor线程做写操作)
    • Reactor、Processor、Handler的比例在data层中是1:N:M, 在control层中是1:1:1

Processor的创建 Processor负责data-plane和control-plane的创建和启动,各自又包含了Acceptor、Processor和RequestChannel。详情阅读kafka请求全流程(一)—— 客户端请求的"SocketServer 定义"一章

请求流程

转载下kafka请求全流程(二)—— 请求的接收以及分发的图:

转载自https://blog.csdn.net/fenglei0415/article/details/106172921/

该图仅供参考,因为它有几个问题:1. 图中ResponseChannel可能笔误,应该为RequestChannel[1] 2. 并不是Processor将响应放到ResponseQueue,而是Processor检查响应队列并写出。 3. 图中没有讲述Selector的作用

另外,文中提到了PRODUCE请求, 此处可拓展阅读Kafka-处理请求(生产(PRODUCE)请求、获取(FETCH)请求)

上图流程描述了Kafka的网络模型,到KafkaApis就结束了。它之后的流程可以参考Kafka 源码解析之 Server 端如何处理 Produce 请求(十二)中的图:

KafkaApis处理

过渡

上文提到了SocketServer、Acceptor、Processor、RequestChannel这几个网络通信组件,还给出了流程图。读过源码的同学应该很快能反应过来。如果没有,可以看下几章。 另外Processor有关网络IO的操作都是交给org.apache.kafka.common.network.Selector完成,而后者又会引出其它Kafka的IO组件,比如KafkaChannel、NetworkReceive、MemoryPool、TransportLayer等。

如果对每个组件硬读,可能难以消化。接下来请跟随我沿着某几个流程、切面进行分析,在这个过程中增加对每个组件的理解。

.1 处理新连接

我们先沿着"处理新连接"这个流程进行研究,看看一个新连接是如何被处理的。 新连接的处理涉及如下几个组件,Acceptor、Processor、org.apache.kafka.common.network.Selector、java.nio.channels.Selector

新连接处理

我们按图中步骤,结合源码逐个解说

① 接受新连接

Acceptor线程在主循环中,监听和接受新连接

接受新连接

交给Processor

Acceptor将SocketChannel交给Processor的队列变量newConnections

Processor::accept

② 注册SocketChannel Processor线程在主循环中,调用configureNewConnections(),把SocketChannel从队列取出并注册到Kafka Selector上。Kafka在Java nio的Selector类上封装了一层,也叫Selector[2]

Processor::run()

注册到KafkaSelector上

③ 为SocketChannel注册事件 看下Kafka Selector的是如何注册SocketChannel的,看下registerChannel的实现

org.apache.kafka.common.network.Kafka.Selector

由下方两幅图可知,Kafka Selector在内部维护了一个java nio Selector。注册java SocketChannel,就是为其在java nio Selector上注册事件。

java nio注册

成员变量java nio Selector

.2 Kafka Selector执行IO

Processor利用Kafka Selector执行网络IO,因此我们要讲解下两者之间的交互。

Kafka Selector对java nio Selector进行了封装,将TCP的流式I/O转化为一个个对象[3]的I/O。TCP的I/O是面向流的,在读取时不能保证刚好完整读取了一个对象,但经过Kafka Selector的封装,外界可以将对象的I/O交给它,而不再需要关心一个对象完整I/O的逻辑。 Kafka Selector内部维护了一个java nio Selector,其核心函数是poll(),每次执行都会进行网络I/O;它还维护了一些"List",每次执行poll,这些变量都会有所更新。 Processor通过调用Selector的poll()方法,再取出它更新的"List",从而完成与外界的通信。

Processor线程循环下有不少函数,我们聚焦网络I/O,只研究图中的这三个函数

①poll() 调用了Kafka Selector的poll方法,该方法会执行网络I/O

Selector执行poll()后,很多"List"会更新,比如compeltedReceives和completedSends[4],分别代表在这轮poll()中"完整接收到的请求"和"完整写出的响应"。 ②processCompletedReceives() 该方法迭代了Selector的completedReceives变量,对每条完整收到的请求进行处理。 这个变量的作用在于,在每次调用poll()后,会完整接收到一些的NetworkReceive。通过迭代该变量,可以处理每一条请求。

处理请求

③processCompletedSends() 该方法迭代了Selector的completedSends变量,对每条完整写出的响应进行处理。 这个变量的作用在于,代码在将要写出的Send交给Selector后,其写出就交给Selector完成了,如果希望在Send写出完成后执行一些逻辑,就可以利用此变量。通过每次执行poll()后迭代该变量,可以为每个完整写出的Send执行剩余的逻辑。

处理完成写出的响应

用图片可以形象地表示这个流程。 ① Processor对Kafka Selector调用poll(),执行网络I/O。

  • Kafka Selector会读取每个触发了读事件的Socket,并将数据放到NetworkReceive中。如果有请求接收完成,就加入到completedReceives变量。
  • Selector将外界事先设置好的Send进行写出,如果写出完成,就加入到completedSend变量。

主流程

在这一章,我们只是笼统地介绍了Processor是如何与Selector交互的,但没有讲清楚Selector的poll()是如何运作的,那些变量是如何被更新的。我们先在此打住,去关注读取到请求后的处理

.3 请求的读取、处理与响应的写出

完整的请求被读取、处理后,生成响应并写出的过程如下:

请求的处理流程

我们跟随源码来印证这个过程 ① 取出请求,交给队列 我们从processCompletedReceives继续。在调用poll()后,从selector.completedReceives中取出每个请求并处理。

处理请求

在该方法中,会生成请求,并通过sendRequest把请求交给RequestChannel

交给队列

查看RequestChannel的实现可知,请求被放入了队列

sendRequest

② 取出请求,执行请求,生成响应 我们看KafkaRequestHandler的线程主循环,可知它从RequestChannel中取出请求,并交给KafkaApis执行。下方图二也显示了apis的类型是KafkaApis。

从队列取出请求

执行请求

从RequestChannel的实现可知,请求是从队列中取出的。

取出请求

从KafkaApis的实现可以看出,它根据请求的类型有不同的处理,此处我们不必研究具体的行为。

KafkaApis

不同的命令有不同的行为,是否发出响应/发出什么响应都是不同的。我们以PRODUCE命令为例,看看响应是如何生成的。

PRODUCE

在该方法中定义了一个子方法sendResponseCallback,其内调用了sendResponse。sendResponse负责发回响应。在此响应被生成。

响应的生成

之后我们看sendResponse的实现,看看响应是如何被送回Processor的。

③ 将响应放入队列

我们看下sendResponse的实现,代码取出了对应的Processor并将响应入队

看下RequestChannel的实现

// RequestChannel.scala
/** Send a response back to the socket server to be sent over the network */
  def sendResponse(response: RequestChannel.Response): Unit = {
    if (isTraceEnabled) {
      ...
    }

    val processor = processors.get(response.processor)
    // The processor may be null if it was shutdown. In this case, the connections
    // are closed, so the response is dropped.
    if (processor != null) {
      processor.enqueueResponse(response)
    }
  }

从Processor的实现看出,响应被放入了队列

④ 取出响应,交给Selector写出 在Processor中,dequeueResponse方法会将响应出队

那么该方法在哪里调用呢?正是Processor主循环调用的其中一个方法,processNewResponses

迭代处理响应

所以Processor线程在主循环中会从responseQueue取出每个响应,并进行处理。

processNewResponses将响应取出后,调用sendResponse,交给Selector将响应发回客户端。

总结

本文讲述了请求从Selector被读取、执行,生成响应,并交由Selector写回客户端的过程。但本文略过了Selector的实现细节,下一篇文章会分析。

  1. 因为SocketServer::dataPlaneRequestChannel变量是RequestChannel类型,所以笔者可能写错了
  2. org.apache.kafka.common.network.Kafka
  3. NetworkReceive和Send
  4. List带引号,是因为有的变量,比如completedReceives,并非List类型,而是LinkedHashMap,但是外界对它的调用是迭代的方式,因此作用就和List一样。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • docker 单机配置redis主从集群 集群 前置准备

    该文是docker redis主从配置 正式部署的前言。如下会简要列出目录,需要了解的前置知识,以防配置时出错。 如果你还不够了解,就阅读正文的相应章节。

    平凡的学生族
  • springboot整合elasticsearch 5.x 6.x 索引配置analyzer

    跟随elasticsearch和ik的安装和function score query 权重分查询教程,发现权重分查询结果与预期不符,判断原因是:在查询中,ik没...

    平凡的学生族
  • mvcc【上】 学习

    mvcc在select、insert、delete、update下都有与系统版本号有关的行为,从而使得读操作不用加锁,且只会读到符合标准的行;但是会增加额外的存...

    平凡的学生族
  • Docker常用命令

    什么是Docker? Docker就是一个容器,但是这个容器里什么都没有,所以我们根据需求不同就要不同的环境,这些环境就是镜像,我们可以用一个镜像生成多个容器,...

    剑行者
  • Web Service初探

    Web Service初探 简介 ​ 简单地说WebService就是一种Web服务,他是一种跨编程语言和操作系统的远程调用技术。WebService的传输依赖...

    SecondWorld
  • Docker 常用命令

    wsuo
  • 【趣学程序】Docker之Docker的常用命令

    同一仓库源可以有多个 TAG,代表这个仓库源的不同个版本,我们使用 REPOSITORY:TAG 来定义不同的镜像。如果你不指定一个镜像的版本标签,例如你只使用...

    趣学程序-shaofeer
  • iOS 底层拾遗:autorelease 优化

    由于 ARC 下 retain/release/autorelease 的调用都是编译器代劳,所以需要使用编译后的代码进行分析,通常笔者选择 Xcode 自带的...

    波儿菜
  • TCP细节分析

    jeremyxu
  • 2020涨薪15K?搞懂这份大厂Java面试知识点笔记汇总,你也没问题

    疫情信息仍在不断刷屏,开工日期一再延迟,相信不少朋友都会担心今年春招是否受影响。其实很多企业,比如腾讯、字节跳动,为了保证春招的顺利进行,尽可能的提高招聘效率,...

    用户5546570

扫码关注云+社区

领取腾讯云代金券