Netty 源码解析 ——— 基于 NIO 网络传输模式的 OP_ACCEPT、OP_CONNECT、OP_READ、OP_WRITE 事件处理流程

本文是Netty文集中“Netty 源码解析”系列的文章。主要对Netty的重要流程以及类进行源码解析,以使得我们更好的去使用Netty。Netty是一个非常优秀的网络框架,对其源码解读的过程也是不断学习的过程。

预备知识

首先,我们知道JDK NIO的Selector实现了I/O多路复用。可以通过一个线程来管理多个Socket。我们可以将多个Channel(一个Channel代表了一个Socket)注册到一个Selector上,并且设置其感兴趣的事件。这样一来,在Selector.select操作时,若发现Channel有我们所感兴趣的事件发生时Selector就会将其记录下来(即,SelectedKeys),然后我们就可以对事件进行相应的处理了。更多关于JDK NIO的知识请参阅关于 NIO 你不得不知道的一些“地雷”

ServerSocketChannel的有效事件为OP_ACCEPT。 SocketChannel的有效事件为OP_CONNECT、OP_READ、OP_WRITE

SelectionKey.OP_ACCEPT 事件处理流程

当服务端收到客户端的一个连接请求时,‘SelectionKey.OP_ACCEPT’将会触发。在NioEventLoop的事件循环中会对该事件进行处理:

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
}

我们来看看unsafe.read()的实现,在NioServerSocketChannel中unsafe是一个NioMessageUnsafe实例:

首先,需要说明的是在处理ACCEPT事件时,虽然整个方法的实现好像是在处理读取数据的操作,但实际上对于ACCEPT事件来说,其中消息的读取指的就是接收一个客户端的请求「serverSocket.accpet()」操作。 再者,这里我们主要针对接收连接的逻辑进行分析,关于allocHandle相关的分析不会进行展开,可以参阅Netty 源码解析 ——— AdaptiveRecvByteBufAllocator,这篇文章已经对AdaptiveRecvByteBufAllocator做了详细的分析了。所以此处,我们主要关注的是真实的接收客户端连接请求所涉及的流程。

① 对于ACCEPT事件,每次读循环执行一次读操作(但并没有读取任何字节数据,totalBytesRead > 0 为false)这也是符合NIO规范的,因为每次ACCEPT事件被触发时,仅表示有一个客户端向服务器端发起了连接请求。 ② doReadMessages(readBuf):这步主要是通过serverSocket.accpet()来接受生成和客户端通信的socket,并将其放入到readBuf集合中。

接收当前客户端的连接请求,以得到一个SocketChannel。 创建一个NioSocketChannel实例,NioSocketChannel表示一个使用了NIO Selector的TCP/IP socket的实现。因此我们在构造NioSocketChannel是将上面accept()返回的SocketChannel作为构造函数的参数传入,也就是说NioSocketChannel中持有SocketChannel的引用。同时,我们也将NioServerSocketChannel作为构造函数的参数传入,作为当前NioSocketChannel的parent[一个Channel可以拥有一个父亲(parent),这取决于这个channel是如何被创建的。比如说,一个SocketChannel,它是被ServerSocketChannel所接受的,SocketChannel将ServerSocketChannel作为parent()方法的结果返回]。 而NioSocketChannel的构建和NioServerSocketChannel的构建是非常类似的,关于NioServerSocketChannel的构造我们已经在Netty 源码解析 ——— 服务端启动流程 (下)中进行了详细分析。这里我们提及重要的几点,NioSocketChannel实例的构造主要完成了: a) 构建Unsafe实例赋值给成员变量unsafe,这里是NioByteUnsafe对象,该类用于完成Channel真实的I/O操作和传输。 b) 创建了该Channel的ChannelPipeline实例(即,DefaultChannelPipeline)赋值给成员变量pipeline。每一个Channel都有它自己的pipeline。 这里ChannelPipeline的构建是很重要的一步。每一个新的Channel被创建时都会分配一个新的ChannelPipeline。ChannelPipeline本质上就是一系列的ChannelHandlers。ChannelPipeline还提供了方法用于传播事件通过ChannelPipeline本身。 ChannelPipeline通过两个AbstractChannelHandlerContext对象本身(head、tail)来维护一个双向链表。 c) SelectionKey.OP_READ赋值给成员变量readInterestOp 设置SocketChannel.configureBlocking(false);配置SocketChannel为非阻塞模式,这步很重要。因为只有非阻塞模式Channel才能使用NIO的Selector来实现非阻塞的I/O操作。 d) 构建NioSocketChannelConfig实例赋值给成员变量config,即,这是负责维护NioSocketChannel相关配置的对象。 ③ pipeline.fireChannelRead(readBuf.get(i)):这里实际上就是将第②步构建的NioSocketChannel作为ChannelRead事件的传播数据进行传播。ChannelRead是一个入站事件,它会从ChannelPipeline中的head开始传播,依次顺序回调ChannelInboundHandler的channelRead方法。 这里我们重点来看看ServerBootstrapAcceptor对channelRead事件的处理,ServerBootstrapAcceptor是服务端启动流程中Netty底层加入到NioServerSocketChannel所关联的ChannelPipeline中的一个入站处理器。

        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;

            child.pipeline().addLast(childHandler);

            setChannelOptions(child, childOptions, logger);

            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }

            try {
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }

a) 将我们在启动流程中设置的childHandler(如,「serverBootstrap.childHandler(new MyServerInitializer())」)以及childOptions(如,「serverBootstrap.childOption(ChannelOption.ALLOW_HALF_CLOSURE, true)」)、childAttrs(如,「serverBootstrap.childAttr(AttributeKey.valueOf("userID"), UUID.randomUUID().toString())」)设置到这个NioSocketChannel中。 b) 将配置好的NioSocketChannel注册到childGroup中,并注册了一个监听器,因为注册是一个异步操作,该监听器会得以调用在注册操作完成时(完成包括,成功的完成、失败的完成、取消的完成)。该监听器实现:如果发现该注册操作失败了,则会强制关闭当前这个NioSocketChannel。 而将NioSocketChannel注册到childGroup的操作和将NioServerSocketChannel注册到parentGroup的流程也是极其类似的。详细的说明请参阅Netty 源码解析 ——— 服务端启动流程 (下)。这里做一个简单的概述。 整个注册流程: a) 首先会通过轮询的方式从childGroup中获取一个NioEventLoop,将当前的NioSocketChannel注册到这个NioEventLoop上。 b) 将当前的SocketChannel注册到NioEventLoop中的Selector上,并将NioSocketChannel作为附加属性设置到SelectionKey中。 c) 回调我们自定义的ChannelInitializer的initChannel方法,将我们定义的一个个ChannelHandler添加到当前NioSocketChannel所关联的ChannelPipeline上,然后将ChannelInitializer本身从ChannelPipeline中移除。 d) 标记注册这个异步操作标志为成功完成。 e) 触发ChannelRegistered事件,该事件会在ChannelPipeline中得以传播。 f) 触发ChannelActive事件,该事件也是一个入站事件,它会从ChannelPipeline中的head开始传播,而head的channelActive方法除了将ChannelActive事件传播给下一个ChannelInboundHandler之外,还调用一个readIfIsAutoRead()方法。 而readIfIsAutoRead()最终会调用到「AbstractNioChannel.doBeginRead()」方法:

    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

而这个方法中就完成了将readInterestOp(即,上面在说明NioSocketChannel的构建过程中已经提及,该成员变量值为SelectionKey.OP_READ)设置为感兴趣的事件。这样一来,Selector就会监听该SocketChannel的读事件了。

到目前为止,NioServerSocketChannel通过accept接受了一个客户端的连接请求的整个流程就完成了。这里NioSocketChannel的创建是由NioServerSocketChannel所在的NioEventLoop( 实际上是NioEventLoop所在的线程上 )完成的。然后将创建好的NioSocketChannel注册到childGroup中,也就是通过轮询的方式的方式从childGroup中获取一个NioEventLoop,然后将NioSocketChannel注册的其上。在注册的过程中也完成了将SocketChannel注册到Selector,并设置SelectionKey.OP_READ为感兴趣的事件,这样Selector就会开始监听这个SocketChannel的读事件了。

SelectionKey.OP_CONNECT 事件处理流程

当SelectionKey.OP_CONNECT(连接事件)准备就绪时,我们执行如下操作:

            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

① 将SelectionKey.OP_CONNECT事件从SelectionKey所感兴趣的事件中移除,这样Selector就不会再去监听该连接的SelectionKey.OP_CONNECT事件了。而SelectionKey.OP_CONNECT连接事件是只需要处理一次的事件,一旦连接建立完成,就可以进行读、写操作了。

② unsafe.finishConnect():

注意,当连接操作既没有被取消也没有超时的情况下,该方法才会被事件循环所调用。

a) boolean wasActive = isActive():

    public boolean isActive() {
        SocketChannel ch = javaChannel();
        return ch.isOpen() && ch.isConnected();
    }

因为此时「SocketChannel.finishConnect()」还没调用,所以「ch.isConnected()」将返回false,因此isActive()的结果为false。 b) doFinishConnect():

    protected void doFinishConnect() throws Exception {
        if (!javaChannel().finishConnect()) {
            throw new Error();
        }
    }

该方法会调用SocketChannel.finishConnect()来标识连接的完成,如果我们不调用该方法,就去调用read/write方法,则会抛出一个NotYetConnectedException异常。 注意,无论如何在connect后finishConnect()方法都是需要被调用的。调用finishConnect()的三种返回:

  1. 如果你在connect()后直接调用了finishConnect()( 并非在CONNECT事件中调用 ),则若finishConnect()返回了true,则表示channel连接已经建立,而且CONNECT事件也不会被触发了。
  2. 如果finishConnect()方法返回false,则表示连接还未建立好。那么就可以通过CONNECT事件来监听连接的完成。
  3. 如果finishConnect()方法抛出了一个IOException异常,则表示连接操作失败。 c) fulfillConnectPromise(connectPromise, wasActive):

I. 当异步的“连接尝试”操作通过取消来关闭了,那么则直接返回。因为当“连接尝试”操作被取消时,connectPromise会被置为null。

II. isActive():获取当前SocketChannel的状态,因为此时「SocketChannel.finishConnect()」已经被调用过了,因此该方法会返回true。 III. boolean promiseSet = promise.trySuccess(): 将当前的异步的“连接尝试”操作尝试标记为成功。如果用户取消了“连接尝试(即,调用connect操作后,用户调用了cancel来取消该操作)”的操作的话,该方法将返回false;否则返回true,并且如果有ChannelFutureListener注册到了这个ChannelFuture(即,ChannelPromise)上,那么监听器的operationComplete方法将得以回调。 IV. 无论当前的“连接尝试”操作是否被取消,channelActive()事件都将被触发,因为此时channel确实已经处于active状态了。 channelActive是一个入站事件,该事件会从ChannelPipeline的head开始传播至tail间的所有ChannelInboundHandler,并回调它们的channelActive方法。 我们来深入看看head对channelActive事件的处理:

它除了将channelActive事件传播给下一个ChannelInboundHandler外,还会进行一个「readIfIsAutoRead()」操作:

        private void readIfIsAutoRead() {
            if (channel.config().isAutoRead()) {
                channel.read();
            }
        }

因为NioSocketChannel的NioSocketChannelConfig对象的autoRead属性默认就为1,因此isAutoRead()为true。那么就会调用channel.read()操作,这将触发一个read事件在ChannelPipeline中,而read是一个出站操作。它会从ChannelPipeline的尾部开始传播至head间的每个ChannelOutboundHandler。 我们接着来看head对read事件的处理:

        // DefaultChannelPipeline#HeadContext#read()
        public void read(ChannelHandlerContext ctx) {
            unsafe.beginRead();
        }

        // AbstractChannel#AbstractUnsafe#beginRead()
        public final void beginRead() {
            assertEventLoop();

            if (!isActive()) {
                return;
            }

            try {
                doBeginRead();
            } catch (final Exception e) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireExceptionCaught(e);
                    }
                });
                close(voidPromise());
            }
        }

        // AbstractNioChannel#doBeginRead()
        protected void doBeginRead() throws Exception {
            // Channel.read() or ChannelHandlerContext.read() was called
            final SelectionKey selectionKey = this.selectionKey;
            if (!selectionKey.isValid()) {
                return;
            }

            readPending = true;

            final int interestOps = selectionKey.interestOps();
            if ((interestOps & readInterestOp) == 0) {
                selectionKey.interestOps(interestOps | readInterestOp);
            }
        }

这里的readInterestOp是SelectionKey.OP_READ,是在构建NioSocketChannel对象时传进来的。因此,我们可以知道,此时的read事件主要是完成了,在Selector中对已经注册到其上的NioSocketChannel的OP_READ标识为感兴趣的事件。这样Selector就监控该SocketChannel的读操作了。 V. 如果用户执行了取消“连接尝试”的操作,那么就关闭channel,并触发channelInactive事件。 d) 如果成员变量connectTimeoutFuture非空,则说明该“连接尝试”操作设置了一个连接超时时间。那么,此时连接已经完成了,我们就可以取消这个连接超时检测的定时任务了。超时任务会记录本次“连接尝试”操作为失败状态,并且会将connectTimeoutFuture成员变量置为null。 比如,可能存在这样一种情况:也就是当程序执行完fulfillConnectPromise方法中的「promise.trySuccess()」之后,以及在执行finally代码块之前,“连接尝试”的已经完成,并且ChannelPromise已标记为了true。但是此时设置的连接超时时间已到并且连接超时任务被得以执行,此时超时任务发现ChannelPromise的状态已经被标识过了也就不会进行关闭channel的操作,而直接将connectTimeoutFuture成员变量置为null。那么在此之后,外面的程序运行到了finally代码块中。所以,我们需要对connectTimeoutFuture成员变量进行非空判断,只有在connectTimeoutFuture非空的情形下,才需要代用cancel来取消定时任务,因为如上流程我们可以知道,如果connectTimeoutFuture的定时任务已经执行过了的话,它会将connectTimeoutFuture置为null。

总的来说,OP_CONNECT事件的触发时,表示当前的socket处于了可连接的状态了,需要调用SocketChannel.finishConnect()来完成连接的后续事件。同时会触发ChannelActive事件,该事件为一个入站事件,它会在NioSocketChannel所关联的ChannelPipeline管道得以传播,即,回调head到tail之间所有的ChannelInboundHandler的channelActive方法。而head的channelActive方法中又会触发一个channel的read操作,该操作最终会在NioSocketChannel所注册的Selector中标识OP_READ为感兴趣的事件,这使得Selector会监听NioSocketChannel上是否有可读的数据准备好被读取了。

SelectionKey.OP_READ 事件处理流程

当有可读数据准备被读取时,‘SelectionKey.OP_READ’将会触发。在NioEventLoop的事件循环中会对该事件进行处理:

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
}

我们来看看unsafe.read()的实现,在NioSocketChannel中unsafe是一个NioByteUnsafe实例:

这里,我们主要针对读数据涉及的逻辑进行分析,关于allocHandle如何计算出一个最优缓冲区的大小用于创建缓冲区,以及判断读循环是否应该继续,已经在Netty 源码解析 ——— AdaptiveRecvByteBufAllocator做了详细的分析了。所以此处,我们主要关注的是真实的读数据操作所涉及的流程。 ① 首先,在循环前将分配器处理器中累加的消息/字节计数重置。接着,进行一个读循环操作: I. 根据给定的分配器以及预测的最优缓冲区容量大小创建一个缓冲区用于准备接受可读取的数据。在Netty中,分配器(allocator)默认为PooledByteBufAllocator实例,而PooledByteBufAllocator实现通过使用jemalloc算法来实现高效的内存分配。 II. allocHandle.lastBytesRead(doReadBytes(byteBuf)): 使用已经分配好的ByteBuf来读取数据。并在分配器处理器中记录本次读取的字节数。

这里会将SocketChannel中的内容写到byteBuf中,从byteBuf的writerIndex开始写入数据,writerIndex会增加所写入的字节数。并且设置了最大可从SocketChannel读取的数据大小为“allocHandle.attemptedBytesRead()”,这也是byteBuf的容量大小。 III. 判断本次读操作所读取到的字节数: a) 若‘读取到的字节数 < 0’,即为’-1’时,说明对端已经关闭了。close变量会被标记为true。因为没有读取到数据,因此调用‘byteBuf.release()’来释放bytebuf,然后退出读循环操作(break)。需要说明的是,因为byteBuf是通过PooledByteBufAllocator来分配的缓冲区,是一个池中的ByteBuf,因此是要通过release()方法来减小bytebud的引用计数,当bytebuf的引用计数为0时,则说明此时已经没有引用指向这个bytebuf了,那么它就会被“回收”; b) 若‘读取到的字节数 == 0’,仅仅说明本次读操作没有读取到数据,那么就会执行同上面一样的释放bytebuf操作,即,‘byteBuf.release()’,然后退出读循环操作(break); c) 若‘读取到的字节数 > 0’,说明本次读操作已经到达有效的数据了。那么执行:    [1]. 「allocHandle.incMessagesRead(1)」对读消息的次数进行累加。    [2]. 然后标识readPending为false,表示本次读操作已经读取到了有效数据,无需等待再一次的读操作。    [3]. 接着触发ChannelRead事件,它会在ChannelPipeline中传播,「pipeline.fireChannelRead(byteBuf)」。这是一个入站事件,它会从ChannelPipeline的head开始传播,依次顺序回调ChannelInboundHandler的channelRead()方法。这里可以看到,是读循环中每一次有效的读操作都对触发一次ChannelRead事件,并不是在所有数据都读取到之后才触发一次ChannelRead事件。因此,我们需要提供一系列的编解码器来将收到的数据分割成我们一个个的逻辑数据包,对此Netty也提供了一系列拆箱即有的编解码器为我们解决相关的问题。 IV. 根据当前的NioSocketChannel是否是自动读取的配置,以及已经读取的数据字节数,以及已经进行的读操作次数,以及最近一次读取的字节数来判断是否需要继续进行读循环操作。若需要则继续读循环操作;否则退出读循环,继续后面的流程。 ② allocHandle.readComplete():在本次读循环结束后调用一次「allocHandle.readComplete()」来记录本次读循环的数据信息以用于预测下一次读事件触发时,应该分配多大的ByteBuf容量更加合理些。 ③ pipeline.fireChannelReadComplete():触发ChannelReadComplete事件,用于表示当前读操作的最后一个消息已经被ChannelRead所消费。ChannelReadComplete是一个入站消息,它会从ChannelPipeline的head开始传播,依次顺序回调ChannelInboundHandler的channelReadComplete方法。 ChannelRead vs ChannelReadComplete 当Channel检测到对端有数据可以读取的时候,channelRead方法会被调用。 channelRead方法可能会被调用多次,当channelReadComplete方法被回调的时候,标识着数据已经都读取完了。也就是说,channelRead方法会被调用多次,当所有消息都读取完后channelReadComplete方法会得到一次调用。 ④ 如果close被标识为了true,则说明对端已经关闭了连接。(即,读操作中读取的字节数量为-1,则表示远端已经关闭了),则执行「closeOnRead(pipeline)」

首先需要补充一点,在NIO传输模式下,当SocketChannel的read操作返回’-1’时,有两种情况:a) 对端已经关闭了连接,即SocketChannel被关闭了;b) 当前端执行了「socketChannel.shutdownInput()」。 I. 若“isInputShutdown0()”返回false,则说明是远端连接已经关闭了。那么此时,如果我们的程序配置了“ChannelOption.ALLOW_HALF_CLOSURE”属性(即,可以在启动引导类时通过option(ChannelOption.ALLOW_HALF_CLOSURE, true)来启用配置),那么就会进行shutdownInput()操作,并触发一个用户自定义的ChannelInputShutdownEvent.INSTANCE事件,在ChannelPipeline中传播。该事件是一个入站事件,它会从ChannelPipeline中的head开始传播,异常顺序调用ChannelInboundHandler的userEventTriggered方法。 II. 若“isInputShutdown0()”返回false,则说明是远端连接已经关闭了。并且此时我们的程序并没有配置启动“ChannelOption.ALLOW_HALF_CLOSURE”。那么此时就会进行相应SocketChannel的关闭等相关操作。 III. 若“isInputShutdown0()”返回true,则说明是当前的NioSocketChannel自动调用了shutdownInput()方法来关闭了输入流。那么此时就会触发一个用户自定义的ChannelInputShutdownEvent.INSTANCE事件,在ChannelPipeline中传播。该事件是一个入站事件,它会从ChannelPipeline中的head开始传播,异常顺序调用ChannelINboundHandler的userEventTriggered方法。 关于「SocketChannel.shutdownInput()」:关闭一个连接的读,但不关闭这个通道。一旦关闭了读,那么在这之后调用channel的read都将返回’-1’,来表示’流的结尾’。如果往已经关闭的输入流中发送数据,都会默认被丢弃。 ⑤ 如果本次读操作已经读取到有效数据(即,最近一次读操作返回的读取字节数>0),并且当前的NioSocketChannel的配置为非自动读取(disable autoRead,说明此时用户不希望Selector去监听当前SocketChannel的读事件,用户可以根据业务逻辑的需要,在希望读取数据时再去添加OP_READ事件到Selector中。并且在每次读取到数据后就将OP_READ事件从所感兴趣的事件中移除),那么此时需要将OP_READ事件从所感兴趣的事件中移除,这样Selector就不会继续监听该SocketChannel的读事件了。 removeReadOp()操作就是将OP_READ从SelectionKey的interestOps集合中移除:

在NioSocketChannel中成员变量readInterestOp就是SelectionKey.OP_READ。

PS:注意,如果在当前端主动调用「channel.shutdownInput()」方法时,需要在处理’ChannelInputShutdownReadComplete’这个用户自定义的事件时调用「channel().config().setAutoRead(false);」来将autoRead置为false。不然,OP_READ事件会一直被触发,而上的步骤’III’会一直被调用,这会导致一些问题,比如不必要的CPU消耗。 调用方式类似:

public class MyServerHandler extends SimpleChannelInboundHandler<String> {
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        Channel serverChannel = ctx.channel();
        if(serverChannel instanceof NioSocketChannel) {
            System.out.println("server shutdownInput...");
            ((NioSocketChannel) serverChannel).shutdownInput();
        }
    }

    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if(evt instanceof ChannelInputShutdownReadComplete) {
            System.out.println("detail ChannelInputShutdownEvent event......");
            ctx.channel().config().setAutoRead(false);
        }
        super.userEventTriggered(ctx, evt);
    }
}

SelectionKey.OP_WRITE 事件处理流程

在NioEventLoop的事件循环中’SelectionKey.OP_WRITE’事件的处理流程如下:

    // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
     if ((readyOps & SelectionKey.OP_WRITE) != 0) {
        // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
        ch.unsafe().forceFlush();
    }

关于OP_WRITE事件: OP_WRITE事件的就绪条件并不是发生在调用channel的write方法之后,而是在当底层缓冲区有空闲空间的情况下。因为写缓冲区在绝大部分时候都是有空闲空间的,所以如果你注册了写事件,这会使得写事件一直处于就就绪,选择处理现场就会一直占用着CPU资源。所以,只有当你确实有数据要写时再注册写操作,并在写完以后马上取消注册。 SocketChannel会在写数据使,若发现当buffer还有数据,但写缓冲区已经满的情况下,socketChannel.write(buffer)会返回已经写出去的字节数,此时为0。那么这个时候我们就需要注册OP_WRITE事件,这样当写缓冲区又有空闲空间的时候就会触发OP_WRITE事件,这样我们就可以继续将没写完的数据继续写出了。而且在写完后,一定要记得将OP_WRITE事件注销。 比如,来看看NioSocketChannel的doWrite()操作(「 ch.unsafe().forceFlush()」方法最终也就是会调用到这里):

就像上面所说的那样,当发现socketChannel.write(buffer)返回的已经写出去的字节数为0时,则说明此时写缓冲区已经满了无法写入,因此就需要注册一个OP_WRITE事件,这样当写缓存有空间来继续接受数据时,该写事件就会被触发,这样我们就可以继续将没写完的数据继续写出了。而且在写完后,一定要记得将OP_WRITE事件注销。 关于写操作的具体流程分析请参见Netty 源码解析 ——— writeAndFlush流程分析

后记

本文主要对NioEventLoop中涉及到的四种NIO事件的处理流程进行了分析。四个看似简单的处理流程,深入探索后发现其实并不简单,其实可以展开的点还有很多,特别是关于写事件涉及到ChannelOutboundBuffer以及Netty默认使用的PooledByteBufAllocator实现了jemalloc算法来完成高效的内存分配等等,希望在后面的文章中能继续和大家分享我的分析以及想法。 若文章有任何错误,望大家不吝指教:)

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏平凡文摘

Spring AOP 的实现原理

962
来自专栏前端杂货铺

node中子进程同步输出

管道 通过“child_process”模块fork出来的子进程都是返回一个ChildProcess对象实例,ChildProcess类比较特殊无法手动创建该对...

2636
来自专栏栗霖积跬步之旅

2.2synchronized同步语句块

使用synchronized虽然能够避免不同步的现象出现,但是也会出现弊端,比如代码执行时间过长,那么其他线程就必须等待该线程执行完毕释放锁之后才能拿到锁。 面...

21810
来自专栏cloudskyme

设计模式(8)-状态模式(关注状态之间的变化)

状态模式(State Pattern)是设计模式的一种,属于行为模式。 定义(源于Design Pattern):当一个对象的内在状态改变时允许改变其行为,这...

3357
来自专栏我叫刘半仙

原荐Spring AOP是什么?你都拿它做什么?

         对于最近博主最近写博客的兴致大发,我也在思考:为什么而写博客?在互联网时代,无论你是牛人大咖,还是小白菜鸟,都有发表自己看法的权利。无论你是对...

2904
来自专栏软件工程师成长笔记

用MINA实现UDP通信的例子

Apache Mina Server 是一个网络通信应用框架,也就是说,它主要是对基于TCP/IP、UDP/IP协议栈的通信框架(当然,也可以提供JAVA 对象...

622
来自专栏一枝花算不算浪漫

[Spring框架]Spring AOP基础入门总结二:Spring基于AspectJ的AOP的开发.

3088
来自专栏木木玲

Netty 源码解析 ——— 服务端启动流程 (下)

1656
来自专栏xingoo, 一个梦想做发明家的程序员

Java直接内存与非直接内存性能测试

什么是直接内存与非直接内存 根据官方文档的描述: A byte buffer is either direct or non-direct. Given a d...

2145
来自专栏纯洁的微笑

Guava 源码分析(Cache 原理【二阶段】)

在上文「Guava 源码分析(Cache 原理)」中分析了 Guava Cache 的相关原理。

1151

扫码关注云+社区