在上一篇文章中,我分析了Kafka的请求、响应流程,但留下了Selector的疑点。本文会分析Selector和它的poll()
是如何进行网络IO的,NetworkReceive是如何被完整读取的,Send是如何被完整写出的,还会涉及到KafkaChannel和它的mute机制。
读完上一篇文章,我们应当理解了Kafka Selector在内部维护了一个java nio Selector,变量名叫nioSelector
。
外部向Kafka Selector注册SocketChannel,其实都是注册到了java Selector上。
而Kafka Selector又是通过调用java Selector,来收集触发了I/O事件的Socket,从而对其执行I/O。
图中橙色的圆圈代表触发了I/O事件的Socket
Processor线程在主循环调用Selector::poll,看下它的实现。方法一开始就调用了clear。
结合注释可知,clear的实现是很简单地清空各种成员变量,因为这些变量都是上一次poll()
的结果,在这次poll()
之前就应该被处理过。[1]
Selector::poll的方法很长,我们看到它的方法主体,借助其内部维护的java Selector收集了触发I/O事件的SelectionKey,并调用pollSelectionKeys执行I/O。
收集到了要I/O的SelectionKey后,pollSelectionKeys要怎么做呢?
读者可以先自主思考: 现在收集到了一组触发了I/O事件的SelectionKey,它们可能触发了读事件/写事件,或者两者都触发了。那么我们要对这些SelectionKey逐一判断触发的事件,如果触发了读事件,就尝试把字节流读进NetworkReceive;如果触发了写事件,就尝试把Send的数据写出去。
没错,pollSelectionKeys就是这么做的。对每个SelectionKey,它首先取出它上面附着的KafkaChannel,以便之后要进行IO操作时,对其进行IO。
然后,根据条件判断调用attempRead,并调用attempWrite。
channel.ready()
始终为真,channel.hasBytesBufferd()
始终为假。所以如果要对一个KafkaChannel执行读取,它必须: key.isReadable()
为真)hasCompletedReceive(channel)
为假)。如果有,你应当先处理完这个NetworkReceive,才能再读取。explicityMutedChannels.contains(channel)
为假,这是mute机制的内容,我们之后再研究)channel.ready()
始终为真;第四个判断有点复杂,我们跳过不分析,当它为真。所以要对一个KafkaChannel执行写入,它必须: Selector::attemptWrite
KafkaChannel:::hasSend
在条件满足时,attemptRead会被调用于读取NetworkReceive;在attemptWrite条件满足后,write会被调用于写出Send。我们看下attemptRead和write的实现, 不难发现规律:
channel.read()
尝试读取若干字节channel.maybeCompeleteReceive()
判断是否NetworkReceive读取完成completedReceives
队列channel.write()
尝试写出若干字节channel.maybeCompleteSend()
判断Send是否写出完成completedSends
队列
Selector::attempRead
Selector::write
addToCompletedReceives
此处可见,KafkaChannel最重要的方法是有关IO的read
、maybeCompleteReceive
、write
、maybeCompleteSend
。我们之后再看它们的实现
KafkaChannel是基于java SocketChannel上的一层封装(尽管它是利用java nio attachment机制附着在SocketChannel上的对象)。每个KafkaChannel代表一个客户端,一个KafkaSelector会管理多个KafkaChannel,并对其进行IO操作。
KafkaChannel的附着 上文提到"尽管它是利用java nio attachment机制附着在SocketChannel上的对象",KafkaChannel是如何被附着的呢? 我们回到Processor的源码,在主循环中的其中一个方法configureNewConnections中,将SocketChannel注册到Selector上
Selector会一路调用至buildAndAttachKafkaChannel,在此创建KafkaChannel并附着到SocketChannel上
KafkaChannel的结构 KafkaChannel中最重要的三个成员变量是TransportLayer、NetworkReceive和Send。KafkaChannel通过TransportLayer进行读写,读取NetworkReceive,写出Send。
如下图所示,KafkaChannel通过TransportLayer进行IO,而TransportLayer只是SocketChannel的一层封装:
我们稍占篇幅用来解释"TransportLayer只是SocketChannel的一层封装"这一点。 首先从它的注释可见,该接口就是SocketChannel的一个封装,可以直接当做SocketChannel的替代。
TransportLayer和SocketChannel都继承了GatheringByteChannel和ScatteringByteChannel,因此能对ByteBuffer和ByteBuffer数组进行读写。
SocketChannel
这个类只有两个实现,分别对应PLAINTEXT和SSL模式,两个实现都维护了对SocketChannel的引用。篇幅关系,这里就不看SslTransportLayer了。
总之,对TransportLayer调用的各种IO方法,在底层都是转交给SocketChannel完成的,所以我们可以把它当做SocketChannel一样使用。
在上一章我们说到,Selector对客户端的IO在于attempRead和write,但后者又会对KafkaChannel调用read、maybeCompleteReceive、write、maybeCompleteSend,这些都是KafkaChannel有关IO的重要方法。
我们先看KafkaChannel是如何执行读取的,可知,读取和判断是否完成,与NetworkReceive::readFrom和NetworkReceive::complete有关。
再看KafkaChannel是如何执行写出的
可见KafkaChannel的读写、判断是否完成,与NetworkReceive::readFrom、NetworkReceive::complete、Send::writeTo、Send::completed有关。我们要分析这两个类的读写行为。
NetworkReceive 结构如下,NetworkReceive包含两个ByteBuffer,叫做size和buffer。
在读取时,先读取4字节到size内,再根据size指示的大小为buffer分配内存,然后读满整个buffer时,NetworkReceive就读取完成了。由于缓存的大小清晰,能够避免"tcp粘包"问题
从构造函数中看出,size是固定4字节的
readFrom方法负责读取size和buffer,由于该方法可能被多次调用,每次都需要判断size和buffer的状态,并读取。
complete方法判断是否读取完成,也就是size和buffer是否都读满了
Send Send是一个接口,含有completed和writeTo方法。有三个类/抽象类实现了writeTo方法。注释中强调了该方法可能会被调动多次才写出完成,因此其实现都遵循了这一点。
以ByteBufferSend为例,在构造函数中,计算了remaining,要写出的剩余字节数
writeTo方法负责写出一组ByteBuffer
completed方法会判断remaining是否不大于0(在PLAINTEXT下,pending始终为false)
在读取一个请求后,mute 写出一个响应后,unmute 这样做是为了使得每个请求一来一回,有序排队
poll()
中收到的请求,在这次poll()
调用前就应当被处理过,所以这一次调用就应该清空。换句话说,如果这次不清空,那之后Processor就会重复处理这些请求了。