
[Netty] ByteBuf (Part 1)


Introduction

All network communication involves moving sequences of bytes, so an efficient, easy-to-use data structure for them is indispensable. Netty's ByteBuf implementation meets and exceeds these requirements.

ByteBuf structure

ByteBuf maintains two distinct indices: one for reading and one for writing. When you read from a ByteBuf, its readerIndex is incremented by the number of bytes read. Likewise, when you write to a ByteBuf, its writerIndex is incremented.

As a container, a ByteBuf is divided into three regions, as the diagram in the source code shows:

discardable bytes: bytes that have already been read and can be discarded; bounded by the readerIndex pointer

readable bytes: the actual content, between the readerIndex and writerIndex pointers

writable bytes: free space that can still be written to, between the writerIndex pointer and the capacity

      +-------------------+------------------+------------------+
      | discardable bytes |  readable bytes  |  writable bytes  |
      |                   |     (CONTENT)    |                  |
      +-------------------+------------------+------------------+
      |                   |                  |                  |
      0      <=      readerIndex   <=   writerIndex    <=    capacity
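To make the index movement concrete, here is a minimal runnable sketch (it uses the standard Unpooled helper; the values in the comments follow from the index rules above):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class IndexDemo {
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(16);        // readerIndex = writerIndex = 0, capacity = 16
        buf.writeInt(42);                         // writerIndex moves to 4
        System.out.println(buf.readerIndex());    // 0
        System.out.println(buf.writerIndex());    // 4

        int value = buf.readInt();                // readerIndex moves to 4
        System.out.println(value);                // 42
        System.out.println(buf.readableBytes());  // 0  -> writerIndex - readerIndex
        System.out.println(buf.writableBytes());  // 12 -> capacity - writerIndex

        buf.release();                            // release the buffer when done with it
    }
}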

ByteBuf usage patterns

Broadly, the patterns are distinguished by where the memory lives relative to the JVM heap.

1. Heap memory (inside the JVM heap)
2. Direct memory (native, off-heap memory)
3. Composite buffer (an aggregate of several buffers of the above two kinds)

1. Heap memory

The most frequently used ByteBuf pattern stores the data in the JVM's heap space. It provides fast allocation and release even without pooling.
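A short hedged sketch of the heap pattern: because the data lives in a byte[] on the heap, the backing array can be handed to APIs that expect a plain array.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class HeapBufferDemo {
    public static void main(String[] args) {
        ByteBuf heapBuf = Unpooled.copiedBuffer("heap data".getBytes());
        if (heapBuf.hasArray()) {                      // heap buffers expose their backing array
            byte[] array = heapBuf.array();
            int offset = heapBuf.arrayOffset() + heapBuf.readerIndex();
            int length = heapBuf.readableBytes();
            System.out.println(new String(array, offset, length));
        }
        heapBuf.release();
    }
}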

2. Direct memory

The JDK allows JVM implementations to allocate memory through native calls. The main motivation is to avoid copying the buffer's contents to (or from) an intermediate buffer before (or after) every native I/O operation.

Biggest advantage: the contents reside outside the regular garbage-collected heap. Biggest drawback: compared with heap buffers, allocation and release are relatively expensive.
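A matching hedged sketch for the direct pattern: there is no backing array, so the data has to be copied out whenever a byte[] is required.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class DirectBufferDemo {
    public static void main(String[] args) {
        ByteBuf directBuf = Unpooled.directBuffer(64);          // memory lives outside the GC-managed heap
        directBuf.writeBytes("direct data".getBytes());
        if (!directBuf.hasArray()) {                            // direct buffers have no backing byte[]
            byte[] copy = new byte[directBuf.readableBytes()];
            directBuf.getBytes(directBuf.readerIndex(), copy);  // copy out explicitly
            System.out.println(new String(copy));
        }
        directBuf.release();                                    // direct memory should be released promptly
    }
}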

3. Composite buffer

The commonly used class is CompositeByteBuf. It provides an aggregated view over multiple ByteBufs: a virtual representation that presents several buffers as a single merged buffer.

For example, in HTTP the header and the body are often produced by different modules of the application. When the two parts need to be combined, CompositeByteBuf is a good choice, as sketched below.
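A hedged sketch of that HTTP-style use case (the header/body contents here are just placeholders):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;

public class CompositeDemo {
    public static void main(String[] args) {
        ByteBuf header = Unpooled.copiedBuffer("HEADER|", CharsetUtil.UTF_8);  // produced by module A
        ByteBuf body   = Unpooled.copiedBuffer("BODY",    CharsetUtil.UTF_8);  // produced by module B

        CompositeByteBuf message = Unpooled.compositeBuffer();
        // true = advance the composite's writerIndex so the added components become readable
        message.addComponents(true, header, body);

        System.out.println(message.toString(CharsetUtil.UTF_8));  // HEADER|BODY -- no copying involved
        message.release();  // releasing the composite also releases its components
    }
}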

ByteBuf classification

There are three main axes of classification:

Pooled vs. Unpooled (pooled or not)
Unsafe vs. non-Unsafe
Heap vs. Direct (on-heap vs. off-heap)

Pooled vs. Unpooled

Pooled: each allocation takes a contiguous segment out of pre-allocated memory and wraps it in a ByteBuf for the application to use. Unpooled: each allocation calls the system API directly and requests a fresh block of memory from the operating system. A minimal comparison is sketched below.
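This sketch assumes the default allocators that ship with Netty:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;

public class PooledVsUnpooledDemo {
    public static void main(String[] args) {
        // Pooled: hands out a piece of pre-allocated memory managed by arenas and thread-local caches
        ByteBuf pooled = PooledByteBufAllocator.DEFAULT.directBuffer(1024);

        // Unpooled: every allocation asks the JVM / operating system for fresh memory
        ByteBuf unpooled = UnpooledByteBufAllocator.DEFAULT.heapBuffer(1024);

        pooled.release();    // the memory goes back to the pool for reuse
        unpooled.release();  // the memory is simply freed (or left to the GC)
    }
}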

Heap vs. Direct

Heap: allocated from the JVM heap and managed by the GC. Direct: allocated through the JDK API, outside the JVM's control, and not subject to garbage collection.

Unsafe vs. non-Unsafe

The JDK's Unsafe object can obtain an object's memory address directly and read and write against that address. The difference between the two categories is simply whether the implementation uses the JDK's low-level Unsafe for its reads and writes.

See also: why Java introduced Unsafe and how to use it.
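As a small aside, whether Netty can actually use Unsafe on the current JVM can be checked through its (internal) PlatformDependent helper; a hedged one-liner:

import io.netty.util.internal.PlatformDependent;

public class UnsafeCheck {
    public static void main(String[] args) {
        // when this is false (e.g. -Dio.netty.noUnsafe=true), Netty falls back to the non-Unsafe ByteBuf variants
        System.out.println("Unsafe available: " + PlatformDependent.hasUnsafe());
    }
}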

Memory allocation: ByteBufAllocator

Implementations of this interface are responsible for allocating buffers and are expected to be thread-safe. From the methods and comments below you can see that the API revolves around the three ByteBuf memory patterns described above: heap, direct, and composite.

/**
 * Implementations are responsible to allocate buffers. Implementations of this interface are expected to be
 * thread-safe.
 */
public interface ByteBufAllocator {

    ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;

    /**
     * Allocate a {@link ByteBuf}. If it is a direct or heap buffer
     * depends on the actual implementation.
     */
    ByteBuf buffer();

    /**
     * Allocate a {@link ByteBuf} with the given initial capacity.
     * If it is a direct or heap buffer depends on the actual implementation.
     */
    ByteBuf buffer(int initialCapacity);

    /**
     * Allocate a {@link ByteBuf} with the given initial capacity and the given
     * maximal capacity. If it is a direct or heap buffer depends on the actual
     * implementation.
     */
    ByteBuf buffer(int initialCapacity, int maxCapacity);

    /**
     * Allocate a {@link ByteBuf}, preferably a direct buffer which is suitable for I/O.
     */
    ByteBuf ioBuffer();

    /**
     * Allocate a {@link ByteBuf}, preferably a direct buffer which is suitable for I/O.
     */
    ByteBuf ioBuffer(int initialCapacity);

    /**
     * Allocate a {@link ByteBuf}, preferably a direct buffer which is suitable for I/O.
     */
    ByteBuf ioBuffer(int initialCapacity, int maxCapacity);

    /**
     * Allocate a heap {@link ByteBuf}.
     */
    ByteBuf heapBuffer();

    /**
     * Allocate a heap {@link ByteBuf} with the given initial capacity.
     */
    ByteBuf heapBuffer(int initialCapacity);

    /**
     * Allocate a heap {@link ByteBuf} with the given initial capacity and the given
     * maximal capacity.
     */
    ByteBuf heapBuffer(int initialCapacity, int maxCapacity);

    /**
     * Allocate a direct {@link ByteBuf}.
     */
    ByteBuf directBuffer();

    /**
     * Allocate a direct {@link ByteBuf} with the given initial capacity.
     */
    ByteBuf directBuffer(int initialCapacity);

    /**
     * Allocate a direct {@link ByteBuf} with the given initial capacity and the given
     * maximal capacity.
     */
    ByteBuf directBuffer(int initialCapacity, int maxCapacity);

    /**
     * Allocate a {@link CompositeByteBuf}.
     * If it is a direct or heap buffer depends on the actual implementation.
     */
    CompositeByteBuf compositeBuffer();

    /**
     * Allocate a {@link CompositeByteBuf} with the given maximum number of components that can be stored in it.
     * If it is a direct or heap buffer depends on the actual implementation.
     */
    CompositeByteBuf compositeBuffer(int maxNumComponents);

    /**
     * Allocate a heap {@link CompositeByteBuf}.
     */
    CompositeByteBuf compositeHeapBuffer();

    /**
     * Allocate a heap {@link CompositeByteBuf} with the given maximum number of components that can be stored in it.
     */
    CompositeByteBuf compositeHeapBuffer(int maxNumComponents);

    /**
     * Allocate a direct {@link CompositeByteBuf}.
     */
    CompositeByteBuf compositeDirectBuffer();

    /**
     * Allocate a direct {@link CompositeByteBuf} with the given maximum number of components that can be stored in it.
     */
    CompositeByteBuf compositeDirectBuffer(int maxNumComponents);

    /**
     * Returns {@code true} if direct {@link ByteBuf}'s are pooled
     */
    boolean isDirectBufferPooled();

    /**
     * Calculate the new capacity of a {@link ByteBuf} that is used when a {@link ByteBuf} needs to expand by the
     * {@code minNewCapacity} with {@code maxCapacity} as upper-bound.
     */
    int calculateNewCapacity(int minNewCapacity, int maxCapacity);
}
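A hedged usage sketch of the interface (ByteBufAllocator.DEFAULT is normally the pooled allocator unless configured otherwise):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;

public class AllocatorDemo {
    public static void main(String[] args) {
        ByteBufAllocator alloc = ByteBufAllocator.DEFAULT;

        ByteBuf io = alloc.ioBuffer(512);             // prefers direct memory, suitable for I/O
        ByteBuf heap = alloc.heapBuffer(256, 1024);   // initial capacity 256, max capacity 1024
        CompositeByteBuf composite = alloc.compositeBuffer();

        System.out.println(alloc.isDirectBufferPooled());

        io.release();
        heap.release();
        composite.release();
    }
}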

The concrete implementations of ByteBufAllocator can be found in its subclasses, chiefly UnpooledByteBufAllocator and PooledByteBufAllocator (the original article shows a class diagram here).

Let's look at what each subclass does and how they differ.

UnpooledByteBufAllocator

Heap memory allocation

Entry point: newInstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity). Allocating Unpooled, Unsafe, Heap memory really just allocates a byte array and stores it in the UnpooledHeapByteBuf#array field. Both the initial capacity and the maximum capacity to which the buffer may grow can be specified.

public UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(maxCapacity);

    checkNotNull(alloc, "alloc");

    if (initialCapacity > maxCapacity) {
        throw new IllegalArgumentException(String.format(
                "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
    }

    this.alloc = alloc;
    setArray(allocateArray(initialCapacity));
    setIndex(0, 0);
}

protected byte[] allocateArray(int initialCapacity) {
    return new byte[initialCapacity];
}

Looking at UnpooledHeapByteBuf#getByte(): for a heap ByteBuf, reading a byte simply indexes into the backing byte array.

@Override
public byte getByte(int index) {
    ensureAccessible();
    return _getByte(index);
}

@Override
protected byte _getByte(int index) {
    // 'array' is the byte[] instantiated when the buffer was created
    return HeapByteBufUtil.getByte(array, index);
}

static byte getByte(byte[] memory, int index) {
    // plain array access
    return memory[index];
}
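A quick hedged sketch tying these pieces together (assuming an UnpooledByteBufAllocator constructed with preferDirect = false):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;

public class UnpooledHeapDemo {
    public static void main(String[] args) {
        UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false); // preferDirect = false
        ByteBuf buf = allocator.heapBuffer(16, 64);   // initialCapacity 16, maxCapacity 64

        buf.writeByte(7);                             // written into the backing byte[]
        System.out.println(buf.getByte(0));           // read straight out of the byte[] as shown above

        buf.release();
    }
}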

Direct memory allocation

Entry point: UnpooledByteBufAllocator#newDirectBuffer() --> UnpooledUnsafeDirectByteBuf

public UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(maxCapacity);
    if (alloc == null) {
        throw new NullPointerException("alloc");
    }
    checkPositiveOrZero(initialCapacity, "initialCapacity");
    checkPositiveOrZero(maxCapacity, "maxCapacity");
    if (initialCapacity > maxCapacity) {
        throw new IllegalArgumentException(String.format(
                "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
    }

    this.alloc = alloc;
    setByteBuffer(allocateDirect(initialCapacity), false);
}

protected ByteBuffer allocateDirect(int initialCapacity) {
    return ByteBuffer.allocateDirect(initialCapacity);
}

final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
    if (tryFree) {
        ByteBuffer oldBuffer = this.buffer;
        if (oldBuffer != null) {
            if (doNotFree) {
                doNotFree = false;
            } else {
                freeDirect(oldBuffer);
            }
        }
    }
    this.buffer = buffer;
    memoryAddress = PlatformDependent.directBufferAddress(buffer);
    tmpNioBuf = null;
    capacity = buffer.remaining();
}

As you can see, Unpooled Direct allocation actually wraps a JDK DirectByteBuffer underneath: it is created at allocation time and stored in the buffer field.

Tracing the corresponding _getByte() (UnpooledDirectByteBuf#_getByte()) is simpler still: it reads directly through the JDK API.

@Override
protected byte _getByte(int index) {
    // read through the wrapped JDK ByteBuffer
    return buffer.get(index);
}
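And the direct counterpart of the earlier sketch, again hedged:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;

public class UnpooledDirectDemo {
    public static void main(String[] args) {
        UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(true); // preferDirect = true
        ByteBuf buf = allocator.directBuffer(16);    // wraps a JDK DirectByteBuffer underneath

        buf.writeByte(7);
        System.out.println(buf.getByte(0));          // served by the wrapped ByteBuffer (or Unsafe)

        buf.release();                               // direct memory must be released explicitly
    }
}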

For a more detailed analysis, see https://www.jianshu.com/p/1585e32cf6b4

PooledByteBufAllocator

Entry points: PooledByteBufAllocator#newHeapBuffer() and PooledByteBufAllocator#newDirectBuffer(). Heap and direct allocation both follow the same fairly fixed pattern:

1. Get the thread-local cache, PoolThreadCache
2. Get the arena of the appropriate type
3. Use that arena to allocate the memory

@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    // get the thread-local cache
    PoolThreadCache cache = threadCache.get();
    // get the heapArena
    PoolArena<byte[]> heapArena = cache.heapArena;

    final ByteBuf buf;
    if (heapArena != null) {
        // allocate memory from the heapArena
        buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        buf = PlatformDependent.hasUnsafe() ?
                new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
                new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
    }

    return toLeakAwareBuffer(buf);
}

@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    // get the thread-local cache
    PoolThreadCache cache = threadCache.get();
    // get the directArena
    PoolArena<ByteBuffer> directArena = cache.directArena;

    final ByteBuf buf;
    if (directArena != null) {
        // allocate memory from the directArena
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        buf = PlatformDependent.hasUnsafe() ?
                UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }

    return toLeakAwareBuffer(buf);
}
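For reference, a hedged usage sketch that exercises exactly these two entry points:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public class PooledAllocDemo {
    public static void main(String[] args) {
        PooledByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;

        ByteBuf direct = alloc.directBuffer(1024);  // newDirectBuffer(): PoolThreadCache -> directArena
        ByteBuf heap = alloc.heapBuffer(1024);      // newHeapBuffer():  PoolThreadCache -> heapArena

        direct.release();                           // returns the memory to the pool / thread cache
        heap.release();
    }
}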

Following threadCache.get() leads to FastThreadLocal#get(): threadCache is itself a FastThreadLocal, which can be thought of as Netty's equivalent of the JDK's ThreadLocal. Its get() method calls the initialization method initialize().

public final V get() {
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    Object v = threadLocalMap.indexedVariable(index);
    if (v != InternalThreadLocalMap.UNSET) {
        return (V) v;
    }
    // call the initialization method
    V value = initialize(threadLocalMap);
    registerCleaner(threadLocalMap);
    return value;
}
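As an aside, FastThreadLocal is used in application code much like java.lang.ThreadLocal; a minimal hedged sketch:

import io.netty.util.concurrent.FastThreadLocal;

public class FastThreadLocalDemo {
    private static final FastThreadLocal<StringBuilder> LOCAL = new FastThreadLocal<StringBuilder>() {
        @Override
        protected StringBuilder initialValue() {
            return new StringBuilder();   // called lazily on each thread's first get()
        }
    };

    public static void main(String[] args) {
        StringBuilder sb = LOCAL.get();          // first call triggers initialValue()
        sb.append("hello");
        System.out.println(LOCAL.get() == sb);   // true: later calls return the cached value
    }
}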

The logic of initialValue() is as follows:

  1. From the pre-created heapArenas and directArenas, pick the least-used arena
  2. Use that arena as a constructor argument to instantiate a PoolThreadCache and return it
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
    private final boolean useCacheForAllThreads;

    PoolThreadLocalCache(boolean useCacheForAllThreads) {
        this.useCacheForAllThreads = useCacheForAllThreads;
    }

    @Override
    protected synchronized PoolThreadCache initialValue() {
        /**
         * "Arena" literally means a competition ground; all of the memory allocation
         * logic takes place inside these arenas.
         */
        // get a heapArena: take the least-used arena out of the heapArenas
        final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
        // get a directArena: take the least-used arena out of the directArenas
        final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);

        Thread current = Thread.currentThread();
        if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
            // create the PoolThreadCache: it will ultimately be used by a single thread
            // it manages the two big memory areas (heap and direct) via heapArena and directArena,
            // and maintains lists of reusable memory blocks sized by
            // tinyCacheSize, smallCacheSize and normalCacheSize
            return new PoolThreadCache(
                    heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
                    DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
        }
        // No caching so just use 0 as sizes.
        return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
    }

    // code omitted ......
}

PoolThreadCache maintains two allocation strategies: one goes through the heapArena and directArena it holds, as described above; the other maintains cache lists for the tiny, small and normal size classes so that frequently used memory blocks can be reused.

final class PoolThreadCache {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);

    // memory managed via the arenas
    final PoolArena<byte[]> heapArena;
    final PoolArena<ByteBuffer> directArena;

    // cache lists for the tiny, small and normal size classes
    // Hold the caches for the different size classes, which are tiny, small and normal.
    private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
    private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
    private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
    private final MemoryRegionCache<byte[]>[] normalHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

    // Used for bitshifting when calculate the index of normal caches later
    private final int numShiftsNormalDirect;
    private final int numShiftsNormalHeap;
    private final int freeSweepAllocationThreshold;
    private final AtomicBoolean freed = new AtomicBoolean();

    private int allocations;

    // TODO: Test if adding padding helps under contention
    //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;

    PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
                    int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                    int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
        checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
        this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;

        // manage allocation through the heapArena and directArena the cache holds
        this.heapArena = heapArena;
        this.directArena = directArena;

        // build the cache lists from tinyCacheSize, smallCacheSize and normalCacheSize
        // and store them in the fields above
        if (directArena != null) {
            tinySubPageDirectCaches = createSubPageCaches(
                    tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
            smallSubPageDirectCaches = createSubPageCaches(
                    smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);

            numShiftsNormalDirect = log2(directArena.pageSize);
            normalDirectCaches = createNormalCaches(
                    normalCacheSize, maxCachedBufferCapacity, directArena);

            directArena.numThreadCaches.getAndIncrement();
        } else {
            // No directArea is configured so just null out all caches
            tinySubPageDirectCaches = null;
            smallSubPageDirectCaches = null;
            normalDirectCaches = null;
            numShiftsNormalDirect = -1;
        }
        if (heapArena != null) {
            // Create the caches for the heap allocations
            // create the size-class cache queues
            tinySubPageHeapCaches = createSubPageCaches(
                    tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
            // create the size-class cache queues
            smallSubPageHeapCaches = createSubPageCaches(
                    smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);

            numShiftsNormalHeap = log2(heapArena.pageSize);
            // create the size-class cache queues
            normalHeapCaches = createNormalCaches(
                    normalCacheSize, maxCachedBufferCapacity, heapArena);

            heapArena.numThreadCaches.getAndIncrement();
        } else {
            // No heapArea is configured so just null out all caches
            tinySubPageHeapCaches = null;
            smallSubPageHeapCaches = null;
            normalHeapCaches = null;
            numShiftsNormalHeap = -1;
        }

        // Only check if there are caches in use.
        if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
                || tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
                && freeSweepAllocationThreshold < 1) {
            throw new IllegalArgumentException("freeSweepAllocationThreshold: "
                    + freeSweepAllocationThreshold + " (expected: > 0)");
        }
    }

    private static <T> MemoryRegionCache<T>[] createSubPageCaches(
            int cacheSize, int numCaches, SizeClass sizeClass) {
        if (cacheSize > 0 && numCaches > 0) {
            // MemoryRegionCache is the object that holds the cached memory
            @SuppressWarnings("unchecked")
            MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
            for (int i = 0; i < cache.length; i++) {
                // TODO: maybe use cacheSize / cache.length
                // each MemoryRegionCache (tiny, small, normal) is a queue for memory blocks of one size (one "spec")
                cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
            }
            return cache;
        } else {
            return null;
        }
    }

    private static <T> MemoryRegionCache<T>[] createNormalCaches(
            int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
        if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
            int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
            int arraySize = Math.max(1, log2(max / area.pageSize) + 1);
            // MemoryRegionCache is the object that holds the cached memory
            @SuppressWarnings("unchecked")
            MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
            for (int i = 0; i < cache.length; i++) {
                // each MemoryRegionCache (tiny, small, normal) is a queue for memory blocks of one size (one "spec")
                cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
            }
            return cache;
        } else {
            return null;
        }
    }
    ......
}

For a more detailed analysis, see https://www.jianshu.com/p/1cd65dae358c

How directArena allocates direct memory

After the previous step we have the PoolThreadCache and, from it, the corresponding arena. What follows is how the arena actually allocates the memory.

Entry point: from PooledByteBufAllocator#newDirectBuffer() we arrive at the following code (PoolArena#allocate):

PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
    // get a PooledByteBuf object -- at this point it is just an object
    PooledByteBuf<T> buf = newByteBuf(maxCapacity);
    // allocate memory from the cache and initialize the buf's memory-address-related fields
    allocate(cache, buf, reqCapacity);
    return buf;
}

As you can see, allocation happens in two steps: get a PooledByteBuf object, then allocate memory from the cache and reset the relevant fields.

1. newByteBuf(maxCapacity): obtain a PooledByteBuf object

@Override
protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
    if (HAS_UNSAFE) {
        // get a PooledByteBuf
        return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
    } else {
        return PooledDirectByteBuf.newInstance(maxCapacity);
    }
}

static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
    // get a PooledUnsafeDirectByteBuf from RECYCLER, the object pool with recycling support
    PooledUnsafeDirectByteBuf buf = RECYCLER.get();
    // the buf may have come out of the recycle bin, so it has to be reset for reuse
    buf.reuse(maxCapacity);
    return buf;
}

2. Recycler is an object pool built on a thread-local stack. Recycler holds a ThreadLocal field that supplies a stack to the recycle handler DefaultHandle; by maintaining that stack, the handler maintains the cache of PooledUnsafeDirectByteBuf instances.

private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() {
    @Override
    protected PooledUnsafeDirectByteBuf newObject(Handle<PooledUnsafeDirectByteBuf> handle) {
        // the Recycler manages PooledUnsafeDirectByteBuf through the recycle handler
        // the handler holds a stack underneath as the object pool and is also responsible for recycling
        // the handler is stored as a field, so a ByteBuf can be recycled once it is no longer used
        return new PooledUnsafeDirectByteBuf(handle, 0);
    }
};

// the Recycler holds a ThreadLocal whose initialValue() returns a stack
private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
    @Override
    protected Stack<T> initialValue() {
        return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
                ratioMask, maxDelayedQueuesPerThread);
    }

    @Override
    protected void onRemoval(Stack<T> value) {
        // Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overhead
        if (value.threadRef.get() == Thread.currentThread()) {
            if (DELAYED_RECYCLED.isSet()) {
                DELAYED_RECYCLED.get().remove(value);
            }
        }
    }
};
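The same pattern can be reproduced in isolation; a hedged sketch with a made-up pooled type (PooledThing is hypothetical, only the Recycler API is Netty's):

import io.netty.util.Recycler;

public final class RecyclerDemo {

    static final class PooledThing {
        private final Recycler.Handle<PooledThing> handle;
        PooledThing(Recycler.Handle<PooledThing> handle) { this.handle = handle; }
        void recycle() { handle.recycle(this); }  // push ourselves back onto the thread-local stack
    }

    private static final Recycler<PooledThing> RECYCLER = new Recycler<PooledThing>() {
        @Override
        protected PooledThing newObject(Handle<PooledThing> handle) {
            return new PooledThing(handle);       // only called when nothing can be reused
        }
    };

    public static void main(String[] args) {
        PooledThing thing = RECYCLER.get();       // pops a handle off the stack, or creates a new object
        thing.recycle();                          // hand it back so a later get() may reuse it
        // note: the Recycler drops a ratio of recycled objects to bound pool growth,
        // so reuse is an optimization, not a guarantee
    }
}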

3. Now look at Recycler#get():

public final T get() {
    if (maxCapacityPerThread == 0) {
        return newObject((Handle<T>) NOOP_HANDLE);
    }
    // get the thread's stack -- effectively its recycle bin
    Stack<T> stack = threadLocal.get();

    // pop a DefaultHandle (recycle handler) off the top of the stack
    // the DefaultHandle holds a value, which is really a PooledUnsafeDirectByteBuf
    DefaultHandle<T> handle = stack.pop();
    // no handler means there is no idle ByteBuf available
    if (handle == null) {
        // create a new handler
        handle = stack.newHandle();

        // callback -- remember? newObject() returns a PooledUnsafeDirectByteBuf,
        // so the handler now holds a freshly created PooledUnsafeDirectByteBuf
        handle.value = newObject(handle);
    }
    // otherwise the pooled object can simply be reused
    return (T) handle.value;
}

public final V get() {
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    Object v = threadLocalMap.indexedVariable(index);
    if (v != InternalThreadLocalMap.UNSET) {
        return (V) v;
    }
    // call back into initialize()
    V value = initialize(threadLocalMap);
    registerCleaner(threadLocalMap);
    return value;
}

private V initialize(InternalThreadLocalMap threadLocalMap) {
    V v = null;
    try {
        // call back into initialValue()
        v = initialValue();
    } catch (Exception e) {
        PlatformDependent.throwException(e);
    }

    threadLocalMap.setIndexedVariable(index, v);
    addToVariablesToRemove(threadLocalMap, this);
    return v;
}

DefaultHandle<T> newHandle() {
    // instantiate a handler and initialize its stack field, which comes from the thread-local
    return new DefaultHandle<T>(this);
}

4. DefaultHandle uses the stack as the cache pool that holds PooledUnsafeDirectByteBuf; PooledDirectByteBuf works the same way, only the concrete object it instantiates differs. The handler's recycle method pushes the object back onto the stack, and reuse pops it off the top.

static final class DefaultHandle<T> implements Handle<T> {
    private int lastRecycledId;
    private int recycleId;

    boolean hasBeenRecycled;
    // the object cache pool
    private Stack<?> stack;
    private Object value;

    DefaultHandle(Stack<?> stack) {
        this.stack = stack;
    }

    /**
     * The recycle method: return the object to the stack.
     */
    @Override
    public void recycle(Object object) {
        if (object != value) {
            throw new IllegalArgumentException("object does not belong to handle");
        }

        Stack<?> stack = this.stack;
        if (lastRecycledId != recycleId || stack == null) {
            throw new IllegalStateException("recycled already");
        }
        // recycle: push ourselves onto the stack so we can be reused later
        stack.push(this);
    }
}

5. That covers the first step; the second step is to reset the buffer's internal pointers. Because the PooledUnsafeDirectByteBuf we obtained may have come out of the cache, it has to be prepared for reuse.

static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
    // get a PooledUnsafeDirectByteBuf from RECYCLER, the object pool with recycling support
    PooledUnsafeDirectByteBuf buf = RECYCLER.get();
    // the buf may have come out of the recycle bin, so reset it for reuse
    buf.reuse(maxCapacity);
    return buf;
}

final void reuse(int maxCapacity) {
    // reset the maximum capacity
    maxCapacity(maxCapacity);
    // reset the reference count
    setRefCnt(1);
    // reset the reader and writer indices
    setIndex0(0, 0);
    // reset the mark values
    discardMarks();
}

6. At this point only the first part of allocation is done (obtaining the PooledByteBuf object). Everything so far merely fetches these objects and manages them through the recycle bin and its handlers; they are still plain objects with no actual memory attached.

7. Now follow PoolArena#allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity):

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    final int normCapacity = normalizeCapacity(reqCapacity);

    // allocate according to the normalized size class
    /**
     * Overall logic (tiny and small specs are checked first, then normal):
     * 1. try to allocate from the cache; return on success
     * 2. otherwise fall back to allocating from the memory heap (the arena)
     */
    if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
        int tableIdx;
        PoolSubpage<T>[] table;
        boolean tiny = isTiny(normCapacity);

        // try the tiny / small caches first
        if (tiny) { // < 512
            if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            tableIdx = tinyIdx(normCapacity);
            table = tinySubpagePools;
        } else {
            if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            tableIdx = smallIdx(normCapacity);
            table = smallSubpagePools;
        }

        final PoolSubpage<T> head = table[tableIdx];

        /**
         * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
         * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
         */
        synchronized (head) {
            final PoolSubpage<T> s = head.next;
            if (s != head) {
                assert s.doNotDestroy && s.elemSize == normCapacity;
                long handle = s.allocate();
                assert handle >= 0;
                s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
                incTinySmallAllocation(tiny);
                return;
            }
        }
        // the tiny / small cache allocation failed,
        // so allocate from the memory heap instead
        synchronized (this) {
            allocateNormal(buf, reqCapacity, normCapacity);
        }

        incTinySmallAllocation(tiny);
        return;
    }
    // normal spec
    // if the requested memory is larger than chunkSize, go through allocateHuge
    if (normCapacity <= chunkSize) {
        // try the cache first
        if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
            // was able to allocate out of the cache so move on
            return;
        }
        // nothing cached, so allocate from the memory heap
        synchronized (this) {
            allocateNormal(buf, reqCapacity, normCapacity);
            ++allocationsNormal;
        }
    } else {
        // Huge allocations are never served via the cache so just call allocateHuge
        allocateHuge(buf, reqCapacity);
    }
}

The overall allocation logic follows the normalized size class: tiny and small specs are handled first, then normal. Each path first attempts to allocate from the cache and, if that fails, falls back to the memory heap (the arena). The memory that is finally carved out is then bound to the PooledByteBuf obtained in step one.

Summary

This post covered ByteBuf's basic structure, its usage patterns, its classification, and the basics of memory allocation.

Next time we will look at ByteBuf's cache-hit logic and memory reclamation.

References

https://www.cnblogs.com/xiangnan6122/p/10202191.html
https://www.jianshu.com/u/fc9c660e9843
