前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Netty8# Netty之ByteBuf初探

Netty8# Netty之ByteBuf初探

作者头像
瓜农老梁
发布2021-01-12 11:10:30
3760
发布2021-01-12 11:10:30
举报
文章被收录于专栏:瓜农老梁

前言

字节的流动形成了流,Netty作为优秀的通信框架他的字节是如何流动的,本文就理一下这个事。梳理完Netty的字节流动与JDK提供的ByteBuffer一对比看下Netty方便在哪里。本分从官方文档概念原理入手梳理,然后看下源码解读下这些原理如何实现的,体验一把Netty写入数据自动扩容,探究下这个过程如何实现的。

一、基本概念

1.ByteBuf创建

使用Unpooled类来创建ByteBuf,不建议使用ByteBuf的构造函数自己去创建。

2.读写索引

ByteBuf提供了两个指针readerIndex和writerIndex,分别记录读、写的开始位置。两个指针将ByteBuf分成了三个区域。

3.discardable bytes

这个区间的范围为0~readerIndex,已经被读过的、可废弃的区域。通过调用discardReadBytes(),可以释放discardable bytes区域。这个区域释放后,可写区域(writable bytes)部分增多。

4.readable bytes

可读区域的范围为(writerIndex-readerIndex)

5.writable bytes

可写区域的范围为(capacity-writerIndex)

6.清理索引

调用Buffer.clear()后,读写索引全部归零,缓存buffer被释放。

二、ByteBuf的构建

接下来通过示例窜下上面的知识点,看下源码是如何实现的,示例中将字符串写入ByteBuf中,然后再读出来打印。

代码语言:javascript
复制
@Test
public void testWriteUtf81() {
     String str1 = "瓜农";
     ByteBuf buf = Unpooled.buffer(1);
     buf.writeBytes(str1.getBytes(CharsetUtil.UTF_8));
     ByteBuf readByteBuf = ByteBufUtil.readBytes(UnpooledByteBufAllocator.DEFAULT,buf,str1.getBytes(CharsetUtil.UTF_8).length);
     System.out.print(readByteBuf.toString(CharsetUtil.UTF_8));
}

源码解读

代码语言:javascript
复制
public static ByteBuf buffer(int initialCapacity) {
 return ALLOC.heapBuffer(initialCapacity); // 注解@1
}

public ByteBuf heapBuffer(int initialCapacity) {
  return heapBuffer(initialCapacity, DEFAULT_MAX_CAPACITY); // 注解@2
}

InstrumentedUnpooledUnsafeHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
  super(alloc, initialCapacity, maxCapacity);
}

public UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
  super(maxCapacity);

  if (initialCapacity > maxCapacity) {
    throw new IllegalArgumentException(String.format(
      "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
  }

  this.alloc = checkNotNull(alloc, "alloc");
  setArray(allocateArray(initialCapacity)); // 注解@3
  setIndex(0, 0); // 注解@4
}

注解@1 使用ByteBufAllocator来分配ByteBuf,默认为UnpooledByteBufAllocator。

注解@2 initialCapacity为初始容量例子中给的为16,maxCapacity为默认的DEFAULT_MAX_CAPACITY=Integer.MAX_VALUE。

注解@3 allocateArray()的方法如下,此时使用JDK的byte[]初始化缓存区。通过setArray(),UnpooledHeapByteBuf持有byte[]缓存区。

代码语言:javascript
复制
 protected byte[] allocateArray(int initialCapacity) {
   return new byte[initialCapacity];
 }

private void setArray(byte[] initialArray) {
  array = initialArray;
  tmpNioBuf = null;
}

注解@4 初始化readerIndex和writerIndex,均为0。

小结 ByteBuf的构建通过Unpooled来分配,示例中通过UnpooledByteBufAllocator持有byte[]、 readerIndex、writerIndex、maxCapacity完成ByteBuf的初始化。示例中array数组大小为16;readerIndex=writerIndex=0;maxCapacity=Integer.MAX_VALUE。

三、写入数据

代码语言:javascript
复制
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
  ensureWritable(length); // 注解@5
  setBytes(writerIndex, src, srcIndex, length); // 注解@6
  writerIndex += length; // 注解@7
  return this;
}

注解@5 确保剩余的空间能够容纳需写入的数据。

具体逻辑如下:如果写入的数据长度小于已经分配的容量空间capacity则允许直接返回;

如果写入的数据长度超过允许的最大容量maxCapacity直接抛出IndexOutOfBoundsException拒绝;

如果写入数据长度大于已经分配的空间capacity但是小于最大最大允许空间maxCapacity,则需要扩容。

代码语言:javascript
复制
final void ensureWritable0(int minWritableBytes) {
    final int writerIndex = writerIndex(); // 注解@5.1
    final int targetCapacity = writerIndex + minWritableBytes; // 注解@5.2
    if (targetCapacity <= capacity()) { // 注解@5.3
        ensureAccessible();
        return;
    }
    if (checkBounds && targetCapacity > maxCapacity) { // 注解@5.4 
        ensureAccessible();
        throw new IndexOutOfBoundsException(String.format(
                "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                writerIndex, minWritableBytes, maxCapacity, this));
    }

    // Normalize the target capacity to the power of 2.
    final int fastWritable = maxFastWritableBytes(); // 注解@5.5 
    int newCapacity = fastWritable >= minWritableBytes ? writerIndex + fastWritable
            : alloc().calculateNewCapacity(targetCapacity, maxCapacity); // 注解@5.6

    // Adjust to the new capacity.
    capacity(newCapacity); // 注解@5.7
}

注解@5.1 获取当前写索引

注解@5.2 计算需要的容量

注解@5.3 与当前已分配的容量capacity进行比较

注解@5.4 不能超过最大允许的容量maxCapacity

注解@5.5 fastWritable = capacity() - writerIndex

注解@5.6 newCapacity的判断通常走到这里应该为,剩余的空间不够了。所以通常会进入alloc().calculateNewCapacity(targetCapacity, maxCapacity)。

代码语言:javascript
复制
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
        // ...
        final int threshold = CALCULATE_THRESHOLD; // 4 MiB page
        if (minNewCapacity == threshold) { // 注解@5.6.1
            return threshold;
        }
        // If over threshold, do not double but just increase by threshold.
        if (minNewCapacity > threshold) { // 注解@5.6.2
            int newCapacity = minNewCapacity / threshold * threshold;
            if (newCapacity > maxCapacity - threshold) {
                newCapacity = maxCapacity;
            } else {
                newCapacity += threshold;
            }
            return newCapacity;
        }

        // Not over threshold. Double up to 4 MiB, starting from 64.
        int newCapacity = 64;
        while (newCapacity < minNewCapacity) { // 注解@5.6.3
            newCapacity <<= 1;
        }

        return Math.min(newCapacity, maxCapacity);
    }

注解@5.6.1 如果写入的数据长度刚好为4M则返回threshold=4M

注解@5.6.2 如果写入的数据长度大于4M,newCapacity不再翻倍增长,通过minNewCapacity / threshold * threshold计算刚容下需要的数据即可。

注解@5.6.3 如果写入的数据长度小于4M,则newCapacity从64翻倍增长(128、256、512...),直到newCapacity能够容纳需要写入的数据。

注解@5.7 确定了要扩容的容量newCapacity后,我们看下如何扩容的。

代码语言:javascript
复制
public ByteBuf capacity(int newCapacity) {
    checkNewCapacity(newCapacity);
    byte[] oldArray = array;
    int oldCapacity = oldArray.length;
    if (newCapacity == oldCapacity) {
      return this;
    }

    int bytesToCopy;
    if (newCapacity > oldCapacity) {
      bytesToCopy = oldCapacity;
    } else {
      trimIndicesToCapacity(newCapacity);
      bytesToCopy = newCapacity;
    }
    byte[] newArray = allocateArray(newCapacity); // 注解@5.7.1
    System.arraycopy(oldArray, 0, newArray, 0, bytesToCopy); // 注解@5.7.2
    setArray(newArray); // 注解@5.7.3
    freeArray(oldArray); // 注解@5.7.4
    return this;
}

注解@5.7.1 使用新的容量初始化newArray=new byte[initialCapacity]

注解@5.7.2 将旧的oldArray数据拷贝到新的newArray=new中

注解@5.7.3 将UnpooledHeapByteBuf的byte[]引用替换为newArray

注解@5.7.4 oldArray清理操作

注解@6 写入数据,通过System.arraycopy将数据写入array中。

代码语言:javascript
复制
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
  checkSrcIndex(index, length, srcIndex, src.length);
  System.arraycopy(src, srcIndex, array, index, length);
  return this;
}

注解@7 移动writerIndex指针。

小结: 将上面例子的initialCapacity设置成1,促使写入数据时扩充容量。下面运行时截图:array被扩容到64,writerIndex从0位置移动到6.

在写入数据时,判断剩余容量是否足够;不够则需要扩容,如果写入的数据小于4M,则双倍增长,直到容纳写写入的数据。如果写入的数据大于4M,通过(minNewCapacity / threshold * threshold)计算需要扩容的大小。

四、读出数据

从buf中把刚才写入的数据(”瓜农“)读出来,通过工具类ByteBufUtil.readBytes来实现。

代码语言:javascript
复制
ByteBuf readByteBuf = ByteBufUtil.readBytes(UnpooledByteBufAllocator.DEFAULT,buf,str1.getBytes(CharsetUtil.UTF_8).length);
System.out.print(readByteBuf.toString(CharsetUtil.UTF_8));
代码语言:javascript
复制
public static ByteBuf readBytes(ByteBufAllocator alloc, ByteBuf buffer, int length) {
    boolean release = true;
    ByteBuf dst = alloc.buffer(length); // 注解@8
    try {
        buffer.readBytes(dst); // 注解@9
        release = false;
        return dst;
    } finally {
        if (release) {
            dst.release();
        }
    }
}

注解@8 重新构造了一个ByteBuf(dst)用于存储读取的数据

注解@9 读取数据,并移动读索引。

代码语言:javascript
复制
@Override
public ByteBuf readBytes(ByteBuf dst, int length) {
    if (checkBounds) {
        if (length > dst.writableBytes()) {
            throw new IndexOutOfBoundsException(String.format(
                    "length(%d) exceeds dst.writableBytes(%d) where dst is: %s", length, dst.writableBytes(), dst));
        }
    }
    readBytes(dst, dst.writerIndex(), length); // 注解@9.1
    dst.writerIndex(dst.writerIndex() + length); // 注解@9.2
    return this;
}

注解@9.1 读取字节到新的ByteBuf。

代码语言:javascript
复制
public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) {
    checkReadableBytes(length);
    getBytes(readerIndex, dst, dstIndex, length); // 注解@9.1.1
    readerIndex += length; // 注解@9.1.2
    return this;
}

注解@9.1.1 通过native api UNSAFE.copyMemory() 实现byte数组之间的拷贝

注解@9.1.2 源byteBuf读索引readerIndex向前移动

注解@9.2 数据读入新构建的缓存区dst,dst的写索引向前移动

小结: 示例中通过构造一个新的ByteBuf(dst),将源ByteBuf(buf)的数据读入到dst。数据读取结束后,源ByteBuf(buf)readerIndex向前移动;ByteBuf(dst)的writerIndex向前移动。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-01-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 瓜农老梁 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档