本文主要学习ByteBuf 的内存分配以及内存回收。
tiny:总共32个规格, 均是16的整数倍, 0B, 16B, 32B, 48B, 64B, 80B, 96B......496B small:4种规格, 512b, 1k, 2k, 4k nomal:3种规格, 8k, 16k, 32k
PoolThreadCache中维护了三个缓存数组(实际上是六个, 这里仅仅以Direct为例, heap类型的逻辑是一样的): tinySubPageDirectCaches, smallSubPageDirectCaches, 和normalDirectCaches分别代表tiny类型, small类型和normal类型的缓存数组
1. final class PoolThreadCache {
2. private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
3. private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
4. private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
5. private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
6. private final MemoryRegionCache<byte[]>[] normalHeapCaches;
7. private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
8.
9. //省略很多代码 ...............
10.
11. PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
12. int tinyCacheSize, int smallCacheSize, int normalCacheSize,
13. int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
14. checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
15. this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
16. this.heapArena = heapArena;
17. this.directArena = directArena;
18. if (directArena != null) {
19. //初始化tinySubPageDirectCaches、smallSubPageDirectCaches 、normalDirectCaches
20. tinySubPageDirectCaches = createSubPageCaches(
21. tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
22. smallSubPageDirectCaches = createSubPageCaches(
23. smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
24.
25. numShiftsNormalDirect = log2(directArena.pageSize);
26. normalDirectCaches = createNormalCaches(
27. normalCacheSize, maxCachedBufferCapacity, directArena);
28.
29. directArena.numThreadCaches.getAndIncrement();
30. }
31. //省略很多代码 ...............
32. }
下面看一下以tiny类型的内存分配createSubPageCaches方法
private static <T> MemoryRegionCache<T>[] createSubPageCaches(
2. int cacheSize, int numCaches, SizeClass sizeClass) {
3. if (cacheSize > 0 && numCaches > 0) {
4. @SuppressWarnings("unchecked")
5. MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
6. for (int i = 0; i < cache.length; i++) {
7. // TODO: maybe use cacheSize / cache.length
8. cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
9. }
10. return cache;
11. } else {
12. return null;
13. }
14. }
这里创建了一个缓存数组, 这个缓存数组的长度,也就是numCaches, 在不同的类型, 这个长度不一样, tiny类型长度是32, small类型长度为4, normal类型长度为3。
缓存数组中每个节点代表一个缓存对象, 里面维护了一个队列, 队列大小由PooledByteBufAllocator类中的tinyCacheSize, smallCacheSize, normalCacheSize属性决定的。
每个缓存对象, 队列中缓存的ByteBuf大小是固定的, netty将每种缓冲区类型分成了不同长度规格, 而每个缓存中的队列缓存的ByteBuf的长度, 都是同一个规格的长度, 而缓冲区数组的长度, 就是规格的数量。
可能压根你就看不懂,反正我是没看懂,那就看下面的Netty内存分配流程。
摘自于xingdong大佬的文章:
https://www.jianshu.com/p/1ce3bc2d7c5e
首先举一个生活中例子。假如我下了一个单,订购一块 N 字节的内存,并等待它的到达。怎样做到即时送达?
如果订购的内存是个小件(好比一块橡皮、一本书或是一个微波炉等),那么直接从同城仓库送出。 如果订购的内存是个大件(好比电视机、空调等),那么得从区域仓库(例如华东区仓库)送出。 如果订购的内存是个巨大件(好比汽车、轮船),那么得从全国仓库送出。
在 netty 类比以上的物流系统中 同城仓库相当于 PoolThreadCache —— 线程独有的内存仓库; 区域仓库相当于PoolArean —— 几个线程共享的内存仓库; 全国仓库相当于全局变量指向的内存仓库,为所有线程可用。 在 netty 中,整块批发内存,之后或拆开零售,或整块出售。整块批发的内存叫做 chunk,对于小件和大件订单,则进一步拆成 run。 Chunk 的大小为 16MB(可调)或其倍数,如果是堆外内存、那么根据16M对齐;而 run 大小为页大小(8k)的整数倍。
netty中整个内存分配可以用以下图来展示。
源码比较复杂有兴趣的还是你自己照着这图去看比较好点。
堆外内存是不受jvm垃圾回收机制控制的, 所以我们分配一块堆外内存进行ByteBuf操作时, 使用完毕要对对象进行回收。
以PooledUnsafeDirectByteBuf为例:
1.PooledUnsafeDirectByteBuf中内存释放的入口方法是其父类AbstractReferenceCountedByteBuf中的release方法:
1. @Override
2. public boolean release() {
3. return handleRelease(updater.release(this));
4. }
5.
6.
7. private boolean handleRelease(boolean result) {
8. if (result) {
9. deallocate();
10. }
11. return result;
12. }
2.updater.release(this)方法的实际调用如下。
这个方法的工作猜测应该是是判断一下这个ByteBuf 是否可以回收,再三尝试确认是否还有被线程调用的等操作。这我没看懂
,还想大佬们留言解答,网上没有找到。。。
1. public abstract class ReferenceCountUpdater<T extends ReferenceCounted> {
2.
3. public final boolean release(T instance) {
4. //记它被标记回收的次数
5. int rawCnt = nonVolatileRawCnt(instance);
6.
7. return rawCnt == 2 ? tryFinalRelease0(instance, 2) || retryRelease0(instance, 1)
8. : nonFinalRelease0(instance, 1, rawCnt, toLiveRealRefCnt(rawCnt, 1));
9. }
10.
11. private int nonVolatileRawCnt(T instance) {
12. // TODO: Once we compile against later versions of Java we can replace the Unsafe usage here by varhandles.
13. final long offset = unsafeOffset();
14. return offset != -1 ? PlatformDependent.getInt(instance, offset) : updater().get(instance);
15. }
16.
17. private boolean tryFinalRelease0(T instance, int expectRawCnt) {
18. return updater().compareAndSet(instance, expectRawCnt, 1); // any odd number will work
19. }
20.
21. private boolean retryRelease0(T instance, int decrement) {
22. for (;;) {
23. int rawCnt = updater().get(instance), realCnt = toLiveRealRefCnt(rawCnt, decrement);
24. if (decrement == realCnt) {
25. if (tryFinalRelease0(instance, rawCnt)) {
26. return true;
27. }
28. } else if (decrement < realCnt) {
29. // all changes to the raw count are 2x the "real" change
30. if (updater().compareAndSet(instance, rawCnt, rawCnt - (decrement << 1))) {
31. return false;
32. }
33. } else {
34. throw new IllegalReferenceCountException(realCnt, -decrement);
35. }
36. Thread.yield(); // this benefits throughput under high contention
37. }
38. }
39.
40. private boolean nonFinalRelease0(T instance, int decrement, int rawCnt, int realCnt) {
41. if (decrement < realCnt
42. // all changes to the raw count are 2x the "real" change - overflow is OK
43. && updater().compareAndSet(instance, rawCnt, rawCnt - (decrement << 1))) {
44. return false;
45. }
46. return retryRelease0(instance, decrement);
47. }
48.
49.
50. //省略了很多代码。。。
51. }
3.回到第一步的handleRelease判断是否需要回收,回收的基本逻辑如下代码:
this.handle = -1表示当前的ByteBuf不再指向任何一块内存 memory = null这里将memory也设置为null chunk.arena.free(chunk, handle, maxLength, cache)这一步是将ByteBuf的内存进行释放 recycle()是将对象放入的对象回收站, 循环利用
1. abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
2.
3. @Override
4. protected final void deallocate() {
5. if (handle >= 0) {
6. final long handle = this.handle;
7. this.handle = -1;
8. memory = null;
9. chunk.arena.free(chunk, tmpNioBuf, handle, maxLength, cache);
10. tmpNioBuf = null;
11. chunk = null;
12. recycle();
13. }
14. }
15. }
4.chunk.arena.free(chunk, handle, maxLength, cache)
这一步是将ByteBuf的内存进行释放.
1. void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
2. if (chunk.unpooled) {
3. int size = chunk.chunkSize();
4. destroyChunk(chunk);
5. activeBytesHuge.add(-size);
6. deallocationsHuge.increment();
7. } else {
8. SizeClass sizeClass = sizeClass(normCapacity);
9. if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
10. // cached so not free it.
11. return;
12. }
13.
14. freeChunk(chunk, handle, sizeClass, nioBuffer, false);
15. }
16. }
5.最后就是调用Recycler中的recycle()
是将对象放入的对象回收站, 循环利用。
1. public abstract class Recycler<T> {
2. @Override
3. public void recycle(Object object) {
4. if (object != value) {
5. throw new IllegalArgumentException("object does not belong to handle");
6. }
7.
8. Stack<?> stack = this.stack;
9. if (lastRecycledId != recycleId || stack == null) {
10. throw new IllegalStateException("recycled already");
11. }
12.
13. stack.push(this);
14. }
15.
16. }
主要学习了ByteBuf 的内存分配以及内存回收。
下回学习Netty的大动脉pipeline
https://www.cnblogs.com/xiangnan6122/p/10202191.html
https://www.jianshu.com/u/fc9c660e9843