前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kafka 网络模型2 Selector

kafka 网络模型2 Selector

作者头像
平凡的学生族
发布2020-06-09 11:08:33
8470
发布2020-06-09 11:08:33
举报
文章被收录于专栏:后端技术后端技术

在上一篇文章中,我分析了Kafka的请求、响应流程,但留下了Selector的疑点。本文会分析Selector和它的poll()是如何进行网络IO的,NetworkReceive是如何被完整读取的,Send是如何被完整写出的,还会涉及到KafkaChannel和它的mute机制。

Selector

读完上一篇文章,我们应当理解了Kafka Selector在内部维护了一个java nio Selector,变量名叫nioSelector

外部向Kafka Selector注册SocketChannel,其实都是注册到了java Selector上。

而Kafka Selector又是通过调用java Selector,来收集触发了I/O事件的Socket,从而对其执行I/O。

图中橙色的圆圈代表触发了I/O事件的Socket

poll

Processor线程在主循环调用Selector::poll,看下它的实现。方法一开始就调用了clear。

结合注释可知,clear的实现是很简单地清空各种成员变量,因为这些变量都是上一次poll()的结果,在这次poll()之前就应该被处理过。[1]

Selector::poll的方法很长,我们看到它的方法主体,借助其内部维护的java Selector收集了触发I/O事件的SelectionKey,并调用pollSelectionKeys执行I/O。

pollSelectionKeys

收集到了要I/O的SelectionKey后,pollSelectionKeys要怎么做呢?

读者可以先自主思考: 现在收集到了一组触发了I/O事件的SelectionKey,它们可能触发了读事件/写事件,或者两者都触发了。那么我们要对这些SelectionKey逐一判断触发的事件,如果触发了读事件,就尝试把字节流读进NetworkReceive;如果触发了写事件,就尝试把Send的数据写出去。

没错,pollSelectionKeys就是这么做的。对每个SelectionKey,它首先取出它上面附着的KafkaChannel,以便之后要进行IO操作时,对其进行IO。

然后,根据条件判断调用attempRead,并调用attempWrite。

  • 关于attempRead. 实际上,在PLAINTEXT下,channel.ready()始终为真,channel.hasBytesBufferd()始终为假。所以如果要对一个KafkaChannel执行读取,它必须:
    • 触发了读事件(key.isReadable()为真)
    • 没有已经完整读取的NetworkReceive(hasCompletedReceive(channel)为假)。如果有,你应当先处理完这个NetworkReceive,才能再读取。
    • 而且该KafkaChannel不能被静音(explicityMutedChannels.contains(channel)为假,这是mute机制的内容,我们之后再研究)
  • 关于attempWrite. 实际上在该方法的实现内也有条件判断,有四个判断。而在 PLAINTEXT下,channel.ready()始终为真;第四个判断有点复杂,我们跳过不分析,当它为真。所以要对一个KafkaChannel执行写入,它必须:
    • 触发了写事件(第三个条件)
    • 有Send可供写出(第一个条件)

    Selector::attemptWrite

    KafkaChannel:::hasSend

在条件满足时,attemptRead会被调用于读取NetworkReceive;在attemptWrite条件满足后,write会被调用于写出Send。我们看下attemptRead和write的实现, 不难发现规律:

  • attemptRead:
    1. 先调用channel.read()尝试读取若干字节
    2. 再用channel.maybeCompeleteReceive()判断是否NetworkReceive读取完成
    3. 如果完成,加入到completedReceives队列
  • write
    1. 先调用channel.write()尝试写出若干字节
    2. 再用channel.maybeCompleteSend()判断Send是否写出完成
    3. 如果完成,加入到completedSends队列

    Selector::attempRead

    Selector::write

addToCompletedReceives

此处可见,KafkaChannel最重要的方法是有关IOreadmaybeCompleteReceivewritemaybeCompleteSend。我们之后再看它们的实现

KafkaChannel

KafkaChannel是基于java SocketChannel上的一层封装(尽管它是利用java nio attachment机制附着在SocketChannel上的对象)。每个KafkaChannel代表一个客户端,一个KafkaSelector会管理多个KafkaChannel,并对其进行IO操作。

KafkaChannel的附着 上文提到"尽管它是利用java nio attachment机制附着在SocketChannel上的对象",KafkaChannel是如何被附着的呢? 我们回到Processor的源码,在主循环中的其中一个方法configureNewConnections中,将SocketChannel注册到Selector上

Selector会一路调用至buildAndAttachKafkaChannel,在此创建KafkaChannel并附着到SocketChannel上

KafkaChannel的结构 KafkaChannel中最重要的三个成员变量是TransportLayer、NetworkReceive和Send。KafkaChannel通过TransportLayer进行读写,读取NetworkReceive,写出Send。

  • 每个NetworkReceive代表一个单独的请求,KafkaChannel读取的字节流会收纳到NetworkReceive中,当NetworkReceive读满,一个请求就完整读取了
  • 每个Send代表一个单独的响应,需要写出响应时只需赋值此变量,之后调用write()方法将其中的字节流写出

如下图所示,KafkaChannel通过TransportLayer进行IO,而TransportLayer只是SocketChannel的一层封装:

TransportLayer

我们稍占篇幅用来解释"TransportLayer只是SocketChannel的一层封装"这一点。 首先从它的注释可见,该接口就是SocketChannel的一个封装,可以直接当做SocketChannel的替代。

TransportLayer和SocketChannel都继承了GatheringByteChannel和ScatteringByteChannel,因此能对ByteBuffer和ByteBuffer数组进行读写

SocketChannel

这个类只有两个实现,分别对应PLAINTEXT和SSL模式,两个实现都维护了对SocketChannel的引用。篇幅关系,这里就不看SslTransportLayer了。

总之,对TransportLayer调用的各种IO方法,在底层都是转交给SocketChannel完成的,所以我们可以把它当做SocketChannel一样使用。

IO

在上一章我们说到,Selector对客户端的IO在于attempRead和write,但后者又会对KafkaChannel调用read、maybeCompleteReceive、write、maybeCompleteSend,这些都是KafkaChannel有关IO的重要方法。

我们先看KafkaChannel是如何执行读取的,可知,读取和判断是否完成,与NetworkReceive::readFrom和NetworkReceive::complete有关。

再看KafkaChannel是如何执行写出的

  • 首先调用setSend。设置send、注册写事件,以让write被调用
  • 然后写出操作要调用Send::writeTo
  • 判断是否写出完成与Send::completed有关。写出完成时注销写事件。

可见KafkaChannel的读写、判断是否完成,与NetworkReceive::readFrom、NetworkReceive::complete、Send::writeTo、Send::completed有关。我们要分析这两个类的读写行为。

NetworkReceive和Send

NetworkReceive 结构如下,NetworkReceive包含两个ByteBuffer,叫做size和buffer。

  • size的大小为4字节,存储了buffer的字节数
  • buffer存储了具体的指令内容

在读取时,先读取4字节到size内,再根据size指示的大小为buffer分配内存,然后读满整个buffer时,NetworkReceive就读取完成了。由于缓存的大小清晰,能够避免"tcp粘包"问题

从构造函数中看出,size是固定4字节的

readFrom方法负责读取size和buffer,由于该方法可能被多次调用,每次都需要判断size和buffer的状态,并读取。

complete方法判断是否读取完成,也就是size和buffer是否都读满了

Send Send是一个接口,含有completed和writeTo方法。有三个类/抽象类实现了writeTo方法。注释中强调了该方法可能会被调动多次才写出完成,因此其实现都遵循了这一点。

以ByteBufferSend为例,在构造函数中,计算了remaining,要写出的剩余字节数

writeTo方法负责写出一组ByteBuffer

completed方法会判断remaining是否不大于0(在PLAINTEXT下,pending始终为false)

mute机制

在读取一个请求后,mute 写出一个响应后,unmute 这样做是为了使得每个请求一来一回,有序排队

  1. 比如completedReceives是上一次poll()中收到的请求,在这次poll()调用前就应当被处理过,所以这一次调用就应该清空。换句话说,如果这次不清空,那之后Processor就会重复处理这些请求了。
  2. 读者可自行点击查看这两个方法在PLAINTEXT下的实现,此处就不占用篇幅了。
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Selector
    • poll
      • pollSelectionKeys
      • KafkaChannel
        • TransportLayer
          • IO
          • NetworkReceive和Send
          • mute机制
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档