Netty 源码解析 ——— writeAndFlush流程分析

本文是Netty文集中“Netty 源码解析”系列的文章。主要对Netty的重要流程以及类进行源码解析,以使得我们更好的去使用Netty。Netty是一个非常优秀的网络框架,对其源码解读的过程也是不断学习的过程。

源码解析

本文主要对Netty的写数据流程进行分析。代码调用仅一句:

ctx.writeAndFlush("from server : " + UUID.randomUUID());

变量 ctx 指的是 ChannelHandlerContext对象,我们跟进ChannelHandlerContext的writeAndFlush方法:

public ChannelFuture writeAndFlush(Object msg) {
    return write    AndFlush(msg, newPromise());
}

因为写是异步操作,所以如果我们没有自定义一个ChannelPromise的话,就会构建一个默认的ChannelPromise(即,DefaultChannelPromise)来表示该异步操作。我们可以通过往ChannelPromise中注册listener来得到该异步操作的结果(成功 or 失败),listener会在异步操作完成后得到回调。

往下跟,我们会到?流程

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}

这里会完成两个重要的步骤: ① invokeWrite0(msg, promise);将消息放入输出缓冲区中(ChannelOutboundBuffer) ② invokeFlush0(); 将输出缓冲区中的数据通过socket发送到网络中

下面我们来详细展开这两步骤

invokeWrite0(msg, promise)
private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

write是一个出站事件,它最终会调用到ChannelPipeline中head的相关方法:

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

unsafe是我们构建NioServerSocketChannel或NioSocketChannel对象时,一并构建一个成员属性,它会完成底层真正的网络操作等。NioServerSocketChannel中持有的unsafe成员变量是NioMessageUnsafe对象,而NioSocketChannel中持有的unsafe成员变量是NioSocketChannelUnsafe对象。这里我们要看的是NioSocketChannel的write流程:

public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        // If the outboundBuffer is null we know the channel was closed and so
        // need to fail the future right away. If it is not null the handling of the rest
        // will be done in flush0()
        // See https://github.com/netty/netty/issues/2362
        safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
        // release message now to prevent resource-leak
        ReferenceCountUtil.release(msg);
        return;
    }

    int size;
    try {
        msg = filterOutboundMessage(msg);
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }

    outboundBuffer.addMessage(msg, size, promise);
}

① 获取该NioSocketChannel的ChannelOutboundBuffer成员属性。(确切地来说ChannelOutboundBuffer是NioSocketChannelUnsafe对象中的成员属性,而NioSocketChannelUnsafe才是NioSocketChannel的成员属性。)每一个NioSocketChannel会维护一个它们自己的ChannelOutboundBuffer,用于存储待出站写请求。 判断该outboundBuffer是否为null,如果为null则说明该NioSocketChannel已经关闭了,那么就会标志该异步写操作为失败完成,并释放写消息后返回。 ② 『msg = filterOutboundMessage(msg);』过滤待发送的消息:

    protected final Object filterOutboundMessage(Object msg) {
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            if (buf.isDirect()) {
                return msg;
            }

            return newDirectBuffer(buf);
        }

        if (msg instanceof FileRegion) {
            return msg;
        }

        throw new UnsupportedOperationException(
                "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
    }

过滤待发送的消息,只有ByteBuf(堆 or 非堆)以及 FileRegion可以进行最终的Socket网络传输,其他类型的数据是不支持的,会抛UnsupportedOperationException异常。并且会把堆ByteBuf转换为一个非堆的ByteBuf返回。也就说,最后会通过socket传输的对象时非堆的ByteBuf和FileRegion。 『size = pipeline.estimatorHandle().size(msg);』估计待发送数据的大小:

public int size(Object msg) {
    if (msg instanceof ByteBuf) {
        return ((ByteBuf) msg).readableBytes();
    }
    if (msg instanceof ByteBufHolder) {
        return ((ByteBufHolder) msg).content().readableBytes();
    }
    if (msg instanceof FileRegion) {
        return 0;
    }
    return unknownSize;
}

估计待发送消息数据的大小,如果是FileRegion的话直接饭0,否则返回ByteBuf中可读取字节数。 ③ 『outboundBuffer.addMessage(msg, size, promise)』将消息加入outboundBuffer中等待发送。

    public void addMessage(Object msg, int size, ChannelPromise promise) {
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        if (tailEntry == null) {
            flushedEntry = null;
            tailEntry = entry;
        } else {
            Entry tail = tailEntry;
            tail.next = entry;
            tailEntry = entry;
        }
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }

        // increment pending bytes after adding message to the unflushed arrays.
        // See https://github.com/netty/netty/issues/1619
        incrementPendingOutboundBytes(entry.pendingSize, false);
    }

首先对ChannelOutboundBuffer、Entry做个简单介绍

ChannelOutboundBuffer 一个内部的数据结构,被AbstractChannel用于存储它的待出站写请求。 ChannelOutboundBuffer中有两个属性private Entry unflushedEntry、private Entry flushedEntry。它们都是用Entry对象通过next指针来维护的一个单向链表。以及一个private Entry tailEntry;对象表示始终指向最后一个Entry对象(即,最后加入到该ChannelOutboundBuffer中的写请求的数据消息) unflushedEntry表示还未刷新的ByteBuf的链表头;flushedEntry表示调用flush()操作时将会进行刷新的ByteBuf的链表头。

Entry Entry是ChannelOutboundBunffer的一个内部类,它是对真实的写消息数据以及其相关信息的一个封装。大致封装了如下信息: a) pendingSize:记录有该ByteBuf or ByteBufs 中待发送数据大小 和 对象本身内存大小 的累加和; b) promise:该异步写操作的ChannelPromise(用于在完成真是的网络层write后去标识异步操作的完成以及回调已经注册到该promise上的listeners); c) total:待发送数据包的总大小(该属性与pendingSize的区别在于,如果是待发送的是FileRegion数据对象,则pengdingSize中只有对象内存的大小,即真实的数据大小被记录为0;但total属性则是会记录FileRegion中数据大小,并且total属性是不包含对象内存大小,仅仅是对数据本身大小的记录); e) msg:原始消息对象的引用; f) count:写消息数据个数的记录(如果写消息数据是个数组的话,该值会大于1) 这里说明下,pendingSize属性记录的不单单是写请求数据的大小,记录的是这个写请求对象的大小。这是什么意思了?这里做个简单的介绍: 一个对象占用的内存大小除了实例数据(instance data),还包括对象头(header)以及对齐填充(padding)。所以一个对象所占的内存大小为『对象头 + 实例数据 + 对齐填充』,即

entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
// Assuming a 64-bit JVM:
//  - 16 bytes object header
//  - 8 reference fields
//  - 2 long fields
//  - 2 int fields
//  - 1 boolean field
//  - padding
static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
        SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);

?假设的是64位操作系统下,且没有使用各种压缩选项的情况。对象头的长度占16字节;引用属性占8字节;long类型占8字节;int类型占4字节;boolean类型占1字节。同时,由于HotSpot VM的自动内存管理系统要求对象起始地址必须是8字节的整数倍,也就是说对象的大小必须是8字节的整数倍,如果最终字节数不为8的倍数,则padding会补足至8的倍数。

static final class Entry {
    private final Handle<Entry> handle;     // reference field ( 8 bytes)
    Entry next;     // reference field ( 8 bytes)
    Object msg;     // reference field ( 8 bytes)
    ByteBuffer[] bufs;     // reference field ( 8 bytes)
    ByteBuffer buf;     // reference field ( 8 bytes)
    ChannelPromise promise;     // reference field ( 8 bytes)
    long progress;     // long field ( 8 bytes)
    long total;     // long field ( 8 bytes)
    int pendingSize;     // int field ( 4 bytes)
    int count = -1;     // int field ( 4 bytes)
    boolean cancelled;     // boolean field ( 1 bytes)

我们根据上面的理论来计算下Entry对象占用内存的大小: header (16 bytes) + 6 * reference fields(8 bytes)+ 2 * long fields(8 bytes)+ 2 * int fields(4 bytes)+ 1 * boolean field(1 byte)= 89 ——> 加上7bytes的padding = 96 bytes 这就是CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD默认值 96 的由来。(关于JVM中对象的内存大小的详细分析,欢迎参阅JVM中 对象的内存布局 以及 实例分析)

addMessage方法主要就是将请求写出的数据封装为Entry对象,然后加入到tailEntry和unflushedEntry中。 然后调用『incrementPendingOutboundBytes(entry.pendingSize, false);』对totalPendingSize属性以及unwritable字段做调整。 totalPendingSize字段记录了该ChannelOutboundBuffer中所有带发送Entry对象的占的总内存大小和所有带发送数据的大小。unwritable用来标示当前该Channel要发送的数据是否已经超过了设定 or 默认的WriteBufferWaterMark的high值。如果当前操作导致了待写出的数据(包括Entry对象大小以及真实需要传输数据的大小)超过了设置写缓冲区的高水位,那么将会触发fireChannelWritabilityChanged事件。

WriteBufferWaterMark WriteBufferWaterMark用于设置写缓存的高水位标志和低水位标志。 如果写缓冲区队列中字节的数量超过了设置的高水位标志,那么Channel#isWritable()方法将开始返回false。然后当写缓冲区中的字节数量减少至小于了低水位标志,Channel#isWritable()方法会重新开始返回true。关于Channel#isWritable()方法目前主要用在ChunkedWriteHandler以及HTTP2的Handler中。因此,如果你想在程序中通过设置WriteBufferWaterMark来控制数据的写出,但你在程序中并没有使用ChunkedWriteHandler或HTTP2,那么这就需要我们自己通过『Channel#isWritable()』来实现是否可用继续写出数据。 比如:

protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    // ......
    if( ctx.channel().isWritable() ) 
    { 
        ctx.writeAndFlush(...) 
    }
}

总的来说,在write的操作最终会将ByteBuf封装为一个Entry对象放到unflushedEntry单向链表的尾部(通过修改tailEntry来实现的),并修改用于记录有该ChannelOutboundBuffer中待发送Entry对象总内存大小的属性totalPendingSize字段。

好了,但目前为止write操作就讲完了。接下来我们来看下flush操作:

invokeFlush0()

flush也是一个出站事件,它最终会调用到ChannelPipeline中head的相关方法:

public void flush(ChannelHandlerContext ctx) throws Exception {
    unsafe.flush();
}

这里的unsafe成员变量依旧是NioSocketChannelUnsafe对象,跟进去:

public final void flush() {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }

    outboundBuffer.addFlush();
    flush0();
}

这里主要完成两个操作: ① outboundBuffer.addFlush(); 添加一个flush到这个ChannelOutboundBuffer,这意味着,将在此之前添加的消息标记为flushed,你将可以处理这些消息。

    public void addFlush() {
        // There is no need to process all entries if there was already a flush before and no new messages
        // where added in the meantime.
        //
        // See https://github.com/netty/netty/issues/2577
        Entry entry = unflushedEntry;
        if (entry != null) {
            if (flushedEntry == null) {
                // there is no flushedEntry yet, so start with the entry
                flushedEntry = entry;
            }
            do {
                flushed ++;
                if (!entry.promise.setUncancellable()) {
                    // Was cancelled so make sure we free up memory and notify about the freed bytes
                    int pending = entry.cancel();
                    decrementPendingOutboundBytes(pending, false, true);
                }
                entry = entry.next;
            } while (entry != null);

            // All flushed so reset unflushedEntry
            unflushedEntry = null;
        }
    }

a) 如我们前面所说,write操作最终会将包含有待发送消息的ByteBuf封装成Entry对象放入unflushedEntry单向链表的尾部。而这里就会先判断unflushedEntry是否为null,如果为null则说明所有的entries已经被flush了,并在此期间没有新的消息被添加进ChannelOutboundBuffer中。所有直接返回就好。 b) 如果unflushedEntry非空,则说明有待发送的entries等待被发送。那么将unflushedEntry赋值给flushedEntry(调用flush()操作时就是将该flushedEntry单向链表中的entries的数据发到网络),并将unflushedEntry置为null,表示没有待发送的entries了。并通过flushed成员属性记录待发送entries的个数。

② flush0();

protected final void flush0() {
    // Flush immediately only when there's no pending flush.
    // If there's a pending flush operation, event loop will call forceFlush() later,
    // and thus there's no need to call it now.
    if (isFlushPending()) {
        return;
    }
    super.flush0();
}

a) 首先通过isFlushPending()方法来判断flush操作是否需要被挂起:

private boolean isFlushPending() {
    SelectionKey selectionKey = selectionKey();
    return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
}

也就是说,首先会判断当前NioSocketChannel的SelectionKey.OP_WRITE事件是否有被注册到对应的Selector上,如果有,则说明当前写缓冲区已经满了(这里指是socket的写缓冲区满了,并且socket并没有被关闭,那么write操作将返回0。这是如果还有未写出的数据待被发送,那么就会注册SelectionKey.OP_WRITE事件)。等写缓冲区有空间时,SelectionKey.OP_WRITE事件就会被触发,到时NioEventLoop的事件循环就会调用forceFlush()方法来继续将为写出的数据写出,所以这里直接返回就好。 b) 当socket写缓冲区未满,那么就执行flush0()

protected void flush0() {
    if (inFlush0) {
        // Avoid re-entrance
        return;
    }

    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null || outboundBuffer.isEmpty()) {
        return;
    }

    inFlush0 = true;

    // Mark all pending write requests as failure if the channel is inactive.
    if (!isActive()) {
        try {
            if (isOpen()) {
                outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
            } else {
                // Do not trigger channelWritabilityChanged because the channel is closed already.
                outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
            }
        } finally {
            inFlush0 = false;
        }
        return;
    }

    try {
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        if (t instanceof IOException && config().isAutoClose()) {
            /**
             * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
             * failing all flushed messages and also ensure the actual close of the underlying transport
             * will happen before the promises are notified.
             *
             * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
             * may still return {@code true} even if the channel should be closed as result of the exception.
             */
            close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
        } else {
            outboundBuffer.failFlushed(t, true);
        }
    } finally {
        inFlush0 = false;
    }
}
  1. 判断Channel的输出缓冲区是否为null或待发送的数据个数为0,如果是则直接返回,因为此时并没有数据需要发送。
  2. 判断当前的NioSocketChannel是否是Inactive状态,如果是,则会标识所有等待写请求为失败(即所有的write操作的promise都会是失败完成),并且如果NioSocketChannel已经关闭了,失败的原因是“FLUSH0_CLOSED_CHANNEL_EXCEPTION”且不会回调注册到promise上的listeners;但如果NioSocketChannel还是open的,则失败的原始是“FLUSH0_NOT_YET_CONNECTED_EXCEPTION”并且会回调注册到promise上的listeners。
  3. 调用doWrite(outboundBuffer);方法将Channel输出缓冲区中的数据通过socket传输给对端:

doWrite是一个写循环操作,当满足一定条件时会结束循环。每一次循环会完成的操作:

  1. 判断当前ChannelOutboundBuffer中的数据都已经被传输完了,如果已经传输完了,并且发现NioSocketChannel还注册有SelectionKey.OP_WRITE事件,则将SelectionKey.OP_WRITE从感兴趣的事件中移除,即,Selector不在监听该NioSocketChannel的可写事件了。然后跳出循环,方法返回。
  2. 初始化writtenBytes = 0、done = false、setOpWrite = false三个属性,它们分别表示本次循环已经写出的字节数、本次循环是否写出了所有待写出的数据、是否需要设置SelectionKey.OP_WRITE事件的标志为。
  3. 『ByteBuffer[] nioBuffers = in.nioBuffers()』 获取所有待写出的ByteBuffer,它会将ChannelOutboundBuffer中所有待写出的ByteBuf转换成JDK Bytebuffer(因为,底层依旧是基于JDK NIO的网络传输,所有最终传输的还是JDK 的ByteBuffer对象)。它依次出去每个待写的ByteBuf,然后根据ByteBuf的信息构建一个ByteBuffer(这里的ByteBuf是一个堆外ByteBuf,因此构建出来的ByteBuffer也是一个堆外的ByteBuffer),并设置该ByteBuffer的readerIndex、readableBytes的值为ByteBuf对应的值。然后返回构建好的ByteBuffer[]数组。
  4. 获取本次循环需要写出的ByteBuffer个数
  5. 获取本次循环总共需要写出的数据的字节总数
  6. 根据nioBufferCnt值的不同执行不同的传输流程: [1] nioBufferCnt == 0 :对非ByteBuffer对象的数据进行普通的写操作。 上面我们说了in.nioBuffers()会将ChannelOutboundBuffer中所有待发送的ByteBuf转换成Bytebuffer后返回一个ByteBuffer[]数组,以便后面进行ByteBuffer的传输,而nioBufferCnt则表示待发送ByteBuffer的个数,即ByteBuffer[]数组的长度。注意,这里nioBuffers()仅仅是对ByteBuf对象进行了操作,但是我们从前面的流程可以得知,除了ByteBuf外FileRegion对象也是可以进行底层的网络传输的。因此当待传输的对象是FileRegion时“nioBufferCnt == 0”,那么这是就会调用『AbstractNioByteChannel#doWrite(ChannelOutboundBuffer in)』方法来完成数据的传输。实际上底层就是依靠JDK NIO 的 FileChannel来实现零拷贝的数据传输。 [2] nioBufferCnt == 1 :说明只有一个ByteBuffer等待被传输,那么不使用gather的write操作来传输数据(JDK NIO 支持一次写单个ByteBuffer 以及 一次写多个ByteBuffer的聚集写模式) [3] nioBufferCnt > 1 :说明有多个ByteBuffer等待被传输,那么使用JDK NIO的聚集写操作,一次性传输多个ByteBuffer到NioSocketChannel中。

[2]、[3] 中写操作的逻辑是一样的:

for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
    final long localWrittenBytes = ch.write(...);
    if (localWrittenBytes == 0) {
        setOpWrite = true;
        break;
    }
    expectedWrittenBytes -= localWrittenBytes;
    writtenBytes += localWrittenBytes;
    if (expectedWrittenBytes == 0) {
        done = true;
        break;
    }
}

config().getWriteSpinCount()为16,也就是一次写操作会最多执行16次的SocketChannel.write操作来将数据写到网络中。每次ch.write完都会进行相应的『expectedWrittenBytes -= localWrittenBytes;』操作,将expectedWrittenBytes期待被写的字节数减去已经写出的字节数。如果在最后expectedWrittenBytes依旧大于0,则说明在这16次的socket写操作后依旧还有未写完的数据等待被继续写,那么done就会为false;否则若所有的数据都写完了,done会被置为true。注意,ch.write操作会返回本次写操作写出的字节数,但该方法返回0时,即localWrittenBytes为0,则说明底层的写缓冲区已经满了(这里应该指的是linux底层的写缓冲区满了),这是就会将setOpWrite置为true,此时因为数据还没写完done还是false。那么这种情况下就会注册当前SocketChannel的写事件(SelectionKey.OP_WRITE)到对应的Selector为感兴趣的事件,这样当写缓冲区有空间时,就会触发SelectionKey.OP_WRITE就绪事件, NioEventLoop的事件循环在处理SelectionKey.OP_WRITE事件时会执行forceFlush()以继续发送外发送完的数据。

  1. 『in.removeBytes(writtenBytes)』:释放所有已经写出去的缓存对象,并修改部分写缓冲的索引。
public void removeBytes(long writtenBytes) {
    for (;;) {
        Object msg = current();
        if (!(msg instanceof ByteBuf)) {
            assert writtenBytes == 0;
            break;
        }

        final ByteBuf buf = (ByteBuf) msg;
        final int readerIndex = buf.readerIndex();
        final int readableBytes = buf.writerIndex() - readerIndex;

        if (readableBytes <= writtenBytes) {
            if (writtenBytes != 0) {
                progress(readableBytes);
                writtenBytes -= readableBytes;
            }
            remove();
        } else { // readableBytes > writtenBytes
            if (writtenBytes != 0) {
                buf.readerIndex(readerIndex + (int) writtenBytes);
                progress(writtenBytes);
            }
            break;
        }
    }
    clearNioBuffers();
}

通过已经写出数据的字节数来清理或修改ByteBuf。也就是说writtenBytes的大小可能是包含了多个ByteBuf以及某个ByteBuf的部分数据(因为一个ByteBuf可能只写出了部分数据,还未完成被写出到网络层中)。 a) 『 if (readableBytes <= writtenBytes) 』这个if判断表示:本次socket的write操作(这里是真的是网络通信写操作了)已经写出去的字节数”大于"了当前ByteBuf包可读取的字节数。 这说明,当前这个包中所有的可写的数据都已经写完了,既然当前这个ByteBuf的数据都写完了,那么久可以将其删除了。即,调用『remove()』操作,这个操作就会标识异步write操作为成功完成,并且会回调已经注册到ByteBuf的promise上的所有listeners。同时会原子的修改ChannelOutboundBuffer的totalPendingSize属性值,减少已经写出的数据大小(包括Entry对象内存大小和真实数据的大小),并且如果减少后totalPendingSize小于设置 or 默认的WriteBufferWaterMark的low值,并且再次之前totalPendingSize超过了WriteBufferWaterMark的high值,那么将触发fireChannelWritabilityChanged事件。『remove()』操作还会将当前的ByteBuf指向下一个待处理的ByteBuf,最后释放这个已经被写出去的ByteBuf对象资源。 b) 通过上面的分析,我们知道大数据包走的是else流程。也就是说,本次真实写出去的数据 比 当前这个ByteBuf的可读取数据要小(也就说明,当前这个ByteBuf还没有被完全的写完。因此并不会通过调用『remove()』操作。直到整个大数据包所有的内容都写出去了,那么这是if(readableBytes <= writtenBytes)才会为真执行『remove()』完成相关后续的操作)。那么此时,会根据已经写出的字节数大小修改该ByteBuf的readerIndex索引值。并且,如果该异步写操作的ChannelPromise是ChannelProgressivePromise对象并且注册了相应的progressiveListeners事件,则该listener会得到回调。你可以通过该listener来观察到大数据包写出去的进度。

  1. done表示本次写操作是否完成,。有两种情况下done为false: [1] 还有未写完的数据待发送,并且写缓冲区已经满了(这里指的是linux底层的写缓冲区满了),无法再继续写出,那么此时setOpWrite标识为true。这种情况下就会注册当前SocketChannel的写事件(SelectionKey.OP_WRITE)到对应的Selector为感兴趣的事件,这样当写缓冲区有空间时,就会触发SelectionKey.OP_WRITE就绪事件, NioEventLoop的事件循环在处理SelectionKey.OP_WRITE事件时会执行forceFlush()以继续发送外发送完的数据。接着退出doWrite()循环写操作。 [2] 执行了config().getWriteSpinCount()次(默认16次)socket写操作后,数据仍旧未写完,那么此时会将flush()操作封装成一个task提交至NioEventLoop的taskQueue中,这样在NioEventLoop的下一次事件循环时会就会取出该任务并执行,也就会继续写出未写完的任务了。这也说明了,如果发送的是很大的数据包的话,可能一次写循环操作是无法将数据全部发送出去的,也不会为了发送该大数据包的数据而导致NioEventLoop线程的阻塞以至于影响NioEventLoop上其他Channel的操作和响应。接着退出doWrite()循环写操作。

好了,到目前为止,Netty整个的写流程就分析完了。本文主要专注于写操作的流程,而并未到Netty的内存模式进行展开。

后记

若文章有任何错误,望大家不吝指教:)

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java Web

Java I/O不迷茫,一文为你导航!

学习过计算机相关课程的童鞋应该都知道,I/O 即输入Input/ 输出Output的缩写,最容易让人联想到的就是屏幕这样的输出设备以及键盘鼠标这一类的输入设备,...

592
来自专栏小詹同学

Leetcode打卡 | No.015 三数之和

欢迎和小詹一起定期刷leetcode,每周一和周五更新一题,每一题都吃透,欢迎一题多解,寻找最优解!这个记录帖哪怕只有一个读者,小詹也会坚持刷下去的!

972
来自专栏架构师之旅

【Java SE】Java NIO系列教程(十二)Java NIO与IO

当学习了Java NIO和IO的API后,一个问题马上涌入脑海: 我应该何时使用IO,何时使用NIO呢?在本文中,我会尽量清晰地解析Java NIO和IO的差异...

1905
来自专栏王小雷

HBase数据模型(1)

HBase数据模型(1) HBase数据模型(2) 1.0 HBase的特性 Table HBase以表(Table)的方式组织数据,数据存储在表中。...

2017
来自专栏人人都是极客

环形缓冲区的实现

队列 (Queue):是一种先进先出(First In First Out ,简称 FIFO)的线性表,只允许在一端插入(入队),在另一端进行删除(出队)。

993
来自专栏jeremy的技术点滴

netty3与netty4的区别

3744
来自专栏匠心独运的博客

消息中间件—简谈Kafka中的NIO网络通信模型

文章摘要:很多人喜欢把RocketMQ与Kafka做对比,其实这两款消息队列的网络通信层还是比较相似的,本文就为大家简要地介绍下Kafka的NIO网络通信模型

1052
来自专栏Spark学习技巧

Flink流式处理概念简介

一,抽象层次 Flink提供不同级别的抽象来开发流/批处理应用程序。 ? 1,stateful streaming 最底层。它通过Process Functio...

2936
来自专栏Create Sun

python 3.x 与2.x的区别

前言   保持学习的态度,学一门动态语言其实是很早以前的就准备要做的事情,当时还在纠结python与ruby。现在不单单是要学python,还在考虑用它做点什么...

3758
来自专栏我杨某人的青春满是悔恨

Swift中的内存管理

之前用Swift写了一个App,已经在App Store上架了。前两天更新了一些功能,然后用Instruments检查的时候,发现有内存泄漏问题。有些同学可能觉...

995

扫码关注云+社区