【Netty】ByteBuf (二)

本文主要学习ByteBuf 的内存分配以及内存回收

内存规格

tiny:总共32个规格, 均是16的整数倍, 0B, 16B, 32B, 48B, 64B, 80B, 96B......496B small:4种规格, 512b, 1k, 2k, 4k nomal:3种规格, 8k, 16k, 32k

PoolThreadCache中维护了三个缓存数组(实际上是六个, 这里仅仅以Direct为例, heap类型的逻辑是一样的): tinySubPageDirectCaches, smallSubPageDirectCaches, 和normalDirectCaches分别代表tiny类型, small类型和normal类型的缓存数组

1.	final class PoolThreadCache {
2.	    private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
3.	    private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
4.	    private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
5.	    private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
6.	    private final MemoryRegionCache<byte[]>[] normalHeapCaches;
7.	    private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
8.	
9.	    //省略很多代码 ...............
10.	
11.	 PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
12.	                    int tinyCacheSize, int smallCacheSize, int normalCacheSize,
13.	                    int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
14.	        checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
15.	        this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
16.	        this.heapArena = heapArena;
17.	        this.directArena = directArena;
18.	        if (directArena != null) {
19.	        //初始化tinySubPageDirectCaches、smallSubPageDirectCaches 、normalDirectCaches 
20.	            tinySubPageDirectCaches = createSubPageCaches(
21.	                    tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
22.	            smallSubPageDirectCaches = createSubPageCaches(
23.	                    smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
24.	
25.	            numShiftsNormalDirect = log2(directArena.pageSize);
26.	            normalDirectCaches = createNormalCaches(
27.	                    normalCacheSize, maxCachedBufferCapacity, directArena);
28.	
29.	            directArena.numThreadCaches.getAndIncrement();
30.	        } 
31.	    //省略很多代码 ...............
32.	}

下面看一下以tiny类型的内存分配createSubPageCaches方法

   private static <T> MemoryRegionCache<T>[] createSubPageCaches(
2.	            int cacheSize, int numCaches, SizeClass sizeClass) {
3.	        if (cacheSize > 0 && numCaches > 0) {
4.	            @SuppressWarnings("unchecked")
5.	            MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
6.	            for (int i = 0; i < cache.length; i++) {
7.	                // TODO: maybe use cacheSize / cache.length
8.	                cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
9.	            }
10.	            return cache;
11.	        } else {
12.	            return null;
13.	        }
14.	    }

这里创建了一个缓存数组, 这个缓存数组的长度,也就是numCaches, 在不同的类型, 这个长度不一样, tiny类型长度是32, small类型长度为4, normal类型长度为3。

缓存数组中每个节点代表一个缓存对象, 里面维护了一个队列, 队列大小由PooledByteBufAllocator类中的tinyCacheSize, smallCacheSize, normalCacheSize属性决定的。

每个缓存对象, 队列中缓存的ByteBuf大小是固定的, netty将每种缓冲区类型分成了不同长度规格, 而每个缓存中的队列缓存的ByteBuf的长度, 都是同一个规格的长度, 而缓冲区数组的长度, 就是规格的数量。

可能压根你就看不懂,反正我是没看懂,那就看下面的Netty内存分配流程。

Netty内存分配流程

摘自于xingdong大佬的文章:

https://www.jianshu.com/p/1ce3bc2d7c5e

首先举一个生活中例子。假如我下了一个单,订购一块 N 字节的内存,并等待它的到达。怎样做到即时送达?

如果订购的内存是个小件(好比一块橡皮、一本书或是一个微波炉等),那么直接从同城仓库送出。 如果订购的内存是个大件(好比电视机、空调等),那么得从区域仓库(例如华东区仓库)送出。 如果订购的内存是个巨大件(好比汽车、轮船),那么得从全国仓库送出。

在 netty 类比以上的物流系统中 同城仓库相当于 PoolThreadCache —— 线程独有的内存仓库; 区域仓库相当于PoolArean —— 几个线程共享的内存仓库; 全国仓库相当于全局变量指向的内存仓库,为所有线程可用。 在 netty 中,整块批发内存,之后或拆开零售,或整块出售。整块批发的内存叫做 chunk,对于小件和大件订单,则进一步拆成 run。 Chunk 的大小为 16MB(可调)或其倍数,如果是堆外内存、那么根据16M对齐;而 run 大小为页大小(8k)的整数倍。

netty中整个内存分配可以用以下图来展示。

源码比较复杂有兴趣的还是你自己照着这图去看比较好点。

ByteBuf回收

堆外内存是不受jvm垃圾回收机制控制的, 所以我们分配一块堆外内存进行ByteBuf操作时, 使用完毕要对对象进行回收。

以PooledUnsafeDirectByteBuf为例:

1.PooledUnsafeDirectByteBuf中内存释放的入口方法是其父类AbstractReferenceCountedByteBuf中的release方法:

   1.	@Override
2.	    public boolean release() {
3.	        return handleRelease(updater.release(this));
4.	    }
5.	
6.	
7.	   private boolean handleRelease(boolean result) {
8.	        if (result) {
9.	            deallocate();
10.	        }
11.	        return result;
12.	    }

2.updater.release(this)方法的实际调用如下。

这个方法的工作猜测应该是是判断一下这个ByteBuf 是否可以回收,再三尝试确认是否还有被线程调用的等操作。这我没看懂

,还想大佬们留言解答,网上没有找到。。。

1.	public abstract class ReferenceCountUpdater<T extends ReferenceCounted> {
2.	
3.	    public final boolean release(T instance) {
4.	        //记它被标记回收的次数
5.	        int rawCnt = nonVolatileRawCnt(instance);
6.	
7.	        return rawCnt == 2 ? tryFinalRelease0(instance, 2) || retryRelease0(instance, 1)
8.	                : nonFinalRelease0(instance, 1, rawCnt, toLiveRealRefCnt(rawCnt, 1));
9.	    }
10.	
11.	   private int nonVolatileRawCnt(T instance) {
12.	        // TODO: Once we compile against later versions of Java we can replace the Unsafe usage here by varhandles.
13.	        final long offset = unsafeOffset();
14.	        return offset != -1 ? PlatformDependent.getInt(instance, offset) : updater().get(instance);
15.	    }
16.	
17.	 private boolean tryFinalRelease0(T instance, int expectRawCnt) {
18.	        return updater().compareAndSet(instance, expectRawCnt, 1); // any odd number will work
19.	    }
20.	
21.	 private boolean retryRelease0(T instance, int decrement) {
22.	        for (;;) {
23.	            int rawCnt = updater().get(instance), realCnt = toLiveRealRefCnt(rawCnt, decrement);
24.	            if (decrement == realCnt) {
25.	                if (tryFinalRelease0(instance, rawCnt)) {
26.	                    return true;
27.	                }
28.	            } else if (decrement < realCnt) {
29.	                // all changes to the raw count are 2x the "real" change
30.	                if (updater().compareAndSet(instance, rawCnt, rawCnt - (decrement << 1))) {
31.	                    return false;
32.	                }
33.	            } else {
34.	                throw new IllegalReferenceCountException(realCnt, -decrement);
35.	            }
36.	            Thread.yield(); // this benefits throughput under high contention
37.	        }
38.	    }
39.	
40.	 private boolean nonFinalRelease0(T instance, int decrement, int rawCnt, int realCnt) {
41.	        if (decrement < realCnt
42.	                // all changes to the raw count are 2x the "real" change - overflow is OK
43.	                && updater().compareAndSet(instance, rawCnt, rawCnt - (decrement << 1))) {
44.	            return false;
45.	        }
46.	        return retryRelease0(instance, decrement);
47.	    }
48.	
49.	
50.	//省略了很多代码。。。
51.	}

3.回到第一步的handleRelease判断是否需要回收,回收的基本逻辑如下代码:

this.handle = -1表示当前的ByteBuf不再指向任何一块内存 memory = null这里将memory也设置为null chunk.arena.free(chunk, handle, maxLength, cache)这一步是将ByteBuf的内存进行释放 recycle()是将对象放入的对象回收站, 循环利用

1.	abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
2.	
3.	  @Override
4.	    protected final void deallocate() {
5.	        if (handle >= 0) {
6.	            final long handle = this.handle;
7.	            this.handle = -1;
8.	            memory = null;
9.	            chunk.arena.free(chunk, tmpNioBuf, handle, maxLength, cache);
10.	            tmpNioBuf = null;
11.	            chunk = null;
12.	            recycle();
13.	        }
14.	    }
15.	}

4.chunk.arena.free(chunk, handle, maxLength, cache)

这一步是将ByteBuf的内存进行释放.

 1.	void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
2.	        if (chunk.unpooled) {
3.	            int size = chunk.chunkSize();
4.	            destroyChunk(chunk);
5.	            activeBytesHuge.add(-size);
6.	            deallocationsHuge.increment();
7.	        } else {
8.	            SizeClass sizeClass = sizeClass(normCapacity);
9.	            if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
10.	                // cached so not free it.
11.	                return;
12.	            }
13.	
14.	            freeChunk(chunk, handle, sizeClass, nioBuffer, false);
15.	        }
16.	    }

5.最后就是调用Recycler中的recycle()

将对象放入的对象回收站, 循环利用。

1.	public abstract class Recycler<T> {
2.	 @Override
3.	        public void recycle(Object object) {
4.	            if (object != value) {
5.	                throw new IllegalArgumentException("object does not belong to handle");
6.	            }
7.	
8.	            Stack<?> stack = this.stack;
9.	            if (lastRecycledId != recycleId || stack == null) {
10.	                throw new IllegalStateException("recycled already");
11.	            }
12.	
13.	            stack.push(this);
14.	        }
15.	
16.	}

总结

主要学习了ByteBuf 的内存分配以及内存回收

下回学习Netty的大动脉pipeline

参考文章

https://www.cnblogs.com/xiangnan6122/p/10202191.html

https://www.jianshu.com/u/fc9c660e9843

本文分享自微信公众号 - 爱编码(ilovecode)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-06-11

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Ryan Miao

Java代码质量检查

又搞一边质量扫描插件,之前做过一遍,然后后面各种忽略,然后就放弃了,所以,应该寻找一种方法,循序渐进的实施。本次将实施一个基本的打包扫描方案,包含

48120
来自专栏Android技术分享

Android APK瘦身全面总结——如何从32.6M到13.6M

之前我简单介绍了关于svg图片瘦身的问题,在公司,瘦身这个问题是我提出来的,所以这锅我背了。公司项目是32.6M,我给自己的要求就是低于20M。上周花了一个星期...

16020
来自专栏程序源代码

微信小程序+商城信息管理系统

a 会员管理 b 会员等级 c 收货地址管理d 会员优惠劵 e 会员收藏 f 会员足迹

96930
来自专栏爱撸猫的杰

Java并发机制的底层实现原理之volatile应用,初学者误看!

  Java代码在编译后会变成Java字节码,字节码被类加载器加载到JVM里,JVM执行字节码,最终需要转化为汇编指令在CPU上执行,Java中所使用的并发机制...

13420
来自专栏葬爱家族

转角遇到爱_1,你不知道的Java

一时无聊,在简书开一个文集,专门记录一些技术上的偏门知识,少用但有用的知识。也不知道该取什么名字,就叫转角遇到爱吧。

13420
来自专栏happyJared

ArrayList 与 LinkedList 区别

查看源码可以看到, RandomAccess 接口中什么都没有定义。所以,这就是个标识接口,标识那些实现了这个接口的类,具有随机访问的功能。

39420
来自专栏Java程序猿部落

详谈单例、饿汉、和懒汉模式

私有构造函数保证了不能通过构造函数来创建对象实例,只能通过公有静态函数返回唯一的私有静态变量。

9110
来自专栏Java架构

图解 Java 垃圾回收机制,写得非常好!

自动垃圾回收是一种在堆内存中找出哪些对象在被使用,还有哪些对象没被使用,并且将后者删掉的机制。

8920
来自专栏后端技术

volatile 引用

Java - Volatile reference object and its member fields visibility

9330
来自专栏Java架构师学习

何为内存溢出,何为内存泄露

内存泄漏定义(memory leak):一个不再被程序使用的对象或变量还在内存中占有存储空间。

19430

扫码关注云+社区

领取腾讯云代金券

年度创作总结 领取年终奖励