前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Netty】深入分析ByteBuf

【Netty】深入分析ByteBuf

作者头像
周三不加班
发布2019-09-04 10:08:15
1.1K0
发布2019-09-04 10:08:15
举报

之前的关于【Netty】的文章我们已经了解到 Netty 里面数据读写是以 ByteBuf 为单位进行交互的,这一小节,我们就来详细剖析一下 ByteBuf 之前文章链接:

【Netty】NIO编程的利器

【Netty】客户端和服务端实现双向通信

【Netty】浅谈Netty的线程模型

1结构

从上面这幅图可以看到,ByteBuf 是一个字节容器,容器里面的的数据分为三个部分:

  • 第一个部分是已经丢弃的字节,这部分数据是无效的;
  • 第二部分是可读字节,这部分数据是 ByteBuf 的主体数据, 从 ByteBuf 里面读取的数据都来自这一部分;
  • 最后一部分的数据是可写字节,所有写到 ByteBuf 的数据都会写到这一段。最后一部分虚线表示的是该 ByteBuf 最多还能扩容多少容量

总结:

以上三段内容是被两个指针给划分出来的,从左到右,依次是读指针(readerIndex)、写指针(writerIndex),然后还有一个变量 capacity,表示 ByteBuf 底层内存的总容量,从 ByteBuf 中每读取一个字节,readerIndex 自增1,ByteBuf 里面总共有 writerIndex-readerIndex 个字节可读, 由此可以推论出当 readerIndex 与 writerIndex 相等的时候,ByteBuf 不可读

写数据是从 writerIndex 指向的部分开始写,每写一个字节,writerIndex 自增1,直到增到 capacity,这个时候,表示 ByteBuf 已经不可写了,ByteBuf 里面其实还有一个参数 maxCapacity,当向 ByteBuf 写数据的时候,如果容量不足,那么这个时候可以进行扩容,直到 capacity 扩容到 maxCapacity,超过 maxCapacity 就会报错

Netty 使用 ByteBuf 这个数据结构可以有效地区分可读数据和可写数据,读写之间相互没有冲突,当然,ByteBuf 只是对二进制数据的抽象

Netty抽象出来的ByteBuf和JDK的ByteBuffer 的区别在于:

1.1

1.扩容

ByteBuffer

ByteBuffer缓冲区的长度固定,分多了会浪费内存,分少了存放大的数据时会索引越界,所以使用ByteBuffer时,为了解决这个问题,我们一般每次put操作时,都会对可用空间进行校检,如果剩余空间不足,需要重新创建一个新的ByteBuffer,然后将旧的ByteBuffer复制到新的ByteBuffer中去。

ByteBuf

而ByteBuf则对其进行了改进,它会自动扩展,具体的做法是,写入数据时,会调用ensureWritable方法,传入我们需要写的字节长度,判断是否需要扩容:

源码:

public ByteBuf ensureWritable(int minWritableBytes) {
    if(minWritableBytes < 0) {
        throw Exception
    } else if(minWritableBytes <= this.writableBytes()) {
        return this;
    } else if(minWritableBytes > this.maxCapacity - this.writerIndex) {
        throw Exception
    } else {
        //扩容
        int newCapacity = this.calculateNewCapacity(this.writerIndex + minWritableBytes);
        this.capacity(newCapacity);
        return this;
    }
}

可以看到,具体新容量的计算在calculateNewCapacity方法中:

private int calculateNewCapacity(int minNewCapacity) {
    int maxCapacity = this.maxCapacity;
    int threshold = 4194304; //4mb阀值
    if(minNewCapacity == 4194304) {//如果新容量为阀值,直接返回
        return 4194304;
    } else {
        int newCapacity;
        if(minNewCapacity > 4194304) {//如果传入的新容量大于阀值,进行计算
            newCapacity = minNewCapacity / 4194304 * 4194304;
            if(newCapacity > maxCapacity - 4194304) {//如果大于最大容量,新容量为最大容量
                newCapacity = maxCapacity;
            } else {//否则新容量 + 阀值 4mb,按照阀值扩容
                newCapacity += 4194304;
            }

            return newCapacity;
        } else {//如果小于阀值,则以64为计数倍增,知道倍增的结果>=需要的容量值
            for(newCapacity = 64; newCapacity < minNewCapacity; newCapacity <<= 1) {
                ;
            }

            return Math.min(newCapacity, maxCapacity);
        }
    }
}

请注意:

  1. 当申请的新空间大于阀值时,采用每次步进4MB的方式进行扩张内存,而不是倍增,因为这会造成内存膨胀和浪费
  2. 而但申请的新空间小于阀值时,则以64为基数进行倍增而不是步进,因为当内存比较小的时候,倍增是可以接受的(64 -> 128 和 10Mb -> 20Mb相比)

1.2

2.位置指针

ByteBuffer

ByteBuffer中只有一个位置指针position(ByteBuf有两个),所以需要我们手动得调用flip等方法,例如:

ByteBuffer buffer = ByteBuffer.allocate(88);
String value = "我的博客";
buffer.put(value.getBytes());
buffer.flip();
byte[] array = new byte[buffer.remaining()];
buffer.get(array);
String decodeValue = new String(array);

ByteBuffer中会有三个下标,初始位置0,当前位置positon,limit位置,初始时,position为0,limit为Buffer数组末尾 调用buffer.put(value.getBytes())后:

不调用flip: 从缓冲区读取的是position — limit位置的数据,明显不是我们要的

调用flip: 会将limit设置为position,position设置为0,,此时读取的数据

ByteBuf

ByteBuf中使用两个指针,readerIndex,writerIndex来指示位置,初始时readrIndex = writerIndex = 0,当写入数据后:

writerIndex — capacity:可写容量

readerIndex — writerIndex:可读部分

当读取了M个字节后:

调用discardReadBytes,会释放掉discardReadBytes的空间,并把readableBytes复制到从0开始的位置,因此这里会发生内存复制,频繁调用会影响性能

2ByteBuf分析

ByteBuf最重要的两个基类,AbstractByteBuf提供了骨干实现,AbstractReferenceCountedByteBufAbstractByteBuf的子类,它的子类很常使用

2.1

分类

1

从内存分配角度分类

  • 堆内存字节缓冲区(HeapByteBuf):UnPoolHeapByteBufPooledHeapByteBuf 它的特点是内存的分配和回收都在堆,所以速度很快;缺点就是进行SocketIO读写,需要把堆内存对应的缓冲区复制到内核Channel中,这内存复制会影响性能
  • 直接内存缓冲区(DirectByteBuf):UnPoolDirectByteBufUnPoolUnsafeDirectByteBufPoolDirectByteBufPoolUnsafeDirectByteBuf它的特点是由于内存的分配在非堆(方法区),不需要内存复制,所以IO读取的速度较快,但是内存的分配较慢
  • 总结: 根据两种内存的特点,我们可以知道,IO读写时最好使用DirectByteBuf,而在后端业务消息的解编码最好使用HeapByteBuf

2

从内存回收的角度分类

  • 基于对象池的ByteBufPoolByteBuf):PooledByteBuf和它的子类PoolDirectByteBufPoolUnsafeDirectByteBufPooledHeapByteBuf 它的特点是可以循环利用创建的ByteBuf,提高了内存的使用效率,PoolByteBuf的实现牵涉的数据结构很多,PoolByteBuf首先会申请一大块内存区域PoolArenaPoolArena由多个Chunk组成,而每个Chunk由一个或多个page组成 具体看《Netty权威指南》;
  • 普通的ByteBuf(UnPoolByteBuf)UnPoolDirectByteBufUnPoolUnsafeDirectByteBufUnPoolHeapByteBuf
  • 总结: 在高负载,大并发的情况下对象池的ByteBuf更好,而在一般情况下,可以使用UnPoolByteBuf

2.2

Netty的零拷贝

Netty的零拷贝主要体现在三个方面:

  • 第一种实现:DirectByteBuf 就如上所说,ByteBuf可以分为HeapByteBufDirectByteBuf,当使用DirectByteBuf可以实现零拷贝
  • 第二种实现:CompositeByteBuf CompositeByteBuf将多个ByteBuf封装成一个ByteBuf,对外提供封装后的ByteBuf接口
  • 第三种实现:DefaultFileRegion DefaultFileRegion是Netty的文件传输类,它通过transferTo方法将文件直接发送到目标Channel,而不需要循环拷贝的方式,提升了传输性能

2.3

Netty的内存回收管理

Netty会通过 引用计数法 及时申请释放不再被引用的对象 ,实现上是通过 AbstractReferenceCountedByteBuf来实现的,我们看上面的结构图,可以看到AbstractReferenceCountedByteBufAbstractByteBuf的直接子类,所有具体的实现ByteBuf(堆,非堆等)都是继承自AbstractReferenceCountedByteBuf,也就是说,Netty的具体的实现ByteBuf,都是具有内存回收管理的功能的

AbstractReferenceCountedByteBuf有两个重要的成员变量:

  • AtomicIntegerFieldUpdater< AbstractReferenceCountedByteBuf> refCntUpdater 用来更新引用数,使用原子类,达到线程安全
  • volatile int refCnt = 1 用来记录引用数,保证可见性

引用+1

public ByteBuf retain() {
        int refCnt;
        do {
            refCnt = this.refCnt;
            if(refCnt == 0) {
                throw new IllegalReferenceCountException(0, 1);
            }

            if(refCnt == 2147483647) {
                throw new IllegalReferenceCountException(2147483647, 1);
            }
        } while(!refCntUpdater.compareAndSet(this, refCnt, refCnt + 1));

        return this;
    }

引用-1

 public final boolean release() {
        int refCnt;
        do {
            refCnt = this.refCnt;
            if(refCnt == 0) {
                throw new IllegalReferenceCountException(0, -1);
            }
        } while(!refCntUpdater.compareAndSet(this, refCnt, refCnt - 1));

        if(refCnt == 1) {
            this.deallocate();
            return true;
        } else {
            return false;
        }
    }

可以看出,无论是增加引用还是释放引用,都是使用了CAS

refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)

对象是his自身,也就是说,统计的是自身的引用数,例如对于UnPoolHeapByteBuf来说,它具有统计有多少对象引用着它,当引用数refCnt == 1时,表示此事已经没有对象引用它了,此时便调用deallocate来释放内存。

3ByteBuf常用API介绍

容量 API

capacity()

表示 ByteBuf 底层占用了多少字节的内存(包括丢弃的字节、可读字节、可写字节)

maxCapacity()

表示 ByteBuf 底层最大能够占用多少字节的内存,当向 ByteBuf 中写数据的时候,如果发现容量不足,则进行扩容,直到扩容到 maxCapacity,超过这个数,就抛异常

readableBytes() 与 isReadable()

readableBytes() 表示 ByteBuf 当前可读的字节数,它的值等于 writerIndex-readerIndex,如果两者相等,则不可读,isReadable() 方法返回 false

writableBytes()、 isWritable() 与 maxWritableBytes()

writableBytes() 表示 ByteBuf 当前可写的字节数,它的值等于 capacity-writerIndex,如果两者相等,则表示不可写,isWritable() 返回 false,但是这个时候,并不代表不能往 ByteBuf 中写数据了, 如果发现往 ByteBuf 中写数据写不进去的话,Netty 会自动扩容 ByteBuf,直到扩容到底层的内存大小为 maxCapacity,而 maxWritableBytes() 就表示可写的最大字节数,它的值等于 maxCapacity-writerIndex

读写指针相关的 API

readerIndex() 与 readerIndex(int)

前者表示返回当前的读指针 readerIndex, 后者表示设置读指针

writeIndex() 与 writeIndex(int)

前者表示返回当前的写指针 writerIndex, 后者表示设置写指针

markReaderIndex() 与 resetReaderIndex()

前者表示把当前的读指针保存起来,后者表示把当前的读指针恢复到之前保存的值,下面两段代码是等价的

// 代码片段1
int readerIndex = buffer.readerIndex();
// .. 其他操作
buffer.readerIndex(readerIndex);


// 代码片段二
buffer.markReaderIndex();
// .. 其他操作
buffer.resetReaderIndex();

根据Netty大佬闪电侠介绍: 希望大家多多使用代码片段二这种方式,不需要自己定义变量,无论 buffer 当作参数传递到哪里,调用 resetReaderIndex() 都可以恢复到之前的状态,在解析自定义协议的数据包的时候非常常见,推荐大家使用这一对 API markWriterIndex() 与 resetWriterIndex()

作用与上述一对 API 类似,不再 赘述

读写 API

关于 ByteBuf 的读写都可以看作从指针开始的地方开始读写数据

writeBytes(byte[] src) 与 buffer.readBytes(byte[] dst)

writeBytes() 表示把字节数组 src 里面的数据全部写到 ByteBuf,而 readBytes() 指的是把 ByteBuf 里面的数据全部读取到 dst,这里 dst 字节数组的大小通常等于 readableBytes(),而 src 字节数组大小的长度通常小于等于 writableBytes()

writeByte(byte b) 与 buffer.readByte()

writeByte() 表示往 ByteBuf 中写一个字节,而 buffer.readByte() 表示从 ByteBuf 中读取一个字节,类似的 API 还有 writeBoolean()、writeChar()、writeShort()、writeInt()、writeLong()、writeFloat()、writeDouble() 与 readBoolean()、readChar()、readShort()、readInt()、readLong()、readFloat()、readDouble() 这里就不一一赘述了,相信读者应该很容易理解这些 API与读写 API 类似的 API 还有 getBytes、getByte() 与 setBytes()、setByte() 系列,唯一的区别就是 get/set 不会改变读写指针,而 read/write 会改变读写指针,这点在解析数据的时候千万要注意

release() 与 retain()

由于 Netty 使用了堆外内存,而堆外内存是不被 jvm 直接管理的,也就是说申请到的内存无法被垃圾回收器直接回收,所以需要我们手动回收。有点类似于c语言里面,申请到的内存必须手工释放,否则会造成内存泄漏。Netty 的 ByteBuf 是通过引用计数的方式管理的,如果一个 ByteBuf 没有地方被引用到,需要回收底层内存。默认情况下,当创建完一个 ByteBuf,它的引用为1,然后每次调用 retain() 方法, 它的引用就加一, release() 方法原理是将引用计数减一,减完之后如果发现引用计数为0,则直接回收 ByteBuf 底层的内存。

slice()、duplicate()、copy()

  • slice() 方法从原始 ByteBuf 中截取一段,这段数据是从 readerIndex 到 writeIndex,同时,返回的新的 ByteBuf 的最大容量 maxCapacity 为原始 ByteBuf 的 readableBytes()
  • duplicate() 方法把整个 ByteBuf 都截取出来,包括所有的数据,指针信息
  • slice() 方法与 duplicate() 方法的相同点是:底层内存以及引用计数与原始的 ByteBuf 共享,也就是说经过 slice() 或者 duplicate() 返回的 ByteBuf 调用 write 系列方法都会影响到 原始的 ByteBuf,但是它们都维持着与原始 ByteBuf 相同的内存引用计数和不同的读写指针.
  • slice() 方法与 duplicate() 不同点就是:slice() 只截取从 readerIndex 到 writerIndex 之间的数据,它返回的 ByteBuf 的最大容量被限制到 原始 ByteBuf 的 readableBytes(), 而 duplicate() 是把整个 ByteBuf 都与原始的 ByteBuf 共享.
  • lice() 方法与 duplicate() 方法不会拷贝数据,它们只是通过改变读写指针来改变读写的行为,而最后一个方法 copy() 会直接从原始的 ByteBuf 中拷贝所有的信息,包括读写指针以及底层对应的数据,因此,往 copy() 返回的 ByteBuf 中写数据不会影响到原始的 ByteBuf
  • slice() 和 duplicate() 不会改变 ByteBuf 的引用计数,所以原始的 ByteBuf 调用 release() 之后发现引用计数为零,就开始释放内存,调用这两个方法返回的 ByteBuf 也会被释放,这个时候如果再对它们进行读写,就会报错。因此,我们可以通过调用一次 retain() 方法 来增加引用,表示它们对应的底层的内存多了一次引用,引用计数为2,在释放内存的时候,需要调用两次 release() 方法,将引用计数降到零,才会释放内存. 这三个方法均维护着自己的读写指针,与原始的 ByteBuf 的读写指针无关,相互之间不受影响,其次使用到 slice 和 duplicate 方法的时候,千万要理清内存共享,引用计数共享,读写指针不共享,不释放会造成内存泄漏。

在一个函数体里面,只要增加了引用计数(包括 ByteBuf 的创建和手动调用 retain() 方法),就必须调用 release() 方法

结合历史文章,Netty的基础知识也算是在逐步更新,接下来需要学习的大概是以下内容:

  • 客户端与服务端通信协议编码
  • 客户端与服务端收发消息
  • pipelinechannelHandler
  • 拆包沾包理论与解决方案
  • channel的生命周期
  • 心跳与空闲检测
  • 项目实战:仿写一个即时通讯系统

以上代码会同步更新在本人的Github和CSDN上

Github地址:https://github.com/Bylant/LeetCode

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-10-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序员啊粥 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ByteBuffer
    • 容量 API
    • 读写指针相关的 API
    • 读写 API
    相关产品与服务
    容器服务
    腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档