前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty 源码解析 ——— writeAndFlush流程分析

Netty 源码解析 ——— writeAndFlush流程分析

作者头像
tomas家的小拨浪鼓
发布2018-06-27 14:29:01
2.3K0
发布2018-06-27 14:29:01
举报
文章被收录于专栏:木木玲木木玲

本文是Netty文集中“Netty 源码解析”系列的文章。主要对Netty的重要流程以及类进行源码解析,以使得我们更好的去使用Netty。Netty是一个非常优秀的网络框架,对其源码解读的过程也是不断学习的过程。

源码解析

本文主要对Netty的写数据流程进行分析。代码调用仅一句:

代码语言:javascript
复制
ctx.writeAndFlush("from server : " + UUID.randomUUID());

变量 ctx 指的是 ChannelHandlerContext对象,我们跟进ChannelHandlerContext的writeAndFlush方法:

代码语言:javascript
复制
public ChannelFuture writeAndFlush(Object msg) {
    return write    AndFlush(msg, newPromise());
}

因为写是异步操作,所以如果我们没有自定义一个ChannelPromise的话,就会构建一个默认的ChannelPromise(即,DefaultChannelPromise)来表示该异步操作。我们可以通过往ChannelPromise中注册listener来得到该异步操作的结果(成功 or 失败),listener会在异步操作完成后得到回调。

往下跟,我们会到?流程

代码语言:javascript
复制
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}

这里会完成两个重要的步骤:

① invokeWrite0(msg, promise);将消息放入输出缓冲区中(ChannelOutboundBuffer)

② invokeFlush0(); 将输出缓冲区中的数据通过socket发送到网络中

下面我们来详细展开这两步骤

invokeWrite0(msg, promise)
代码语言:javascript
复制
private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

write是一个出站事件,它最终会调用到ChannelPipeline中head的相关方法:

代码语言:javascript
复制
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}

unsafe是我们构建NioServerSocketChannel或NioSocketChannel对象时,一并构建一个成员属性,它会完成底层真正的网络操作等。NioServerSocketChannel中持有的unsafe成员变量是NioMessageUnsafe对象,而NioSocketChannel中持有的unsafe成员变量是NioSocketChannelUnsafe对象。这里我们要看的是NioSocketChannel的write流程:

代码语言:javascript
复制
public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        // If the outboundBuffer is null we know the channel was closed and so
        // need to fail the future right away. If it is not null the handling of the rest
        // will be done in flush0()
        // See https://github.com/netty/netty/issues/2362
        safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
        // release message now to prevent resource-leak
        ReferenceCountUtil.release(msg);
        return;
    }

    int size;
    try {
        msg = filterOutboundMessage(msg);
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }

    outboundBuffer.addMessage(msg, size, promise);
}

① 获取该NioSocketChannel的ChannelOutboundBuffer成员属性。(确切地来说ChannelOutboundBuffer是NioSocketChannelUnsafe对象中的成员属性,而NioSocketChannelUnsafe才是NioSocketChannel的成员属性。)每一个NioSocketChannel会维护一个它们自己的ChannelOutboundBuffer,用于存储待出站写请求。

判断该outboundBuffer是否为null,如果为null则说明该NioSocketChannel已经关闭了,那么就会标志该异步写操作为失败完成,并释放写消息后返回。

② 『msg = filterOutboundMessage(msg);』过滤待发送的消息:

代码语言:javascript
复制
    protected final Object filterOutboundMessage(Object msg) {
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            if (buf.isDirect()) {
                return msg;
            }

            return newDirectBuffer(buf);
        }

        if (msg instanceof FileRegion) {
            return msg;
        }

        throw new UnsupportedOperationException(
                "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
    }

过滤待发送的消息,只有ByteBuf(堆 or 非堆)以及 FileRegion可以进行最终的Socket网络传输,其他类型的数据是不支持的,会抛UnsupportedOperationException异常。并且会把堆ByteBuf转换为一个非堆的ByteBuf返回。也就说,最后会通过socket传输的对象时非堆的ByteBuf和FileRegion。

『size = pipeline.estimatorHandle().size(msg);』估计待发送数据的大小:

代码语言:javascript
复制
public int size(Object msg) {
    if (msg instanceof ByteBuf) {
        return ((ByteBuf) msg).readableBytes();
    }
    if (msg instanceof ByteBufHolder) {
        return ((ByteBufHolder) msg).content().readableBytes();
    }
    if (msg instanceof FileRegion) {
        return 0;
    }
    return unknownSize;
}

估计待发送消息数据的大小,如果是FileRegion的话直接饭0,否则返回ByteBuf中可读取字节数。

③ 『outboundBuffer.addMessage(msg, size, promise)』将消息加入outboundBuffer中等待发送。

代码语言:javascript
复制
    public void addMessage(Object msg, int size, ChannelPromise promise) {
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        if (tailEntry == null) {
            flushedEntry = null;
            tailEntry = entry;
        } else {
            Entry tail = tailEntry;
            tail.next = entry;
            tailEntry = entry;
        }
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }

        // increment pending bytes after adding message to the unflushed arrays.
        // See https://github.com/netty/netty/issues/1619
        incrementPendingOutboundBytes(entry.pendingSize, false);
    }

首先对ChannelOutboundBuffer、Entry做个简单介绍

ChannelOutboundBuffer

一个内部的数据结构,被AbstractChannel用于存储它的待出站写请求。

ChannelOutboundBuffer中有两个属性private Entry unflushedEntry、private Entry flushedEntry。它们都是用Entry对象通过next指针来维护的一个单向链表。以及一个private Entry tailEntry;对象表示始终指向最后一个Entry对象(即,最后加入到该ChannelOutboundBuffer中的写请求的数据消息)

unflushedEntry表示还未刷新的ByteBuf的链表头;flushedEntry表示调用flush()操作时将会进行刷新的ByteBuf的链表头。

Entry

Entry是ChannelOutboundBunffer的一个内部类,它是对真实的写消息数据以及其相关信息的一个封装。大致封装了如下信息:

a) pendingSize:记录有该ByteBuf or ByteBufs 中待发送数据大小 和 对象本身内存大小 的累加和;

b) promise:该异步写操作的ChannelPromise(用于在完成真是的网络层write后去标识异步操作的完成以及回调已经注册到该promise上的listeners);

c) total:待发送数据包的总大小(该属性与pendingSize的区别在于,如果是待发送的是FileRegion数据对象,则pengdingSize中只有对象内存的大小,即真实的数据大小被记录为0;但total属性则是会记录FileRegion中数据大小,并且total属性是不包含对象内存大小,仅仅是对数据本身大小的记录);

e) msg:原始消息对象的引用;

f) count:写消息数据个数的记录(如果写消息数据是个数组的话,该值会大于1)

这里说明下,pendingSize属性记录的不单单是写请求数据的大小,记录的是这个写请求对象的大小。这是什么意思了?这里做个简单的介绍:

一个对象占用的内存大小除了实例数据(instance data),还包括对象头(header)以及对齐填充(padding)。所以一个对象所占的内存大小为『对象头 + 实例数据 + 对齐填充』,即

代码语言:javascript
复制
entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
// Assuming a 64-bit JVM:
//  - 16 bytes object header
//  - 8 reference fields
//  - 2 long fields
//  - 2 int fields
//  - 1 boolean field
//  - padding
static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
        SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);

?假设的是64位操作系统下,且没有使用各种压缩选项的情况。对象头的长度占16字节;引用属性占8字节;long类型占8字节;int类型占4字节;boolean类型占1字节。同时,由于HotSpot VM的自动内存管理系统要求对象起始地址必须是8字节的整数倍,也就是说对象的大小必须是8字节的整数倍,如果最终字节数不为8的倍数,则padding会补足至8的倍数。

代码语言:javascript
复制
static final class Entry {
    private final Handle<Entry> handle;     // reference field ( 8 bytes)
    Entry next;     // reference field ( 8 bytes)
    Object msg;     // reference field ( 8 bytes)
    ByteBuffer[] bufs;     // reference field ( 8 bytes)
    ByteBuffer buf;     // reference field ( 8 bytes)
    ChannelPromise promise;     // reference field ( 8 bytes)
    long progress;     // long field ( 8 bytes)
    long total;     // long field ( 8 bytes)
    int pendingSize;     // int field ( 4 bytes)
    int count = -1;     // int field ( 4 bytes)
    boolean cancelled;     // boolean field ( 1 bytes)

我们根据上面的理论来计算下Entry对象占用内存的大小:

header (16 bytes) + 6 * reference fields(8 bytes)+ 2 * long fields(8 bytes)+ 2 * int fields(4 bytes)+ 1 * boolean field(1 byte)= 89 ——> 加上7bytes的padding = 96 bytes

这就是CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD默认值 96 的由来。(关于JVM中对象的内存大小的详细分析,欢迎参阅JVM中 对象的内存布局 以及 实例分析)

addMessage方法主要就是将请求写出的数据封装为Entry对象,然后加入到tailEntry和unflushedEntry中。

然后调用『incrementPendingOutboundBytes(entry.pendingSize, false);』对totalPendingSize属性以及unwritable字段做调整。

totalPendingSize字段记录了该ChannelOutboundBuffer中所有带发送Entry对象的占的总内存大小和所有带发送数据的大小。unwritable用来标示当前该Channel要发送的数据是否已经超过了设定 or 默认的WriteBufferWaterMark的high值。如果当前操作导致了待写出的数据(包括Entry对象大小以及真实需要传输数据的大小)超过了设置写缓冲区的高水位,那么将会触发fireChannelWritabilityChanged事件。

WriteBufferWaterMark

WriteBufferWaterMark用于设置写缓存的高水位标志和低水位标志。

如果写缓冲区队列中字节的数量超过了设置的高水位标志,那么Channel#isWritable()方法将开始返回false。然后当写缓冲区中的字节数量减少至小于了低水位标志,Channel#isWritable()方法会重新开始返回true。关于Channel#isWritable()方法目前主要用在ChunkedWriteHandler以及HTTP2的Handler中。因此,如果你想在程序中通过设置WriteBufferWaterMark来控制数据的写出,但你在程序中并没有使用ChunkedWriteHandler或HTTP2,那么这就需要我们自己通过『Channel#isWritable()』来实现是否可用继续写出数据。 比如:

代码语言:javascript
复制
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    // ......
    if( ctx.channel().isWritable() ) 
    { 
        ctx.writeAndFlush(...) 
    }
}

总的来说,在write的操作最终会将ByteBuf封装为一个Entry对象放到unflushedEntry单向链表的尾部(通过修改tailEntry来实现的),并修改用于记录有该ChannelOutboundBuffer中待发送Entry对象总内存大小的属性totalPendingSize字段。

好了,但目前为止write操作就讲完了。接下来我们来看下flush操作:

invokeFlush0()

flush也是一个出站事件,它最终会调用到ChannelPipeline中head的相关方法:

代码语言:javascript
复制
public void flush(ChannelHandlerContext ctx) throws Exception {
    unsafe.flush();
}

这里的unsafe成员变量依旧是NioSocketChannelUnsafe对象,跟进去:

代码语言:javascript
复制
public final void flush() {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }

    outboundBuffer.addFlush();
    flush0();
}

这里主要完成两个操作:

① outboundBuffer.addFlush();

添加一个flush到这个ChannelOutboundBuffer,这意味着,将在此之前添加的消息标记为flushed,你将可以处理这些消息。

代码语言:javascript
复制
    public void addFlush() {
        // There is no need to process all entries if there was already a flush before and no new messages
        // where added in the meantime.
        //
        // See https://github.com/netty/netty/issues/2577
        Entry entry = unflushedEntry;
        if (entry != null) {
            if (flushedEntry == null) {
                // there is no flushedEntry yet, so start with the entry
                flushedEntry = entry;
            }
            do {
                flushed ++;
                if (!entry.promise.setUncancellable()) {
                    // Was cancelled so make sure we free up memory and notify about the freed bytes
                    int pending = entry.cancel();
                    decrementPendingOutboundBytes(pending, false, true);
                }
                entry = entry.next;
            } while (entry != null);

            // All flushed so reset unflushedEntry
            unflushedEntry = null;
        }
    }

a) 如我们前面所说,write操作最终会将包含有待发送消息的ByteBuf封装成Entry对象放入unflushedEntry单向链表的尾部。而这里就会先判断unflushedEntry是否为null,如果为null则说明所有的entries已经被flush了,并在此期间没有新的消息被添加进ChannelOutboundBuffer中。所有直接返回就好。

b) 如果unflushedEntry非空,则说明有待发送的entries等待被发送。那么将unflushedEntry赋值给flushedEntry(调用flush()操作时就是将该flushedEntry单向链表中的entries的数据发到网络),并将unflushedEntry置为null,表示没有待发送的entries了。并通过flushed成员属性记录待发送entries的个数。

② flush0();

代码语言:javascript
复制
protected final void flush0() {
    // Flush immediately only when there's no pending flush.
    // If there's a pending flush operation, event loop will call forceFlush() later,
    // and thus there's no need to call it now.
    if (isFlushPending()) {
        return;
    }
    super.flush0();
}

a) 首先通过isFlushPending()方法来判断flush操作是否需要被挂起:

代码语言:javascript
复制
private boolean isFlushPending() {
    SelectionKey selectionKey = selectionKey();
    return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
}

也就是说,首先会判断当前NioSocketChannel的SelectionKey.OP_WRITE事件是否有被注册到对应的Selector上,如果有,则说明当前写缓冲区已经满了(这里指是socket的写缓冲区满了,并且socket并没有被关闭,那么write操作将返回0。这是如果还有未写出的数据待被发送,那么就会注册SelectionKey.OP_WRITE事件)。等写缓冲区有空间时,SelectionKey.OP_WRITE事件就会被触发,到时NioEventLoop的事件循环就会调用forceFlush()方法来继续将为写出的数据写出,所以这里直接返回就好。

b) 当socket写缓冲区未满,那么就执行flush0()

代码语言:javascript
复制
protected void flush0() {
    if (inFlush0) {
        // Avoid re-entrance
        return;
    }

    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null || outboundBuffer.isEmpty()) {
        return;
    }

    inFlush0 = true;

    // Mark all pending write requests as failure if the channel is inactive.
    if (!isActive()) {
        try {
            if (isOpen()) {
                outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
            } else {
                // Do not trigger channelWritabilityChanged because the channel is closed already.
                outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
            }
        } finally {
            inFlush0 = false;
        }
        return;
    }

    try {
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        if (t instanceof IOException && config().isAutoClose()) {
            /**
             * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
             * failing all flushed messages and also ensure the actual close of the underlying transport
             * will happen before the promises are notified.
             *
             * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
             * may still return {@code true} even if the channel should be closed as result of the exception.
             */
            close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
        } else {
            outboundBuffer.failFlushed(t, true);
        }
    } finally {
        inFlush0 = false;
    }
}
  1. 判断Channel的输出缓冲区是否为null或待发送的数据个数为0,如果是则直接返回,因为此时并没有数据需要发送。
  2. 判断当前的NioSocketChannel是否是Inactive状态,如果是,则会标识所有等待写请求为失败(即所有的write操作的promise都会是失败完成),并且如果NioSocketChannel已经关闭了,失败的原因是“FLUSH0_CLOSED_CHANNEL_EXCEPTION”且不会回调注册到promise上的listeners;但如果NioSocketChannel还是open的,则失败的原始是“FLUSH0_NOT_YET_CONNECTED_EXCEPTION”并且会回调注册到promise上的listeners。
  3. 调用doWrite(outboundBuffer);方法将Channel输出缓冲区中的数据通过socket传输给对端:

doWrite是一个写循环操作,当满足一定条件时会结束循环。每一次循环会完成的操作:

  1. 判断当前ChannelOutboundBuffer中的数据都已经被传输完了,如果已经传输完了,并且发现NioSocketChannel还注册有SelectionKey.OP_WRITE事件,则将SelectionKey.OP_WRITE从感兴趣的事件中移除,即,Selector不在监听该NioSocketChannel的可写事件了。然后跳出循环,方法返回。
  2. 初始化writtenBytes = 0、done = false、setOpWrite = false三个属性,它们分别表示本次循环已经写出的字节数、本次循环是否写出了所有待写出的数据、是否需要设置SelectionKey.OP_WRITE事件的标志为。
  3. 『ByteBuffer[] nioBuffers = in.nioBuffers()』 获取所有待写出的ByteBuffer,它会将ChannelOutboundBuffer中所有待写出的ByteBuf转换成JDK Bytebuffer(因为,底层依旧是基于JDK NIO的网络传输,所有最终传输的还是JDK 的ByteBuffer对象)。它依次出去每个待写的ByteBuf,然后根据ByteBuf的信息构建一个ByteBuffer(这里的ByteBuf是一个堆外ByteBuf,因此构建出来的ByteBuffer也是一个堆外的ByteBuffer),并设置该ByteBuffer的readerIndex、readableBytes的值为ByteBuf对应的值。然后返回构建好的ByteBuffer[]数组。
  4. 获取本次循环需要写出的ByteBuffer个数
  5. 获取本次循环总共需要写出的数据的字节总数
  6. 根据nioBufferCnt值的不同执行不同的传输流程: 1 nioBufferCnt == 0 :对非ByteBuffer对象的数据进行普通的写操作。 上面我们说了in.nioBuffers()会将ChannelOutboundBuffer中所有待发送的ByteBuf转换成Bytebuffer后返回一个ByteBuffer[]数组,以便后面进行ByteBuffer的传输,而nioBufferCnt则表示待发送ByteBuffer的个数,即ByteBuffer[]数组的长度。注意,这里nioBuffers()仅仅是对ByteBuf对象进行了操作,但是我们从前面的流程可以得知,除了ByteBuf外FileRegion对象也是可以进行底层的网络传输的。因此当待传输的对象是FileRegion时“nioBufferCnt == 0”,那么这是就会调用『AbstractNioByteChannel#doWrite(ChannelOutboundBuffer in)』方法来完成数据的传输。实际上底层就是依靠JDK NIO 的 FileChannel来实现零拷贝的数据传输。 2 nioBufferCnt == 1 :说明只有一个ByteBuffer等待被传输,那么不使用gather的write操作来传输数据(JDK NIO 支持一次写单个ByteBuffer 以及 一次写多个ByteBuffer的聚集写模式) 3 nioBufferCnt > 1 :说明有多个ByteBuffer等待被传输,那么使用JDK NIO的聚集写操作,一次性传输多个ByteBuffer到NioSocketChannel中。

2、3 中写操作的逻辑是一样的:

代码语言:javascript
复制
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
    final long localWrittenBytes = ch.write(...);
    if (localWrittenBytes == 0) {
        setOpWrite = true;
        break;
    }
    expectedWrittenBytes -= localWrittenBytes;
    writtenBytes += localWrittenBytes;
    if (expectedWrittenBytes == 0) {
        done = true;
        break;
    }
}

config().getWriteSpinCount()为16,也就是一次写操作会最多执行16次的SocketChannel.write操作来将数据写到网络中。每次ch.write完都会进行相应的『expectedWrittenBytes -= localWrittenBytes;』操作,将expectedWrittenBytes期待被写的字节数减去已经写出的字节数。如果在最后expectedWrittenBytes依旧大于0,则说明在这16次的socket写操作后依旧还有未写完的数据等待被继续写,那么done就会为false;否则若所有的数据都写完了,done会被置为true。注意,ch.write操作会返回本次写操作写出的字节数,但该方法返回0时,即localWrittenBytes为0,则说明底层的写缓冲区已经满了(这里应该指的是linux底层的写缓冲区满了),这是就会将setOpWrite置为true,此时因为数据还没写完done还是false。那么这种情况下就会注册当前SocketChannel的写事件(SelectionKey.OP_WRITE)到对应的Selector为感兴趣的事件,这样当写缓冲区有空间时,就会触发SelectionKey.OP_WRITE就绪事件, NioEventLoop的事件循环在处理SelectionKey.OP_WRITE事件时会执行forceFlush()以继续发送外发送完的数据。

  1. 『in.removeBytes(writtenBytes)』:释放所有已经写出去的缓存对象,并修改部分写缓冲的索引。
代码语言:javascript
复制
public void removeBytes(long writtenBytes) {
    for (;;) {
        Object msg = current();
        if (!(msg instanceof ByteBuf)) {
            assert writtenBytes == 0;
            break;
        }

        final ByteBuf buf = (ByteBuf) msg;
        final int readerIndex = buf.readerIndex();
        final int readableBytes = buf.writerIndex() - readerIndex;

        if (readableBytes <= writtenBytes) {
            if (writtenBytes != 0) {
                progress(readableBytes);
                writtenBytes -= readableBytes;
            }
            remove();
        } else { // readableBytes > writtenBytes
            if (writtenBytes != 0) {
                buf.readerIndex(readerIndex + (int) writtenBytes);
                progress(writtenBytes);
            }
            break;
        }
    }
    clearNioBuffers();
}

通过已经写出数据的字节数来清理或修改ByteBuf。也就是说writtenBytes的大小可能是包含了多个ByteBuf以及某个ByteBuf的部分数据(因为一个ByteBuf可能只写出了部分数据,还未完成被写出到网络层中)。

a) 『 if (readableBytes <= writtenBytes) 』这个if判断表示:本次socket的write操作(这里是真的是网络通信写操作了)已经写出去的字节数”大于"了当前ByteBuf包可读取的字节数。 这说明,当前这个包中所有的可写的数据都已经写完了,既然当前这个ByteBuf的数据都写完了,那么久可以将其删除了。即,调用『remove()』操作,这个操作就会标识异步write操作为成功完成,并且会回调已经注册到ByteBuf的promise上的所有listeners。同时会原子的修改ChannelOutboundBuffer的totalPendingSize属性值,减少已经写出的数据大小(包括Entry对象内存大小和真实数据的大小),并且如果减少后totalPendingSize小于设置 or 默认的WriteBufferWaterMark的low值,并且再次之前totalPendingSize超过了WriteBufferWaterMark的high值,那么将触发fireChannelWritabilityChanged事件。『remove()』操作还会将当前的ByteBuf指向下一个待处理的ByteBuf,最后释放这个已经被写出去的ByteBuf对象资源。

b) 通过上面的分析,我们知道大数据包走的是else流程。也就是说,本次真实写出去的数据 比 当前这个ByteBuf的可读取数据要小(也就说明,当前这个ByteBuf还没有被完全的写完。因此并不会通过调用『remove()』操作。直到整个大数据包所有的内容都写出去了,那么这是if(readableBytes <= writtenBytes)才会为真执行『remove()』完成相关后续的操作)。那么此时,会根据已经写出的字节数大小修改该ByteBuf的readerIndex索引值。并且,如果该异步写操作的ChannelPromise是ChannelProgressivePromise对象并且注册了相应的progressiveListeners事件,则该listener会得到回调。你可以通过该listener来观察到大数据包写出去的进度。

  1. done表示本次写操作是否完成,。有两种情况下done为false: 1 还有未写完的数据待发送,并且写缓冲区已经满了(这里指的是linux底层的写缓冲区满了),无法再继续写出,那么此时setOpWrite标识为true。这种情况下就会注册当前SocketChannel的写事件(SelectionKey.OP_WRITE)到对应的Selector为感兴趣的事件,这样当写缓冲区有空间时,就会触发SelectionKey.OP_WRITE就绪事件, NioEventLoop的事件循环在处理SelectionKey.OP_WRITE事件时会执行forceFlush()以继续发送外发送完的数据。接着退出doWrite()循环写操作。 2 执行了config().getWriteSpinCount()次(默认16次)socket写操作后,数据仍旧未写完,那么此时会将flush()操作封装成一个task提交至NioEventLoop的taskQueue中,这样在NioEventLoop的下一次事件循环时会就会取出该任务并执行,也就会继续写出未写完的任务了。这也说明了,如果发送的是很大的数据包的话,可能一次写循环操作是无法将数据全部发送出去的,也不会为了发送该大数据包的数据而导致NioEventLoop线程的阻塞以至于影响NioEventLoop上其他Channel的操作和响应。接着退出doWrite()循环写操作。

好了,到目前为止,Netty整个的写流程就分析完了。本文主要专注于写操作的流程,而并未到Netty的内存模式进行展开。

后记

若文章有任何错误,望大家不吝指教:)

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2017.12.15 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 源码解析
    • invokeWrite0(msg, promise)
      • invokeFlush0()
        • 后记
        相关产品与服务
        大数据
        全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档