前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java NIO实现原理之Buffer

Java NIO实现原理之Buffer

作者头像
Monica2333
发布2020-06-19 17:52:09
4840
发布2020-06-19 17:52:09
举报
文章被收录于专栏:码农知识点码农知识点

nio是基于事件驱动模型的非阻塞io,这篇文章简要介绍了nio,本篇主要介绍Buffer的实现原理。

Buffer

是一块缓冲区,通常使用buffer读写数据为:

代码语言:javascript
复制
RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw");
FileChannel inChannel = aFile.getChannel();

//1.create buffer with capacity of 48 bytes
ByteBuffer buf = ByteBuffer.allocate(48);
//2.write into buffer
int bytesRead = inChannel.read(buf);
while (bytesRead != -1) {
//3.make buffer from write mode to read mode
  buf.flip();  

  while(buf.hasRemaining()){
    //4. read 1 byte from buffer
      System.out.print((char) buf.get()); 
  }
//5.调用clear()方法或者compact()方法,make buffer ready for writing
  buf.clear(); 
  bytesRead = inChannel.read(buf);
}
aFile.close();

Buffer的数据结构设计如下:

Buffer数据结构.png

其中:

capacity:buffer的固定大小值

position:在写模式下,表示当前写入数据的位置。在读模式下,表示当前已读到数据的位置

limit:在写模式下,表示最大可写的位置,为capacity ,在读模式下,表示最大可读位置。

此外,Buffer类中还有以下参数:

mark:初始值为-1,用于备份当前的position。

address:buffer对象持有的堆外内存(DirectByteBuffer)的内存地址,方便JNI 快速找到堆外内存地址。

Buffer相关的类结构如下:

Buffer类结构.png

Buffer或ByteBuffer的方法简介:

1.Buffer的分配:

代码语言:javascript
复制
//分配一份堆内内存
public static ByteBuffer allocate(int capacity) {
        if (capacity < 0)
            throw new IllegalArgumentException();
        return new HeapByteBuffer(capacity, capacity);
    }
//分配一份堆外内存
public static ByteBuffer allocateDirect(int capacity) {
        return new DirectByteBuffer(capacity);
    }

2.向buffer写入数据:

代码语言:javascript
复制
//各种put方法,也可从channel中读取,inChannel.read(buf)
put(....)

3.从buffer中读取数据

代码语言:javascript
复制
//也可以这样写入channel。 int bytesWritten = inChannel.write(buf);
get()

4.flip():将Buffer从写模式切换到读模式

代码语言:javascript
复制
public final Buffer flip() {
        limit = position;
        position = 0;
        mark = -1;
        return this;
    }

5.rewind():将position设回0,所以你可以重读Buffer中的所有数据

代码语言:javascript
复制
public final Buffer rewind() {
        position = 0;
        mark = -1;
        return this;
 }

6.clear()与compact():一旦读完Buffer中的数据,需要让Buffer准备好再次被写入。可以通过clear()或compact()方法来完成。

compact()方法将所有未读的数据拷贝到Buffer起始处。然后将position设到最后一个未读元素正后面。limit属性依然像clear()方法一样,设置成capacity。现在Buffer准备好写数据了,但是不会覆盖未读的数据。

代码语言:javascript
复制
 public final Buffer clear() {
        position = 0;
        limit = capacity;
        mark = -1;
        return this;
    }

7.mark()与reset()

通过调用Buffer.mark()方法,可以标记Buffer中的一个特定position。之后可以通过调用Buffer.reset()方法恢复到这个position。方便数据的重新读取,而流只能是单向读取。

ByteBuffer的两种实现:

HeapByteBuffer:Java中分配的非空对象都是由Java虚拟机的垃圾收集器管理的,也称为堆内内存(on-heap memory)。虚拟机会定期对垃圾内存进行回收,在某些特定的时间点,它会进行一次彻底的回收(full gc)。彻底回收时,垃圾收集器会对所有分配的堆内内存进行完整的扫描,这意味着一个重要的事实——这样一次垃圾收集对Java应用造成的影响,跟堆的大小是成正比的。过大的堆会影响Java应用的性能。JVM参数中 -Xmx的值是新生代和老生代的和的最大值,我们在jvm参数里通常还会加一个参数-XX:MaxPermSize来指定持久代的最大值,那么我们认识的Java堆的最大值其实是-Xmx和-XX:MaxPermSize的总和,在分代算法下,新生代,老生代和持久代是连续的虚拟地址,因为它们是一起分配的。

DirectByteBuffer:由该对象创建的在jvm之外的内存,对于生命期中等或较长的对象,正是堆外内存要解决的。堆外内存有以下特点:

对于大内存有良好的伸缩性

对垃圾回收停顿的改善可以明显感觉到

在进程间可以共享,减少虚拟机间的复制

接下来看一下DirectByteBuffer的实现:

代码语言:javascript
复制
 DirectByteBuffer(int cap) {                   // package-private

        super(-1, 0, cap, cap);
        boolean pa = VM.isDirectMemoryPageAligned();
        int ps = Bits.pageSize();
        long size = Math.max(1L, (long)cap + (pa ? ps : 0));
       //确认堆外内存是否够用
        Bits.reserveMemory(size, cap);

        long base = 0;
        try {
           //分配堆外内存,并返回堆外内存的基地址
            base = unsafe.allocateMemory(size);
        } catch (OutOfMemoryError x) {
            Bits.unreserveMemory(size, cap);
            throw x;
        }
        unsafe.setMemory(base, size, (byte) 0);
        if (pa && (base % ps != 0)) {
            // Round up to page boundary
            address = base + ps - (base & (ps - 1));
        } else {
            address = base;
        }
        cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
        att = null;



    }

Bits.reserveMemory(size, cap) 方法,该方法用于在系统中保存总分配内存(按页分配)的大小和实际内存的大小。

代码语言:javascript
复制
 // These methods should be called whenever direct memory is allocated or
    // freed.  They allow the user to control the amount of direct memory
    // which a process may access.  All sizes are specified in bytes.
    static void reserveMemory(long size, int cap) {

        if (!memoryLimitSet && VM.isBooted()) {
            maxMemory = VM.maxDirectMemory();
            memoryLimitSet = true;
        }

        // optimist!内存够用!
        if (tryReserveMemory(size, cap)) {
            return;
        }
  
        final JavaLangRefAccess jlra = SharedSecrets.getJavaLangRefAccess();

        // retry while helping enqueue pending Reference objects
        // which includes executing pending Cleaner(s) which includes
        // Cleaner(s) that free direct buffer memory
        while (jlra.tryHandlePendingReference()) {
            if (tryReserveMemory(size, cap)) {
                return;
            }
        }

        // trigger VM's Reference processing
        System.gc();

        // a retry loop with exponential back-off delays
        // (this gives VM some time to do it's job)
        boolean interrupted = false;
        try {
            long sleepTime = 1;
            int sleeps = 0;
            while (true) {
                if (tryReserveMemory(size, cap)) {
                    return;
                }
                if (sleeps >= MAX_SLEEPS) {
                    break;
                }
                if (!jlra.tryHandlePendingReference()) {
                    try {
                        Thread.sleep(sleepTime);
                        sleepTime <<= 1;
                        sleeps++;
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
            }

            // no luck
            throw new OutOfMemoryError("Direct buffer memory");

        } finally {
            if (interrupted) {
                // don't swallow interrupts
                Thread.currentThread().interrupt();
            }
        }
    }

如果堆内存不够分配的话,jlra.tryHandlePendingReference()将触发一次非阻塞的Reference#tryHandlePending(false),该方法会将已经被JVM垃圾回收的DirectBuffer对象的堆外内存释放。

如果还是无法释放足够的内存,将会触发System.gc(),该方法会触发一个full gc,如果JVM参数没有设置-XX:+DisableExplicitGC。但是调用System.gc()并不能够保证full gc马上就能被执行。所以在后面打代码中,会进行最多9次尝试,看是否有足够的可用堆外内存来分配堆外内存。并且每次尝试之前,都对延迟等待时间,已给JVM足够的时间去完成full gc操作。如果9次尝试后依旧没有足够的可用堆外内存来分配本次堆外内存,则抛出OutOfMemoryError("Direct buffer memory”)异常。

之所以用使用full gc的很重要的一个原因是:System.gc()会对新生代和老生代都进行内存回收,这样会比较彻底地回收DirectByteBuffer对象以及他们关联的堆外内存。

有两个问题:堆外内存是多大?

代码中maxMemory = VM.maxDirectMemory();

代码语言:javascript
复制
private static long directMemory = 64 * 1024 * 1024; //64MB
public static long maxDirectMemory() {
        return directMemory;
    }

实际上在 JVM启动时,会对System做初始化,实际上堆外内存的大小设置逻辑为:

如果通过-Dsun.nio.MaxDirectMemorySize指定了这个属性,只要它不等于-1,那效果和加了-XX:MaxDirectMemorySize一样的,如果两个参数都没指定,那么最大堆外内存的值来自于directMemory = Runtime.getRuntime().maxMemory()。

其中在我们使用CMS GC的情况下的实现如下,其实是新生代的最大值-一个survivor的大小+老生代的最大值,也就是我们设置的-Xmx的值里除去一个survivor的大小就是默认的堆外内存的大小。

堆外内存的回收机制是什么?

Cleaner是PhantomReference的子类,并通过自身的next和prev字段维护的一个双向链表PhantomReference的作用在于跟踪垃圾回收过程,并不会对对象的垃圾回收过程造成任何的影响。

DirectByteBuffer对象在创建的时候关联了一个Cleaner,(cleaner = Cleaner.create(this, new Deallocator(base, size, cap));)说到PhantomReference它其实主要是用来跟踪对象何时被回收的,它不能影响gc决策,但是gc过程中如果发现某个对象除了只有PhantomReference引用它之外,并没有其他的地方引用它了,那将会把这个引用放到java.lang.ref.Reference.pending队列里,在gc完毕的时候通知ReferenceHandler这个守护线程去执行一些后置处理,而DirectByteBuffer关联的cleaner是PhantomReference的一个子类,在最终的处理里会通过Unsafe的free接口来释放DirectByteBuffer对应的堆外内存块.

代码语言:javascript
复制
private static class ReferenceHandler extends Thread {

        ReferenceHandler(ThreadGroup g, String name) {
            super(g, name);
        }

        public void run() {
            for (;;) {

                Reference r;
                synchronized (lock) {
                    if (pending != null) {
                        r = pending;
                        Reference rn = r.next;
                        pending = (rn == r) ? null : rn;
                        r.next = r;
                    } else {
                        try {
                            lock.wait();
                        } catch (InterruptedException x) { }
                        continue;
                    }
                }

                // Fast path for cleaners
                if (r instanceof Cleaner) {
                    //
                    ((Cleaner)r).clean();
                    continue;
                }

                ReferenceQueue q = r.queue;
                if (q != ReferenceQueue.NULL) q.enqueue(r);
            }
        }
    }

//如果System.gc();被禁止,也会触发堆外内存的回收

Reference#tryHandlePending(false)

代码语言:javascript
复制
static boolean tryHandlePending(boolean var0) {
        Reference var1;
        Cleaner var2;
        try {
            Reference.Lock var3 = lock;
            synchronized(lock) {
                if (pending == null) {
                    if (var0) {
                        lock.wait();
                    }

                    return var0;
                }

                var1 = pending;
              //cleaner对象
                var2 = var1 instanceof Cleaner ? (Cleaner)var1 : null;
                pending = var1.discovered;
                var1.discovered = null;
            }
        } catch (OutOfMemoryError var6) {
            Thread.yield();
            return true;
        } catch (InterruptedException var7) {
            return true;
        }

        if (var2 != null) {
          //clean方法回收
            var2.clean();
            return true;
        } else {
            ReferenceQueue var8 = var1.queue;
            if (var8 != ReferenceQueue.NULL) {
                var8.enqueue(var1);
            }

            return true;
        }
    }
代码语言:javascript
复制
public void clean() {
//将当前Cleaner从Cleaner链表中移除,这样当clean()执行完后,Cleaner就是一个无引用指向的对象了,也就是可被GC回收的对象
        if (remove(this)) {
            try {
               //thunk 在directByteBuffer 是Deallocator 对象
                this.thunk.run();
            } catch (final Throwable var2) {
                AccessController.doPrivileged(new PrivilegedAction<Void>() {
                    public Void run() {
                        if (System.err != null) {
                            (new Error("Cleaner terminated abnormally", var2)).printStackTrace();
                        }

                        System.exit(1);
                        return null;
                    }
                });
            }

        }
    }
代码语言:javascript
复制
        private Deallocator(long address, long size, int capacity) {
            assert (address != 0);
            this.address = address;
            this.size = size;
            this.capacity = capacity;
        }

        public void run() {
            if (address == 0) {
                // Paranoia
                return;
            }
            //回收掉堆外内存
            unsafe.freeMemory(address);
            address = 0;
           //修改堆外内存的剩余容量大小
            Bits.unreserveMemory(size, capacity);
        }

所以如果一直触发不了cms gc或者full gc,老年代的DirectByteBuffer对象不能被回收,那么堆外内存就一直不能被回收,可能导致内存泄漏。

参考资料:

http://ifeve.com/buffers/

http://lovestblog.cn/blog/2015/05/12/direct-buffer/

https://www.jianshu.com/p/007052ee3773

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Buffer
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档