前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Netty】ByteBuf (二)

【Netty】ByteBuf (二)

作者头像
用户3467126
修改2019-07-03 19:45:59
3900
修改2019-07-03 19:45:59
举报
文章被收录于专栏:爱编码爱编码

本文主要学习ByteBuf 的内存分配以及内存回收

内存规格

tiny:总共32个规格, 均是16的整数倍, 0B, 16B, 32B, 48B, 64B, 80B, 96B......496B small:4种规格, 512b, 1k, 2k, 4k nomal:3种规格, 8k, 16k, 32k

PoolThreadCache中维护了三个缓存数组(实际上是六个, 这里仅仅以Direct为例, heap类型的逻辑是一样的): tinySubPageDirectCaches, smallSubPageDirectCaches, 和normalDirectCaches分别代表tiny类型, small类型和normal类型的缓存数组

代码语言:javascript
复制
1.	final class PoolThreadCache {
2.	    private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
3.	    private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
4.	    private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
5.	    private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
6.	    private final MemoryRegionCache<byte[]>[] normalHeapCaches;
7.	    private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
8.	
9.	    //省略很多代码 ...............
10.	
11.	 PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
12.	                    int tinyCacheSize, int smallCacheSize, int normalCacheSize,
13.	                    int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
14.	        checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
15.	        this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
16.	        this.heapArena = heapArena;
17.	        this.directArena = directArena;
18.	        if (directArena != null) {
19.	        //初始化tinySubPageDirectCaches、smallSubPageDirectCaches 、normalDirectCaches 
20.	            tinySubPageDirectCaches = createSubPageCaches(
21.	                    tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
22.	            smallSubPageDirectCaches = createSubPageCaches(
23.	                    smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
24.	
25.	            numShiftsNormalDirect = log2(directArena.pageSize);
26.	            normalDirectCaches = createNormalCaches(
27.	                    normalCacheSize, maxCachedBufferCapacity, directArena);
28.	
29.	            directArena.numThreadCaches.getAndIncrement();
30.	        } 
31.	    //省略很多代码 ...............
32.	}

下面看一下以tiny类型的内存分配createSubPageCaches方法

代码语言:javascript
复制
   private static <T> MemoryRegionCache<T>[] createSubPageCaches(
2.	            int cacheSize, int numCaches, SizeClass sizeClass) {
3.	        if (cacheSize > 0 && numCaches > 0) {
4.	            @SuppressWarnings("unchecked")
5.	            MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
6.	            for (int i = 0; i < cache.length; i++) {
7.	                // TODO: maybe use cacheSize / cache.length
8.	                cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
9.	            }
10.	            return cache;
11.	        } else {
12.	            return null;
13.	        }
14.	    }

这里创建了一个缓存数组, 这个缓存数组的长度,也就是numCaches, 在不同的类型, 这个长度不一样, tiny类型长度是32, small类型长度为4, normal类型长度为3。

缓存数组中每个节点代表一个缓存对象, 里面维护了一个队列, 队列大小由PooledByteBufAllocator类中的tinyCacheSize, smallCacheSize, normalCacheSize属性决定的。

每个缓存对象, 队列中缓存的ByteBuf大小是固定的, netty将每种缓冲区类型分成了不同长度规格, 而每个缓存中的队列缓存的ByteBuf的长度, 都是同一个规格的长度, 而缓冲区数组的长度, 就是规格的数量。

可能压根你就看不懂,反正我是没看懂,那就看下面的Netty内存分配流程。

Netty内存分配流程

摘自于xingdong大佬的文章:

https://www.jianshu.com/p/1ce3bc2d7c5e

首先举一个生活中例子。假如我下了一个单,订购一块 N 字节的内存,并等待它的到达。怎样做到即时送达?

如果订购的内存是个小件(好比一块橡皮、一本书或是一个微波炉等),那么直接从同城仓库送出。 如果订购的内存是个大件(好比电视机、空调等),那么得从区域仓库(例如华东区仓库)送出。 如果订购的内存是个巨大件(好比汽车、轮船),那么得从全国仓库送出。

在 netty 类比以上的物流系统中 同城仓库相当于 PoolThreadCache —— 线程独有的内存仓库; 区域仓库相当于PoolArean —— 几个线程共享的内存仓库; 全国仓库相当于全局变量指向的内存仓库,为所有线程可用。 在 netty 中,整块批发内存,之后或拆开零售,或整块出售。整块批发的内存叫做 chunk,对于小件和大件订单,则进一步拆成 run。 Chunk 的大小为 16MB(可调)或其倍数,如果是堆外内存、那么根据16M对齐;而 run 大小为页大小(8k)的整数倍。

netty中整个内存分配可以用以下图来展示。

源码比较复杂有兴趣的还是你自己照着这图去看比较好点。

ByteBuf回收

堆外内存是不受jvm垃圾回收机制控制的, 所以我们分配一块堆外内存进行ByteBuf操作时, 使用完毕要对对象进行回收。

以PooledUnsafeDirectByteBuf为例:

1.PooledUnsafeDirectByteBuf中内存释放的入口方法是其父类AbstractReferenceCountedByteBuf中的release方法:

代码语言:javascript
复制
   1.	@Override
2.	    public boolean release() {
3.	        return handleRelease(updater.release(this));
4.	    }
5.	
6.	
7.	   private boolean handleRelease(boolean result) {
8.	        if (result) {
9.	            deallocate();
10.	        }
11.	        return result;
12.	    }

2.updater.release(this)方法的实际调用如下。

这个方法的工作猜测应该是是判断一下这个ByteBuf 是否可以回收,再三尝试确认是否还有被线程调用的等操作。这我没看懂

,还想大佬们留言解答,网上没有找到。。。

代码语言:javascript
复制
1.	public abstract class ReferenceCountUpdater<T extends ReferenceCounted> {
2.	
3.	    public final boolean release(T instance) {
4.	        //记它被标记回收的次数
5.	        int rawCnt = nonVolatileRawCnt(instance);
6.	
7.	        return rawCnt == 2 ? tryFinalRelease0(instance, 2) || retryRelease0(instance, 1)
8.	                : nonFinalRelease0(instance, 1, rawCnt, toLiveRealRefCnt(rawCnt, 1));
9.	    }
10.	
11.	   private int nonVolatileRawCnt(T instance) {
12.	        // TODO: Once we compile against later versions of Java we can replace the Unsafe usage here by varhandles.
13.	        final long offset = unsafeOffset();
14.	        return offset != -1 ? PlatformDependent.getInt(instance, offset) : updater().get(instance);
15.	    }
16.	
17.	 private boolean tryFinalRelease0(T instance, int expectRawCnt) {
18.	        return updater().compareAndSet(instance, expectRawCnt, 1); // any odd number will work
19.	    }
20.	
21.	 private boolean retryRelease0(T instance, int decrement) {
22.	        for (;;) {
23.	            int rawCnt = updater().get(instance), realCnt = toLiveRealRefCnt(rawCnt, decrement);
24.	            if (decrement == realCnt) {
25.	                if (tryFinalRelease0(instance, rawCnt)) {
26.	                    return true;
27.	                }
28.	            } else if (decrement < realCnt) {
29.	                // all changes to the raw count are 2x the "real" change
30.	                if (updater().compareAndSet(instance, rawCnt, rawCnt - (decrement << 1))) {
31.	                    return false;
32.	                }
33.	            } else {
34.	                throw new IllegalReferenceCountException(realCnt, -decrement);
35.	            }
36.	            Thread.yield(); // this benefits throughput under high contention
37.	        }
38.	    }
39.	
40.	 private boolean nonFinalRelease0(T instance, int decrement, int rawCnt, int realCnt) {
41.	        if (decrement < realCnt
42.	                // all changes to the raw count are 2x the "real" change - overflow is OK
43.	                && updater().compareAndSet(instance, rawCnt, rawCnt - (decrement << 1))) {
44.	            return false;
45.	        }
46.	        return retryRelease0(instance, decrement);
47.	    }
48.	
49.	
50.	//省略了很多代码。。。
51.	}

3.回到第一步的handleRelease判断是否需要回收,回收的基本逻辑如下代码:

this.handle = -1表示当前的ByteBuf不再指向任何一块内存 memory = null这里将memory也设置为null chunk.arena.free(chunk, handle, maxLength, cache)这一步是将ByteBuf的内存进行释放 recycle()是将对象放入的对象回收站, 循环利用

代码语言:javascript
复制
1.	abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
2.	
3.	  @Override
4.	    protected final void deallocate() {
5.	        if (handle >= 0) {
6.	            final long handle = this.handle;
7.	            this.handle = -1;
8.	            memory = null;
9.	            chunk.arena.free(chunk, tmpNioBuf, handle, maxLength, cache);
10.	            tmpNioBuf = null;
11.	            chunk = null;
12.	            recycle();
13.	        }
14.	    }
15.	}

4.chunk.arena.free(chunk, handle, maxLength, cache)

这一步是将ByteBuf的内存进行释放.

代码语言:javascript
复制
 1.	void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
2.	        if (chunk.unpooled) {
3.	            int size = chunk.chunkSize();
4.	            destroyChunk(chunk);
5.	            activeBytesHuge.add(-size);
6.	            deallocationsHuge.increment();
7.	        } else {
8.	            SizeClass sizeClass = sizeClass(normCapacity);
9.	            if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
10.	                // cached so not free it.
11.	                return;
12.	            }
13.	
14.	            freeChunk(chunk, handle, sizeClass, nioBuffer, false);
15.	        }
16.	    }

5.最后就是调用Recycler中的recycle()

将对象放入的对象回收站, 循环利用。

代码语言:javascript
复制
1.	public abstract class Recycler<T> {
2.	 @Override
3.	        public void recycle(Object object) {
4.	            if (object != value) {
5.	                throw new IllegalArgumentException("object does not belong to handle");
6.	            }
7.	
8.	            Stack<?> stack = this.stack;
9.	            if (lastRecycledId != recycleId || stack == null) {
10.	                throw new IllegalStateException("recycled already");
11.	            }
12.	
13.	            stack.push(this);
14.	        }
15.	
16.	}

总结

主要学习了ByteBuf 的内存分配以及内存回收

下回学习Netty的大动脉pipeline

参考文章

https://www.cnblogs.com/xiangnan6122/p/10202191.html

https://www.jianshu.com/u/fc9c660e9843

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-06-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 爱编码 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 内存规格
  • Netty内存分配流程
  • ByteBuf回收
  • 总结
  • 参考文章
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档