一直想知道
channel.write
返回时, 到底这个数据是交给操作系统了, 还是说已经发出网卡了, 还是说已经发出去收到ACK了. (答案:只是说明它写入了内核的send_queue)getTemporaryDirectBuffer
和SocketChannelImpl
值得分析
探究socketChannel原理提到了socket非直接内存的读写都是两次复制
读写的原理
上图便回答开文的第一个问题,当你写出成功返回时,只是说明它写入了内核的send_queue
SocketChannelImpl::write:
public int write(ByteBuffer buf) throws IOException {
...
try {
...
for (;;) {
n = IOUtil.write(fd, buf, -1, nd);
if ((n == IOStatus.INTERRUPTED) && isOpen())
continue;
return IOStatus.normalize(n);
}
} finally {
...
}
}
}
依赖于IOUtil::write
static int write(FileDescriptor fd, ByteBuffer src, long position,
NativeDispatcher nd)
throws IOException
{
if (src instanceof DirectBuffer)
return writeFromNativeBuffer(fd, src, position, nd);
// Substitute a native buffer
int pos = src.position();
int lim = src.limit();
assert (pos <= lim);
int rem = (pos <= lim ? lim - pos : 0); // 确定直接内存大小
ByteBuffer bb = Util.getTemporaryDirectBuffer(rem); // 分配直接内存
try {
bb.put(src);
bb.flip();
// Do not update src until we see how many bytes were written
src.position(pos);
int n = writeFromNativeBuffer(fd, bb, position, nd); // 直接内存往外写
if (n > 0) {
// now update src
src.position(pos + n);
}
return n;
} finally {
Util.offerFirstTemporaryDirectBuffer(bb);
}
}
两个调用很重要:
Util.getTemporaryDirectBuffer
分配直接内存writeFromNativeBuffer(fd, bb, position, nd);
从直接内存写数据 public static ByteBuffer getTemporaryDirectBuffer(int size) {
BufferCache cache = bufferCache.get(); // 获取线程私有变量
ByteBuffer buf = cache.get(size); // 关键,获取指定大小的内存
if (buf != null) {
return buf;
} else {
// 没有满足条件的直接内存
// 把第一个内存给释放
if (!cache.isEmpty()) {
buf = cache.removeFirst();
free(buf);
}
return ByteBuffer.allocateDirect(size); // 关键,分配直接内存
}
}
...
private static ThreadLocal<BufferCache> bufferCache =
new ThreadLocal<BufferCache>()
{
@Override
protected BufferCache initialValue() {
return new BufferCache();
}
};
这里我们易看出,线程私有变量bufferCache
对直接内存进行了管理, 下面我们要分析该类
/**
* A simple cache of direct buffers.
*/
private static class BufferCache {
// the array of buffers
private ByteBuffer[] buffers;
// the number of buffers in the cache
private int count;
// the index of the first valid buffer (undefined if count == 0)
private int start;
private int next(int i) {
return (i + 1) % TEMP_BUF_POOL_SIZE;
}
BufferCache() {
buffers = new ByteBuffer[TEMP_BUF_POOL_SIZE];
}
...
懒得分析了,挺简单的一个类,总之就是
next
取余说明它是个循环数组get(int size)
会找到第一个大小至少大于size
的元素索引。由于将其返回后该位置会为空,就把第一个元素(buffer[start])置于此即可。getTemporaryDirectBuffer
会尽量返回第一个思考:多读读代码会发现,如果临时要输出的内存大小突然变大,会导致突然分配一个很大的ByteBuffer元素。如果之后又用不上这么大的内存,就比较浪费, 所以
《Hadoop技术内幕》提到了可将写入的数据分成固定大小(比如8KB)的chunk,并以chunk为单位写入DirectBuffer
没找到native源码,算了不分析了
我们看到继承链:
image.png
强烈建议阅读这两个类的注释
public interface WritableByteChannel
extends Channel
{
/**
* @return The number of bytes written, possibly zero
*
* @throws ...
*/
public int write(ByteBuffer src) throws IOException;
}
public interface ReadableByteChannel extends Channel {
/**
*
* @return The number of bytes read, possibly zero, or <tt>-1</tt> if the
* channel has reached end-of-stream
*
* @throws ...
*/
public int read(ByteBuffer dst) throws IOException;
}
上面两个说明了,write会返回的值是0或正数,read会返回的值可能是0, 正数, 或-1(当EOF时)。 然后注释懒得贴上来翻译,我总结一下吧: