Netty 之 ByteBuf 分析

概述

Netty 中的 ByteBuf 和 NIO 中的 ByteBuffer 的区别。

  • 1、Netty 中的 ByteBuf 支持动态的扩容和缩容。而 NIO 中的 ByteBuffer不支持。
  • 2、Netty 中的 ByteBuf 使用了 readIndex 和 writeIndex 来进行读写操作,而 NIO 中的 ByteBuffer 读写时,还需要 filp()、rewind() 等操作进行读写切换才能读写。
  • 3、Netty 中的 ByteBuf 使用了 markedReaderIndex 和 markedWriterIndex 来进行标记,读写时标记互不影响。而 NIO 中的 ByteBuffer 则只有一个 mark。读写切换时 mark都重置为 -1。
  • 4、Netty 还支持 ByteBuf 的池化技术。 比如: PooledHeapByteBuf PooledUnsafeHeapByteBuf PooledDirectByteBuf PooledUnsafeDirectByteBuf

下面我们简单分析下 Netty 中的 ByteBuf。

writeByte() 方法

Netty 的 ByteBuf 支持动态扩容。在 put 操作的时候会对剩余可用空间进行校验。如果剩余空间不足,则会自动扩容 。NIO 中的 ByteBuffer 不支持动态扩容。 Netty 中的 ByteBuf 还简化 NIO 中的 ByteBuffer 读写问题。 NIO 中 ByteBuffer,使用 postion、limit、capacity、mark 来维护读写操作、每次读写还需要 flip 等操作,而 Netty 中的 ByteBuf 简化了读写操作,用 readerIndex 和 writeIndex 来简化读写操作、读写不需要来回切换。

public ByteBuf writeByte(int value) {
    // 确保 buffer 可写,并且处理扩容、缩容
    ensureWritable0(1);
    // 写入 value,并增加 writerIndex
    _setByte(writerIndex++, value);
    return this;
}
final void ensureWritable0(int minWritableBytes) {
    // 1、判断是否可访问
    ensureAccessible();
    // 2、如果写入字节小于可写字节数,则跳出检查
    if (minWritableBytes <= writableBytes()) {
        return;
    }
    if (checkBounds) {
        // 3、如果写入字节大于 Buffer 剩余可写入量,则抛出异常
        if (minWritableBytes > maxCapacity - writerIndex) {
            throw new IndexOutOfBoundsException(String.format(
                    "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                    writerIndex, minWritableBytes, maxCapacity, this));
        }
    }

    // 4、计算 Buffer 的容量
    int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);

    // 5、调整 Buffer 的容量大小 (扩大或减小 capacity)
    capacity(newCapacity);
}

1、判断是否可访问 2、如果写入字节小于可写字节数,则跳出检查 3、如果写入字节大于 Buffer 剩余可写入量,则抛出异常 4、计算 Buffer 的容量 5、调整 Buffer 的容量大小 (扩大或减小 capacity)

计算 Buffer 扩容大小

/**
 * minNewCapacity 需要的支持的最小容量(写索引+当前写入字节)
 * maxCapacity 最大容量
 */
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
    // 参数合法性校验
    if (minNewCapacity < 0) {
        throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expected: 0+)");
    }
    if (minNewCapacity > maxCapacity) {
        throw new IllegalArgumentException(String.format(
                "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
                minNewCapacity, maxCapacity));
    }
    
    // 容量计算阈值,如果 Buffer 容量小于4M,则每次都扩容2倍,大于4M,则每次扩容4M
    final int threshold = CALCULATE_THRESHOLD; // 4 MiB page
    // 如果需要写入的容量大小等于 threshold,则直接返回 threshold值。
    if (minNewCapacity == threshold) {
        return threshold;
    }

    // 如果 minNewCapacity 超过 threshold, 就不再每次扩容2倍
    if (minNewCapacity > threshold) {
        int newCapacity = minNewCapacity / threshold * threshold;
        // 这块逻辑大概意思:需要扩容的最小容量+4M,如果超过 maxCapacity,则把设置为 maxCapacity
        if (newCapacity > maxCapacity - threshold) {
            newCapacity = maxCapacity;
        } 
        // 否则,每次扩容4M
        else {
            newCapacity += threshold;
        }
        return newCapacity;
    }

    // 不超过 threshold 阈值. 则每次扩容两倍,如果小于64的,则直接返回64。
    int newCapacity = 64;
    while (newCapacity < minNewCapacity) {
        newCapacity <<= 1;
    }

    return Math.min(newCapacity, maxCapacity);
}

这里的主要逻辑为: 1、threshold 设置为 4M,当需要扩容时判断 minNewCapacity 是否等于 threshold ,如果等于则使用 threshold 作为缓冲区大小。 2、如果minNewCapacity 容量大于 threshold 容量,就不能采用每次扩容2倍的方式进行扩容,而是采用每次扩容 4M 的方式。扩容时还需要判断是否达到最大值,如果达到最大值,则使用最大值。 3、如果 minNewCapacity 容量小于 threshold 容量,就成倍的进行扩容,最小容量为64。

调整 Buffer 的容量大小

@Override
public ByteBuf capacity(int newCapacity) {
    // 检查 newCapacity 是否在 0 - maxCapacity 之间
    checkNewCapacity(newCapacity);

    int oldCapacity = array.length;
    byte[] oldArray = array;
    // 如果 newCapacity 大于 oldCapacity 则扩容
    if (newCapacity > oldCapacity) {
        // 创建一个长度为 newCapacity 的 byte[] 数组
        byte[] newArray = allocateArray(newCapacity);
        // 把数据 copy 到新的 byte[] 数组中
        System.arraycopy(oldArray, 0, newArray, 0, oldArray.length);
        // newArray 替换原来的 array
        setArray(newArray);
        // UnpooledHeapByteBuf.freeArray() 无操作,空方法。
        freeArray(oldArray);
    } 
    // 如果 newCapacity 小于 oldCapacity 则缩减容量
    else if (newCapacity < oldCapacity) {
        // 创建一个长度为 newCapacity 的 byte[] 数组
        byte[] newArray = allocateArray(newCapacity);
        int readerIndex = readerIndex();
        // 如果读索引小于 Buffer 大小
        if (readerIndex < newCapacity) {
            int writerIndex = writerIndex();
            // 写索引大于 newCapacity, 则把写索引设置为 newCapacity
            if (writerIndex > newCapacity) {
                writerIndex(writerIndex = newCapacity);
            }
            // 把数据 copy 到新的 byte[] 数组中
            System.arraycopy(oldArray, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
        } 
        // 如果读索引大于 newCapacity,则说明无可用的读写。则把读写索引设置为 newCapacity
        else {
            setIndex(newCapacity, newCapacity);
        }
        // newArray 替换原来的 array
        setArray(newArray);
        freeArray(oldArray);
    }
    return this;
}

discardReadBytes

discardReadBytes 就相当于 NIO ByteBuffer 中的 compact() 方法,压缩缓冲区,从而可以在缓冲区中写更多的数据。

调用 discardReadBytes() 方法时需要注意的是,每次 discard 操作,都会发生字节数组的内存复制,所以频繁的调用会导致性能下降,因此在调用之前确认下是否需要每次调用。

  • discardReadBytes() 之前
    +-------------------+------------------+------------------+
    | discardable bytes |  readable bytes  |  writable bytes  |
    +-------------------+------------------+------------------+
    |                   |                  |                  |
    0      <=      readerIndex   <=   writerIndex    <=    capacity
  • discardReadBytes() 之后
   +------------------+--------------------------------------+
   |  readable bytes  |    writable bytes (got more space)   |
   +------------------+--------------------------------------+
   |                  |                                      |
readerIndex (0) <= writerIndex (decreased)        <=        capacity

通过discard前后可用看出,discard后将会有更多的空间可用写入数据。

/**
 * 丢弃掉已经读过的数据,相当于 NIO ByteBuffer.compact() 方法。
 */
@Override
public ByteBuf discardReadBytes() {
    // 判断是否可访问
    ensureAccessible();
    // 如果 readerIndex = 0,说明没有已经读取过的数据,不需要 discard
    if (readerIndex == 0) {
        return this;
    }
    // 如果 readerIndex != writerIndex,则需要调整读写索引,并移动数据
    if (readerIndex != writerIndex) {
        // 复制readerIndex和writerIndex 之间的数据,前移到0的坐标位置。
        setBytes(0, this, readerIndex, writerIndex - readerIndex);
        // 修改写索引的值
        writerIndex -= readerIndex;
        // 调整 marker 值
        adjustMarkers(readerIndex);
        // 读索引设置为0
        readerIndex = 0;
    } 
    // 如果读写索引相等,说明没有要读取的数据,则直接把 读写索引置位0即可。相当于清空索引。
    else {
        // 调整 marker 值
        adjustMarkers(readerIndex);
        // 读写索引直接设置为0,即可。
        writerIndex = readerIndex = 0;
    }
    return this;
}

调整 读标记 和 写标记

Netty 的 ByteBuf 使用了 markedReaderIndex 和 markedWriterIndex 来分别代表读写的标记。读写标记会不影响。

protected final void adjustMarkers(int decrement) {
    int markedReaderIndex = this.markedReaderIndex;
    // 如果读标记小于discard的大小,读标记设置为0。
    if (markedReaderIndex <= decrement) {
        this.markedReaderIndex = 0;
        int markedWriterIndex = this.markedWriterIndex;
        // 如果写标记小于 decrement,则把写标记置位0,否则为 写标记减去 decrement 
        if (markedWriterIndex <= decrement) {
            this.markedWriterIndex = 0;
        } else {
            this.markedWriterIndex = markedWriterIndex - decrement;
        }
    } else {
        this.markedReaderIndex = markedReaderIndex - decrement;
        markedWriterIndex -= decrement;
    }
}

clear() 方法

public ByteBuf clear() {
    // 设置读写索引都为0
    readerIndex = writerIndex = 0;
    return this;
}

只需要把 readerIndex 和 writerIndex 都设置为0即可,而不需要清楚 Buffer 里面的数据。因为再写的时候会覆盖掉。

  • clear() 之前
+-------------------+------------------+------------------+
| discardable bytes |  readable bytes  |  writable bytes  |
+-------------------+------------------+------------------+
|                   |                  |                  |
0      <=      readerIndex   <=   writerIndex    <=    capacity
  • clear() 之后
+---------------------------------------------------------+
|             writable bytes (got more space)             |
+---------------------------------------------------------+
|                                                         |
0 = readerIndex = writerIndex            <=            capacity

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券