前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kafka 网络模型1 请求响应流程

kafka 网络模型1 请求响应流程

作者头像
平凡的学生族
发布2020-06-02 15:22:30
1.1K0
发布2020-06-02 15:22:30
举报
文章被收录于专栏:后端技术后端技术

参考

回顾

kafka 启动1 入口函数中,我们阅读了KafkaServer的注释,这里直接总结一下:

  • KafkaServer有两种请求层, data层或control层
    • data层处理来自客户端和集群中其它broker的请求, control层处理来自controller的请求.
    • 两种层都有Reactor线程(负责新连接)、Processor线程(各自维护一个Selector,并从Socket读取请求)、Handler线程(处理请求、生成响应返回给Processor线程做写操作)
    • Reactor、Processor、Handler的比例在data层中是1:N:M, 在control层中是1:1:1

Processor的创建

Processor负责data-plane和control-plane的创建和启动,各自又包含了Acceptor、Processor和RequestChannel。详情阅读kafka请求全流程(一)—— 客户端请求的"SocketServer 定义"一章

请求流程

转载下kafka请求全流程(二)—— 请求的接收以及分发的图:

转载自https://blog.csdn.net/fenglei0415/article/details/106172921/

该图仅供参考,因为它有几个问题:1. 图中ResponseChannel可能笔误,应该为RequestChannel[1] 2. 并不是Processor将响应放到ResponseQueue,而是Processor检查响应队列并写出。 3. 图中没有讲述Selector的作用

另外,文中提到了PRODUCE请求, 此处可拓展阅读Kafka-处理请求(生产(PRODUCE)请求、获取(FETCH)请求)

上图流程描述了Kafka的网络模型,到KafkaApis就结束了。它之后的流程可以参考Kafka 源码解析之 Server 端如何处理 Produce 请求(十二)中的图:

KafkaApis处理

过渡

上文提到了SocketServer、Acceptor、Processor、RequestChannel这几个网络通信组件,还给出了流程图。读过源码的同学应该很快能反应过来。如果没有,可以看下几章。

另外Processor有关网络IO的操作都是交给org.apache.kafka.common.network.Selector完成,而后者又会引出其它Kafka的IO组件,比如KafkaChannel、NetworkReceive、MemoryPool、TransportLayer等。

如果对每个组件硬读,可能难以消化。接下来请跟随我沿着某几个流程、切面进行分析,在这个过程中增加对每个组件的理解。

.1 处理新连接

我们先沿着"处理新连接"这个流程进行研究,看看一个新连接是如何被处理的。

新连接的处理涉及如下几个组件,Acceptor、Processor、org.apache.kafka.common.network.Selector、java.nio.channels.Selector

新连接处理

我们按图中步骤,结合源码逐个解说

① 接受新连接

Acceptor线程在主循环中,监听和接受新连接

接受新连接

交给Processor

Acceptor将SocketChannel交给Processor的队列变量newConnections

Processor::accept

② 注册SocketChannel

Processor线程在主循环中,调用configureNewConnections(),把SocketChannel从队列取出并注册到Kafka Selector上。Kafka在Java nio的Selector类上封装了一层,也叫Selector[2]

Processor::run()

注册到KafkaSelector上

③ 为SocketChannel注册事件

看下Kafka Selector的是如何注册SocketChannel的,看下registerChannel的实现

org.apache.kafka.common.network.Kafka.Selector

由下方两幅图可知,Kafka Selector在内部维护了一个java nio Selector。注册java SocketChannel,就是为其在java nio Selector上注册事件。

java nio注册

成员变量java nio Selector

.2 Kafka Selector执行IO

Processor利用Kafka Selector执行网络IO,因此我们要讲解下两者之间的交互。

Kafka Selector对java nio Selector进行了封装,将TCP的流式I/O转化为一个个对象[3]的I/O。TCP的I/O是面向流的,在读取时不能保证刚好完整读取了一个对象,但经过Kafka Selector的封装,外界可以将对象的I/O交给它,而不再需要关心一个对象完整I/O的逻辑。 Kafka Selector内部维护了一个java nio Selector,其核心函数是**poll(),每次执行都会进行网络I/O;它还维护了一些"List"**,每次执行poll,这些变量都会有所更新。 Processor通过调用Selector的poll()方法,再取出它更新的"List",从而完成与外界的通信。

Processor线程循环下有不少函数,我们聚焦网络I/O,只研究图中的这三个函数

①poll()

调用了Kafka Selector的poll方法,该方法会执行网络I/O

Selector执行poll()后,很多"List"会更新,比如compeltedReceives和completedSends[4],分别代表在这轮poll()中"完整接收到的请求"和"完整写出的响应"。

②processCompletedReceives()

该方法迭代了Selector的completedReceives变量,对每条完整收到的请求进行处理。

这个变量的作用在于,在每次调用poll()后,会完整接收到一些的NetworkReceive。通过迭代该变量,可以处理每一条请求。

处理请求

③processCompletedSends()

该方法迭代了Selector的completedSends变量,对每条完整写出的响应进行处理。

这个变量的作用在于,代码在将要写出的Send交给Selector后,其写出就交给Selector完成了,如果希望在Send写出完成后执行一些逻辑,就可以利用此变量。通过每次执行poll()后迭代该变量,可以为每个完整写出的Send执行剩余的逻辑。

处理完成写出的响应

用图片可以形象地表示这个流程。

① Processor对Kafka Selector调用poll(),执行网络I/O。

  • Kafka Selector会读取每个触发了读事件的Socket,并将数据放到NetworkReceive中。如果有请求接收完成,就加入到completedReceives变量。
  • Selector将外界事先设置好的Send进行写出,如果写出完成,就加入到completedSend变量。

主流程

在这一章,我们只是笼统地介绍了Processor是如何与Selector交互的,但没有讲清楚Selector的poll()是如何运作的,那些变量是如何被更新的。我们先在此打住,去关注读取到请求后的处理

.3 请求的读取、处理与响应的写出

完整的请求被读取、处理后,生成响应并写出的过程如下:

请求的处理流程

我们跟随源码来印证这个过程

① 取出请求,交给队列

我们从processCompletedReceives继续。在调用poll()后,从selector.completedReceives中取出每个请求并处理。

处理请求

在该方法中,会生成请求,并通过sendRequest把请求交给RequestChannel

交给队列

查看RequestChannel的实现可知,请求被放入了队列

sendRequest

② 取出请求,执行请求,生成响应

我们看KafkaRequestHandler的线程主循环,可知它从RequestChannel中取出请求,并交给KafkaApis执行。下方图二也显示了apis的类型是KafkaApis。

从队列取出请求

执行请求

从RequestChannel的实现可知,请求是从队列中取出的。

取出请求

从KafkaApis的实现可以看出,它根据请求的类型有不同的处理,此处我们不必研究具体的行为。

KafkaApis

不同的命令有不同的行为,是否发出响应/发出什么响应都是不同的。我们以PRODUCE命令为例,看看响应是如何生成的。

PRODUCE

在该方法中定义了一个子方法sendResponseCallback,其内调用了sendResponse。sendResponse负责发回响应。在此响应被生成。

响应的生成

之后我们看sendResponse的实现,看看响应是如何被送回Processor的。

③ 将响应放入队列

我们看下sendResponse的实现,代码取出了对应的Processor并将响应入队

看下RequestChannel的实现

代码语言:javascript
复制
// RequestChannel.scala
/** Send a response back to the socket server to be sent over the network */
  def sendResponse(response: RequestChannel.Response): Unit = {
    if (isTraceEnabled) {
      ...
    }

    val processor = processors.get(response.processor)
    // The processor may be null if it was shutdown. In this case, the connections
    // are closed, so the response is dropped.
    if (processor != null) {
      processor.enqueueResponse(response)
    }
  }

从Processor的实现看出,响应被放入了队列

④ 取出响应,交给Selector写出

在Processor中,dequeueResponse方法会将响应出队

那么该方法在哪里调用呢?正是Processor主循环调用的其中一个方法,processNewResponses

迭代处理响应

所以Processor线程在主循环中会从responseQueue取出每个响应,并进行处理。

processNewResponses将响应取出后,调用sendResponse,交给Selector将响应发回客户端。

总结

本文讲述了请求从Selector被读取、执行,生成响应,并交由Selector写回客户端的过程。但本文略过了Selector的实现细节,下一篇文章会分析。

  1. 因为SocketServer::dataPlaneRequestChannel变量是RequestChannel类型,所以笔者可能写错了
  2. org.apache.kafka.common.network.Kafka
  3. NetworkReceive和Send
  4. List带引号,是因为有的变量,比如completedReceives,并非List类型,而是LinkedHashMap,但是外界对它的调用是迭代的方式,因此作用就和List一样。
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 参考
  • 回顾
  • 请求流程
  • 过渡
  • .1 处理新连接
  • .2 Kafka Selector执行IO
  • .3 请求的读取、处理与响应的写出
  • 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档