零拷贝机制是Netty高性能的一个原因,之前都是说netty的线程模型,责任链,说说netty底层的优化,优化就是netty自己的一个缓冲区。
ByteBuf 是为解决 ByteBuffer的问题和满足网络应用程序开发人员的日常需求而设计的。
无法动态扩容
长度是固定的,不能动态扩展和收缩,当数据大于ByteBuffer容量时,会发生索引越界异常。
API 使用复杂
读写的时候需要手工调用flip()和rewind()等方法,使用时需要非常谨慎的考虑这些API,否则容出现错误。
ByteBuf三个重要属性:capacity容量,readerIndex读取位置,writerIndex 写入位置。提供了两个指针变量来支持顺序和写操作,分别是读操作readerIndex 和写操作writeIndex。
常见的方法定义
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.junit.Test;
import java.util.Arrays;
/**
* bytebuf的常规API操作示例
*/
public class ByteBufDemo {
@Test
public void apiTest() {
// +-------------------+------------------+------------------+
// | discardable bytes | readable bytes | writable bytes |
// | | (CONTENT) | |
// +-------------------+------------------+------------------+
// | | | |
// 0 <= readerIndex <= writerIndex <= capacity
// 1.创建一个非池化的ByteBuf,大小为10个字节
ByteBuf buf = Unpooled.buffer(10);
System.out.println("原始ByteBuf为====================>" + buf.toString());
System.out.println("1.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
// 原始ByteBuf为====================>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 10)
// 1.ByteBuf中的内容为===============>[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
// 2.写入一段内容
byte[] bytes = {1, 2, 3, 4, 5};
buf.writeBytes(bytes);
System.out.println("写入的bytes为====================>" + Arrays.toString(bytes));
System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
System.out.println("2.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
// 写入的bytes为====================>[1, 2, 3, 4, 5]
// 写入一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 5, cap: 10)
// 2.ByteBuf中的内容为===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]
// 3.读取一段内容
byte b1 = buf.readByte();
byte b2 = buf.readByte();
System.out.println("读取的bytes为====================>" + Arrays.toString(new byte[]{b1, b2}));
System.out.println("读取一段内容后ByteBuf为===========>" + buf.toString());
System.out.println("3.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
// 读取的bytes为====================>[1, 2]
// 读取一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 2, widx: 5, cap: 10)
// 3.ByteBuf中的内容为===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]
// 4.将读取的内容丢弃
buf.discardReadBytes();
System.out.println("将读取的内容丢弃后ByteBuf为========>" + buf.toString());
System.out.println("4.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
// 将读取的内容丢弃后ByteBuf为========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
// 4.ByteBuf中的内容为===============>[3, 4, 5, 4, 5, 0, 0, 0, 0, 0]
// 5.清空读写指针
buf.clear();
System.out.println("将读写指针清空后ByteBuf为==========>" + buf.toString());
System.out.println("5.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
// 将读写指针清空后ByteBuf为==========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 10)
// 5.ByteBuf中的内容为===============>[3, 4, 5, 4, 5, 0, 0, 0, 0, 0]
// 6.再次写入一段内容,比第一段内容少
byte[] bytes2 = {1, 2, 3};
buf.writeBytes(bytes2);
System.out.println("写入的bytes为====================>" + Arrays.toString(bytes2));
System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
System.out.println("6.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
// 写入的bytes为====================>[1, 2, 3]
// 写入一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
// 6.ByteBuf中的内容为===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]
// 7.将ByteBuf清零
buf.setZero(0, buf.capacity());
System.out.println("将内容清零后ByteBuf为==============>" + buf.toString());
System.out.println("7.ByteBuf中的内容为================>" + Arrays.toString(buf.array()) + "\n");
// 将内容清零后ByteBuf为==============>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
// 7.ByteBuf中的内容为================>[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
// 8.再次写入一段超过容量的内容
byte[] bytes3 = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11};
buf.writeBytes(bytes3);
System.out.println("写入的bytes为====================>" + Arrays.toString(bytes3));
System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
System.out.println("8.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");
// 写入的bytes为====================>[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
// 写入一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 14, cap: 64)
// 8.ByteBuf中的内容为===============>[0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
// 随机访问索引 getByte
// 顺序读 read*
// 顺序写 write*
// 清除已读内容 discardReadBytes
// 清除缓冲区 clear
// 搜索操作
// 标记和重置
// 完整代码示例:参考
// 搜索操作 读取指定位置 buf.getByte(1);
//
}
}
capacity 默认值:256字节,最大值:Integer.MAX_VALUE(2GB)
write 方法调用时,通过AbstractByteBuf.ensureWritable进行检查。 容量计算方法:AbstractByteBufAllocator.calculateNewCapacity(新capacity的最小要求,capacity最大值)
根据新的capacity的最小值要求,对应有两套计算方法
没超过4兆:从64字节开发,每次增加一倍,直至计算出来的newCapacity满足新容量最小要求。示例:当前大小256,已写250,继续写10字节数据,需要的容量最小要求是261,则新容量是6422*2=512
超过4兆:新容量 = 新容量最小要求/4兆 * 4兆 +4兆 示例:当前大小3兆,已写3兆,继续写2兆数据,需要的容量最小要求是5兆, 则新容量是9兆(不能超过最大值)
在实际使用中都是通过 ByteBufAllocator 分配器进行申请,同时分配器具有内存管理的功能。
unpool 每次申请缓冲区时会新建一个,并不会复用,使用 Unpooled 工具类可以创建 unpool 的缓冲区。
Netty 没有给出很便捷的 pool 类型的缓冲区的创建方法。使用 ChannelConfig.getAllocator() 时,获取到的分配器是默认支持内存复用的。
PoolThreadCache: PooledByteBufAllocator 实例维护了一个线程变量。 多种分类的MemoryRegionCache数组用作内存缓存,MemoryRegionCache内部是链表,队列里面存Chunk。 Pool Chunk里面维护了内存引用,内存复用的做法就是把buf的memory指向Chunck的memory。
Netty 的零拷贝机制,是一种应用层的实现。
一般的数组合并,会创建一个大的数组,然后将需要合并的数组放进去。 Netty 的 CompositeButyBuf 将多个 ByteBuf 合并为一个逻辑上的 ByteBuf,避免了各个 ByteBuf 之间的拷贝。
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
CompositeByteBuf newBuffer = compositeByteBuf.addComponents(true, buffer1, buffer2);
wrappedBuffer 方法将 byte[] 数组包装成 ByteBuf 对象
ByteBuf byteBuf = Unpooled.wrappedBuffer(new Byte[]{1, 2, 3, 4, 5});
slice 方法将一个 ByteBuf 对象切分成多个 ByteBuf 对象
ByteBuf buffer1 = Unpooled.wrappedBuffer("hello".getBytes());
ByteBuf newBuffer = buffer1.slice(1, 2);
newBuffer.unwrap();
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.Charset;
/**
* 零拷贝示例
*/
public class ZeroCopyTest {
@org.junit.Test
public void wrapTest() {
byte[] arr = {1, 2, 3, 4, 5};
ByteBuf byteBuf = Unpooled.wrappedBuffer(arr);
System.out.println(byteBuf.getByte(4));
arr[4] = 6;
System.out.println(byteBuf.getByte(4));
}
@org.junit.Test
public void sliceTest() {
ByteBuf buffer1 = Unpooled.wrappedBuffer("hello".getBytes());
ByteBuf newBuffer = buffer1.slice(1, 2);
newBuffer.unwrap();
System.out.println(newBuffer.toString());
}
@org.junit.Test
public void compositeTest() {
ByteBuf buffer1 = Unpooled.buffer(3);
buffer1.writeByte(1);
ByteBuf buffer2 = Unpooled.buffer(3);
buffer2.writeByte(4);
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
CompositeByteBuf newBuffer = compositeByteBuf.addComponents(true, buffer1, buffer2);
System.out.println(newBuffer);
}
}
PS:API操作便捷性,动态扩容,多种ByteBuf实现,高效的零拷贝机制(逻辑上边的设计)上边的所有就是nettyByteBuf所做的工作,性能提升,操作性增强。有了理论下节开始实战netty。