前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty零拷贝机制

Netty零拷贝机制

作者头像
IT架构圈
发布2020-12-16 10:42:33
7600
发布2020-12-16 10:42:33
举报
文章被收录于专栏:IT架构圈IT架构圈

零拷贝机制是Netty高性能的一个原因,之前都是说netty的线程模型,责任链,说说netty底层的优化,优化就是netty自己的一个缓冲区。

(一)Netty自己的ByteBuf
  • ① 介绍

ByteBuf 是为解决 ByteBuffer的问题和满足网络应用程序开发人员的日常需求而设计的。

  • ② 对比JDK byteBuffer的缺点

无法动态扩容

长度是固定的,不能动态扩展和收缩,当数据大于ByteBuffer容量时,会发生索引越界异常。

API 使用复杂

读写的时候需要手工调用flip()和rewind()等方法,使用时需要非常谨慎的考虑这些API,否则容出现错误。

  • ③ Netty的ByteBuf 操作

ByteBuf三个重要属性:capacity容量,readerIndex读取位置,writerIndex 写入位置。提供了两个指针变量来支持顺序和写操作,分别是读操作readerIndex 和写操作writeIndex。

常见的方法定义

  1. 随机访问索引 getByte
  2. 顺序读 read*
  3. 顺序写 write*
  4. 清除已读内容discardReadBytes
  5. 清除缓冲区 clear
  6. 搜索操作
  7. 标记和重置
  8. 引用计数和释放
  • ④ 缓冲区是如何被两个指针分割成三个区域的
  1. discardable bytes 已读可丢弃区域
  2. readable bytes 可读区域
  3. writable bytes 待写区域
  • ⑤ 实例
代码语言:javascript
复制
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.junit.Test;

import java.util.Arrays;

/**
 * bytebuf的常规API操作示例
 */
public class ByteBufDemo {
    @Test
    public void apiTest() {
        //  +-------------------+------------------+------------------+
        //  | discardable bytes |  readable bytes  |  writable bytes  |
        //  |                   |     (CONTENT)    |                  |
        //  +-------------------+------------------+------------------+
        //  |                   |                  |                  |
        //  0      <=       readerIndex   <=   writerIndex    <=    capacity

        // 1.创建一个非池化的ByteBuf,大小为10个字节
        ByteBuf buf = Unpooled.buffer(10);
        System.out.println("原始ByteBuf为====================>" + buf.toString());
        System.out.println("1.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");

//        原始ByteBuf为====================>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 10)
//        1.ByteBuf中的内容为===============>[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]



        // 2.写入一段内容
        byte[] bytes = {1, 2, 3, 4, 5};
        buf.writeBytes(bytes);
        System.out.println("写入的bytes为====================>" + Arrays.toString(bytes));
        System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
        System.out.println("2.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");

//        写入的bytes为====================>[1, 2, 3, 4, 5]
//        写入一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 5, cap: 10)
//        2.ByteBuf中的内容为===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]



        // 3.读取一段内容
        byte b1 = buf.readByte();
        byte b2 = buf.readByte();
        System.out.println("读取的bytes为====================>" + Arrays.toString(new byte[]{b1, b2}));
        System.out.println("读取一段内容后ByteBuf为===========>" + buf.toString());
        System.out.println("3.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");

//        读取的bytes为====================>[1, 2]
//        读取一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 2, widx: 5, cap: 10)
//        3.ByteBuf中的内容为===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]



        // 4.将读取的内容丢弃
        buf.discardReadBytes();
        System.out.println("将读取的内容丢弃后ByteBuf为========>" + buf.toString());
        System.out.println("4.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");

//        将读取的内容丢弃后ByteBuf为========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
//        4.ByteBuf中的内容为===============>[3, 4, 5, 4, 5, 0, 0, 0, 0, 0]



        // 5.清空读写指针
        buf.clear();
        System.out.println("将读写指针清空后ByteBuf为==========>" + buf.toString());
        System.out.println("5.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");

//        将读写指针清空后ByteBuf为==========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 10)
//        5.ByteBuf中的内容为===============>[3, 4, 5, 4, 5, 0, 0, 0, 0, 0]



        // 6.再次写入一段内容,比第一段内容少
        byte[] bytes2 = {1, 2, 3};
        buf.writeBytes(bytes2);
        System.out.println("写入的bytes为====================>" + Arrays.toString(bytes2));
        System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
        System.out.println("6.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");

//        写入的bytes为====================>[1, 2, 3]
//        写入一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
//        6.ByteBuf中的内容为===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]



        // 7.将ByteBuf清零
        buf.setZero(0, buf.capacity());
        System.out.println("将内容清零后ByteBuf为==============>" + buf.toString());
        System.out.println("7.ByteBuf中的内容为================>" + Arrays.toString(buf.array()) + "\n");

//        将内容清零后ByteBuf为==============>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
//        7.ByteBuf中的内容为================>[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]


        // 8.再次写入一段超过容量的内容
        byte[] bytes3 = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11};
        buf.writeBytes(bytes3);
        System.out.println("写入的bytes为====================>" + Arrays.toString(bytes3));
        System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
        System.out.println("8.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "\n");


//        写入的bytes为====================>[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
//        写入一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 14, cap: 64)
//        8.ByteBuf中的内容为===============>[0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

        //  随机访问索引 getByte
        //  顺序读 read*
        //  顺序写 write*
        //  清除已读内容 discardReadBytes
        //  清除缓冲区 clear
        //  搜索操作
        //  标记和重置
        //  完整代码示例:参考
        // 搜索操作 读取指定位置 buf.getByte(1);
        //
    }

}


  • ⑥ ByteBuf 动态扩容

capacity 默认值:256字节,最大值:Integer.MAX_VALUE(2GB)

write 方法调用时,通过AbstractByteBuf.ensureWritable进行检查。 容量计算方法:AbstractByteBufAllocator.calculateNewCapacity(新capacity的最小要求,capacity最大值)

根据新的capacity的最小值要求,对应有两套计算方法

没超过4兆:从64字节开发,每次增加一倍,直至计算出来的newCapacity满足新容量最小要求。示例:当前大小256,已写250,继续写10字节数据,需要的容量最小要求是261,则新容量是6422*2=512

超过4兆:新容量 = 新容量最小要求/4兆 * 4兆 +4兆 示例:当前大小3兆,已写3兆,继续写2兆数据,需要的容量最小要求是5兆, 则新容量是9兆(不能超过最大值)

  • ⑦ 选择合适的 ByteBuf 实现

在实际使用中都是通过 ByteBufAllocator 分配器进行申请,同时分配器具有内存管理的功能。

  1. unsafe 用到了 Unsafe 工具类,Unsafe 是 Java 保留的一个底层工具包,safe 则没有用到 unsafe 工具类。
  2. unsafe 意味着不安全的操作,但是更底层的操作会带来性能提升和特殊功能,Netty 中会尽力使用 unsafe。
  3. Java 语言很重要的特性是“一次编写导出运行”,所以它针对底层的内存或其他操作,做了很多封装。而 unsafe 提供了一系列操作底层的方法,可能会导致不兼容或者不可知的异常。

unpool 每次申请缓冲区时会新建一个,并不会复用,使用 Unpooled 工具类可以创建 unpool 的缓冲区。

Netty 没有给出很便捷的 pool 类型的缓冲区的创建方法。使用 ChannelConfig.getAllocator() 时,获取到的分配器是默认支持内存复用的。

  • ⑧ pooledByteBuf对象、内存

PoolThreadCache: PooledByteBufAllocator 实例维护了一个线程变量。 多种分类的MemoryRegionCache数组用作内存缓存,MemoryRegionCache内部是链表,队列里面存Chunk。 Pool Chunk里面维护了内存引用,内存复用的做法就是把buf的memory指向Chunck的memory。

(二)零拷贝
  • ① 介绍

Netty 的零拷贝机制,是一种应用层的实现。

  • ② 拷贝方式

一般的数组合并,会创建一个大的数组,然后将需要合并的数组放进去。 Netty 的 CompositeButyBuf 将多个 ByteBuf 合并为一个逻辑上的 ByteBuf,避免了各个 ByteBuf 之间的拷贝。

代码语言:javascript
复制
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
CompositeByteBuf newBuffer = compositeByteBuf.addComponents(true, buffer1, buffer2);

wrappedBuffer 方法将 byte[] 数组包装成 ByteBuf 对象

代码语言:javascript
复制
ByteBuf byteBuf = Unpooled.wrappedBuffer(new Byte[]{1, 2, 3, 4, 5});

slice 方法将一个 ByteBuf 对象切分成多个 ByteBuf 对象

代码语言:javascript
复制
 ByteBuf buffer1 = Unpooled.wrappedBuffer("hello".getBytes());
ByteBuf newBuffer = buffer1.slice(1, 2);
newBuffer.unwrap();
  • ③ 实例
代码语言:javascript
复制
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;

import java.nio.charset.Charset;

/**
 * 零拷贝示例
 */
public class ZeroCopyTest {
    @org.junit.Test
    public void wrapTest() {
        byte[] arr = {1, 2, 3, 4, 5};
        ByteBuf byteBuf = Unpooled.wrappedBuffer(arr);
        System.out.println(byteBuf.getByte(4));
        arr[4] = 6;
        System.out.println(byteBuf.getByte(4));
    }

    @org.junit.Test
    public void sliceTest() {
        ByteBuf buffer1 = Unpooled.wrappedBuffer("hello".getBytes());
        ByteBuf newBuffer = buffer1.slice(1, 2);
        newBuffer.unwrap();
        System.out.println(newBuffer.toString());
    }

    @org.junit.Test
    public void compositeTest() {
        ByteBuf buffer1 = Unpooled.buffer(3);
        buffer1.writeByte(1);
        ByteBuf buffer2 = Unpooled.buffer(3);
        buffer2.writeByte(4);
        CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
        CompositeByteBuf newBuffer = compositeByteBuf.addComponents(true, buffer1, buffer2);
        System.out.println(newBuffer);
    }

}

PS:API操作便捷性,动态扩容,多种ByteBuf实现,高效的零拷贝机制(逻辑上边的设计)上边的所有就是nettyByteBuf所做的工作,性能提升,操作性增强。有了理论下节开始实战netty。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-12-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 编程坑太多 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • (一)Netty自己的ByteBuf
  • (二)零拷贝
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档