本文是Netty文集中“Netty 源码解析”系列的文章。主要对Netty的重要流程以及类进行源码解析,以使得我们更好的去使用Netty。Netty是一个非常优秀的网络框架,对其源码解读的过程也是不断学习的过程。
本文主要对Netty的写数据流程进行分析。代码调用仅一句:
ctx.writeAndFlush("from server : " + UUID.randomUUID());
变量 ctx 指的是 ChannelHandlerContext对象,我们跟进ChannelHandlerContext的writeAndFlush方法:
public ChannelFuture writeAndFlush(Object msg) {
return write AndFlush(msg, newPromise());
}
因为写是异步操作,所以如果我们没有自定义一个ChannelPromise的话,就会构建一个默认的ChannelPromise(即,DefaultChannelPromise)来表示该异步操作。我们可以通过往ChannelPromise中注册listener来得到该异步操作的结果(成功 or 失败),listener会在异步操作完成后得到回调。
往下跟,我们会到?流程
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
这里会完成两个重要的步骤:
① invokeWrite0(msg, promise);将消息放入输出缓冲区中(ChannelOutboundBuffer)
② invokeFlush0(); 将输出缓冲区中的数据通过socket发送到网络中
下面我们来详细展开这两步骤
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
write是一个出站事件,它最终会调用到ChannelPipeline中head的相关方法:
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
unsafe是我们构建NioServerSocketChannel或NioSocketChannel对象时,一并构建一个成员属性,它会完成底层真正的网络操作等。NioServerSocketChannel中持有的unsafe成员变量是NioMessageUnsafe对象,而NioSocketChannel中持有的unsafe成员变量是NioSocketChannelUnsafe对象。这里我们要看的是NioSocketChannel的write流程:
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
① 获取该NioSocketChannel的ChannelOutboundBuffer成员属性。(确切地来说ChannelOutboundBuffer是NioSocketChannelUnsafe对象中的成员属性,而NioSocketChannelUnsafe才是NioSocketChannel的成员属性。)每一个NioSocketChannel会维护一个它们自己的ChannelOutboundBuffer,用于存储待出站写请求。
判断该outboundBuffer是否为null,如果为null则说明该NioSocketChannel已经关闭了,那么就会标志该异步写操作为失败完成,并释放写消息后返回。
② 『msg = filterOutboundMessage(msg);』过滤待发送的消息:
protected final Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.isDirect()) {
return msg;
}
return newDirectBuffer(buf);
}
if (msg instanceof FileRegion) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
过滤待发送的消息,只有ByteBuf(堆 or 非堆)以及 FileRegion可以进行最终的Socket网络传输,其他类型的数据是不支持的,会抛UnsupportedOperationException异常。并且会把堆ByteBuf转换为一个非堆的ByteBuf返回。也就说,最后会通过socket传输的对象时非堆的ByteBuf和FileRegion。
『size = pipeline.estimatorHandle().size(msg);』估计待发送数据的大小:
public int size(Object msg) {
if (msg instanceof ByteBuf) {
return ((ByteBuf) msg).readableBytes();
}
if (msg instanceof ByteBufHolder) {
return ((ByteBufHolder) msg).content().readableBytes();
}
if (msg instanceof FileRegion) {
return 0;
}
return unknownSize;
}
估计待发送消息数据的大小,如果是FileRegion的话直接饭0,否则返回ByteBuf中可读取字节数。
③ 『outboundBuffer.addMessage(msg, size, promise)』将消息加入outboundBuffer中等待发送。
public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
tailEntry = entry;
} else {
Entry tail = tailEntry;
tail.next = entry;
tailEntry = entry;
}
if (unflushedEntry == null) {
unflushedEntry = entry;
}
// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619
incrementPendingOutboundBytes(entry.pendingSize, false);
}
首先对ChannelOutboundBuffer、Entry做个简单介绍
ChannelOutboundBuffer
一个内部的数据结构,被AbstractChannel用于存储它的待出站写请求。
ChannelOutboundBuffer中有两个属性private Entry unflushedEntry、private Entry flushedEntry。它们都是用Entry对象通过next指针来维护的一个单向链表。以及一个private Entry tailEntry;对象表示始终指向最后一个Entry对象(即,最后加入到该ChannelOutboundBuffer中的写请求的数据消息)
unflushedEntry表示还未刷新的ByteBuf的链表头;flushedEntry表示调用flush()操作时将会进行刷新的ByteBuf的链表头。
Entry
Entry是ChannelOutboundBunffer的一个内部类,它是对真实的写消息数据以及其相关信息的一个封装。大致封装了如下信息:
a) pendingSize:记录有该ByteBuf or ByteBufs 中待发送数据大小 和 对象本身内存大小 的累加和;
b) promise:该异步写操作的ChannelPromise(用于在完成真是的网络层write后去标识异步操作的完成以及回调已经注册到该promise上的listeners);
c) total:待发送数据包的总大小(该属性与pendingSize的区别在于,如果是待发送的是FileRegion数据对象,则pengdingSize中只有对象内存的大小,即真实的数据大小被记录为0;但total属性则是会记录FileRegion中数据大小,并且total属性是不包含对象内存大小,仅仅是对数据本身大小的记录);
e) msg:原始消息对象的引用;
f) count:写消息数据个数的记录(如果写消息数据是个数组的话,该值会大于1)
这里说明下,pendingSize属性记录的不单单是写请求数据的大小,记录的是这个写请求对象的大小。这是什么意思了?这里做个简单的介绍:
一个对象占用的内存大小除了实例数据(instance data),还包括对象头(header)以及对齐填充(padding)。所以一个对象所占的内存大小为『对象头 + 实例数据 + 对齐填充』,即
entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
// Assuming a 64-bit JVM:
// - 16 bytes object header
// - 8 reference fields
// - 2 long fields
// - 2 int fields
// - 1 boolean field
// - padding
static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);
?假设的是64位操作系统下,且没有使用各种压缩选项的情况。对象头的长度占16字节;引用属性占8字节;long类型占8字节;int类型占4字节;boolean类型占1字节。同时,由于HotSpot VM的自动内存管理系统要求对象起始地址必须是8字节的整数倍,也就是说对象的大小必须是8字节的整数倍,如果最终字节数不为8的倍数,则padding会补足至8的倍数。
static final class Entry {
private final Handle<Entry> handle; // reference field ( 8 bytes)
Entry next; // reference field ( 8 bytes)
Object msg; // reference field ( 8 bytes)
ByteBuffer[] bufs; // reference field ( 8 bytes)
ByteBuffer buf; // reference field ( 8 bytes)
ChannelPromise promise; // reference field ( 8 bytes)
long progress; // long field ( 8 bytes)
long total; // long field ( 8 bytes)
int pendingSize; // int field ( 4 bytes)
int count = -1; // int field ( 4 bytes)
boolean cancelled; // boolean field ( 1 bytes)
我们根据上面的理论来计算下Entry对象占用内存的大小:
header (16 bytes) + 6 * reference fields(8 bytes)+ 2 * long fields(8 bytes)+ 2 * int fields(4 bytes)+ 1 * boolean field(1 byte)= 89 ——> 加上7bytes的padding = 96 bytes
这就是CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD默认值 96 的由来。(关于JVM中对象的内存大小的详细分析,欢迎参阅JVM中 对象的内存布局 以及 实例分析)
addMessage方法主要就是将请求写出的数据封装为Entry对象,然后加入到tailEntry和unflushedEntry中。
然后调用『incrementPendingOutboundBytes(entry.pendingSize, false);』对totalPendingSize属性以及unwritable字段做调整。
totalPendingSize字段记录了该ChannelOutboundBuffer中所有带发送Entry对象的占的总内存大小和所有带发送数据的大小。unwritable用来标示当前该Channel要发送的数据是否已经超过了设定 or 默认的WriteBufferWaterMark的high值。如果当前操作导致了待写出的数据(包括Entry对象大小以及真实需要传输数据的大小)超过了设置写缓冲区的高水位,那么将会触发fireChannelWritabilityChanged事件。
WriteBufferWaterMark
WriteBufferWaterMark用于设置写缓存的高水位标志和低水位标志。
如果写缓冲区队列中字节的数量超过了设置的高水位标志,那么Channel#isWritable()方法将开始返回false。然后当写缓冲区中的字节数量减少至小于了低水位标志,Channel#isWritable()方法会重新开始返回true。关于Channel#isWritable()方法目前主要用在ChunkedWriteHandler以及HTTP2的Handler中。因此,如果你想在程序中通过设置WriteBufferWaterMark来控制数据的写出,但你在程序中并没有使用ChunkedWriteHandler或HTTP2,那么这就需要我们自己通过『Channel#isWritable()』来实现是否可用继续写出数据。 比如:
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
// ......
if( ctx.channel().isWritable() )
{
ctx.writeAndFlush(...)
}
}
总的来说,在write的操作最终会将ByteBuf封装为一个Entry对象放到unflushedEntry单向链表的尾部(通过修改tailEntry来实现的),并修改用于记录有该ChannelOutboundBuffer中待发送Entry对象总内存大小的属性totalPendingSize字段。
好了,但目前为止write操作就讲完了。接下来我们来看下flush操作:
flush也是一个出站事件,它最终会调用到ChannelPipeline中head的相关方法:
public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush();
}
这里的unsafe成员变量依旧是NioSocketChannelUnsafe对象,跟进去:
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
outboundBuffer.addFlush();
flush0();
}
这里主要完成两个操作:
① outboundBuffer.addFlush();
添加一个flush到这个ChannelOutboundBuffer,这意味着,将在此之前添加的消息标记为flushed,你将可以处理这些消息。
public void addFlush() {
// There is no need to process all entries if there was already a flush before and no new messages
// where added in the meantime.
//
// See https://github.com/netty/netty/issues/2577
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
// there is no flushedEntry yet, so start with the entry
flushedEntry = entry;
}
do {
flushed ++;
if (!entry.promise.setUncancellable()) {
// Was cancelled so make sure we free up memory and notify about the freed bytes
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);
// All flushed so reset unflushedEntry
unflushedEntry = null;
}
}
a) 如我们前面所说,write操作最终会将包含有待发送消息的ByteBuf封装成Entry对象放入unflushedEntry单向链表的尾部。而这里就会先判断unflushedEntry是否为null,如果为null则说明所有的entries已经被flush了,并在此期间没有新的消息被添加进ChannelOutboundBuffer中。所有直接返回就好。
b) 如果unflushedEntry非空,则说明有待发送的entries等待被发送。那么将unflushedEntry赋值给flushedEntry(调用flush()操作时就是将该flushedEntry单向链表中的entries的数据发到网络),并将unflushedEntry置为null,表示没有待发送的entries了。并通过flushed成员属性记录待发送entries的个数。
② flush0();
protected final void flush0() {
// Flush immediately only when there's no pending flush.
// If there's a pending flush operation, event loop will call forceFlush() later,
// and thus there's no need to call it now.
if (isFlushPending()) {
return;
}
super.flush0();
}
a) 首先通过isFlushPending()方法来判断flush操作是否需要被挂起:
private boolean isFlushPending() {
SelectionKey selectionKey = selectionKey();
return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
}
也就是说,首先会判断当前NioSocketChannel的SelectionKey.OP_WRITE事件是否有被注册到对应的Selector上,如果有,则说明当前写缓冲区已经满了(这里指是socket的写缓冲区满了,并且socket并没有被关闭,那么write操作将返回0。这是如果还有未写出的数据待被发送,那么就会注册SelectionKey.OP_WRITE事件)。等写缓冲区有空间时,SelectionKey.OP_WRITE事件就会被触发,到时NioEventLoop的事件循环就会调用forceFlush()方法来继续将为写出的数据写出,所以这里直接返回就好。
b) 当socket写缓冲区未满,那么就执行flush0()
protected void flush0() {
if (inFlush0) {
// Avoid re-entrance
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
inFlush0 = true;
// Mark all pending write requests as failure if the channel is inactive.
if (!isActive()) {
try {
if (isOpen()) {
outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
}
} finally {
inFlush0 = false;
}
return;
}
try {
doWrite(outboundBuffer);
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
/**
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
* failing all flushed messages and also ensure the actual close of the underlying transport
* will happen before the promises are notified.
*
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
*/
close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
} else {
outboundBuffer.failFlushed(t, true);
}
} finally {
inFlush0 = false;
}
}
doWrite是一个写循环操作,当满足一定条件时会结束循环。每一次循环会完成的操作:
2、3 中写操作的逻辑是一样的:
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
final long localWrittenBytes = ch.write(...);
if (localWrittenBytes == 0) {
setOpWrite = true;
break;
}
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (expectedWrittenBytes == 0) {
done = true;
break;
}
}
config().getWriteSpinCount()为16,也就是一次写操作会最多执行16次的SocketChannel.write操作来将数据写到网络中。每次ch.write完都会进行相应的『expectedWrittenBytes -= localWrittenBytes;』操作,将expectedWrittenBytes期待被写的字节数减去已经写出的字节数。如果在最后expectedWrittenBytes依旧大于0,则说明在这16次的socket写操作后依旧还有未写完的数据等待被继续写,那么done就会为false;否则若所有的数据都写完了,done会被置为true。注意,ch.write操作会返回本次写操作写出的字节数,但该方法返回0时,即localWrittenBytes为0,则说明底层的写缓冲区已经满了(这里应该指的是linux底层的写缓冲区满了),这是就会将setOpWrite置为true,此时因为数据还没写完done还是false。那么这种情况下就会注册当前SocketChannel的写事件(SelectionKey.OP_WRITE)到对应的Selector为感兴趣的事件,这样当写缓冲区有空间时,就会触发SelectionKey.OP_WRITE就绪事件, NioEventLoop的事件循环在处理SelectionKey.OP_WRITE事件时会执行forceFlush()以继续发送外发送完的数据。
public void removeBytes(long writtenBytes) {
for (;;) {
Object msg = current();
if (!(msg instanceof ByteBuf)) {
assert writtenBytes == 0;
break;
}
final ByteBuf buf = (ByteBuf) msg;
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes <= writtenBytes) {
if (writtenBytes != 0) {
progress(readableBytes);
writtenBytes -= readableBytes;
}
remove();
} else { // readableBytes > writtenBytes
if (writtenBytes != 0) {
buf.readerIndex(readerIndex + (int) writtenBytes);
progress(writtenBytes);
}
break;
}
}
clearNioBuffers();
}
通过已经写出数据的字节数来清理或修改ByteBuf。也就是说writtenBytes的大小可能是包含了多个ByteBuf以及某个ByteBuf的部分数据(因为一个ByteBuf可能只写出了部分数据,还未完成被写出到网络层中)。
a) 『 if (readableBytes <= writtenBytes) 』这个if判断表示:本次socket的write操作(这里是真的是网络通信写操作了)已经写出去的字节数”大于"了当前ByteBuf包可读取的字节数。 这说明,当前这个包中所有的可写的数据都已经写完了,既然当前这个ByteBuf的数据都写完了,那么久可以将其删除了。即,调用『remove()』操作,这个操作就会标识异步write操作为成功完成,并且会回调已经注册到ByteBuf的promise上的所有listeners。同时会原子的修改ChannelOutboundBuffer的totalPendingSize属性值,减少已经写出的数据大小(包括Entry对象内存大小和真实数据的大小),并且如果减少后totalPendingSize小于设置 or 默认的WriteBufferWaterMark的low值,并且再次之前totalPendingSize超过了WriteBufferWaterMark的high值,那么将触发fireChannelWritabilityChanged事件。『remove()』操作还会将当前的ByteBuf指向下一个待处理的ByteBuf,最后释放这个已经被写出去的ByteBuf对象资源。
b) 通过上面的分析,我们知道大数据包走的是else流程。也就是说,本次真实写出去的数据 比 当前这个ByteBuf的可读取数据要小(也就说明,当前这个ByteBuf还没有被完全的写完。因此并不会通过调用『remove()』操作。直到整个大数据包所有的内容都写出去了,那么这是if(readableBytes <= writtenBytes)才会为真执行『remove()』完成相关后续的操作)。那么此时,会根据已经写出的字节数大小修改该ByteBuf的readerIndex索引值。并且,如果该异步写操作的ChannelPromise是ChannelProgressivePromise对象并且注册了相应的progressiveListeners事件,则该listener会得到回调。你可以通过该listener来观察到大数据包写出去的进度。
好了,到目前为止,Netty整个的写流程就分析完了。本文主要专注于写操作的流程,而并未到Netty的内存模式进行展开。
若文章有任何错误,望大家不吝指教:)