前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kafka 内存管理 BufferPool

kafka 内存管理 BufferPool

作者头像
平凡的学生族
发布2020-06-12 18:27:09
1.2K0
发布2020-06-12 18:27:09
举报
文章被收录于专栏:后端技术后端技术

先看注释

代码语言:javascript
复制
/**
 * A pool of ByteBuffers kept under a given memory limit. This class is fairly specific to the needs of the producer. In
 * particular it has the following properties:
 * <ol>
 * <li>There is a special "poolable size" and buffers of this size are kept in a free list and recycled
 * <li>It is fair. That is all memory is given to the longest waiting thread until it has sufficient memory. This
 * prevents starvation or deadlock when a thread asks for a large chunk of memory and needs to block until multiple
 * buffers are deallocated.
 * </ol>
 */

结合代码可知,BufferPool负责ByteBuffer的申请和释放。 BufferPool会维持一组大小为poolableSize的ByteBuffer,便于快速申请/归还这个大小的ByteBuffer。该机制是由free空闲链表维持的。 对于非poolableSize的ByteBuffer,其申请和释放都委托给JVM BufferPool的内存申请是"公平的",永远优先满足先申请的线程,再满足后申请的。这样能防止死锁和饥饿。该机制是由waiters条件队列保障的。

内存管理

BufferPool将内存视为三个部分:

  • 已被分配出去的ByteBuffer。它们的大小可能各异,可以是或不是poolableSize
  • 维持在free空闲链表的ByteBuffer(free会维持它们的引用)。每一个的大小都是poolableSize。
    • 申请这个大小的ByteBuffer时,从free中取出即可
    • 归还这个大小的ByteBuffer时,放回free
  • 未分配空闲内存[1]。这块内存在JVM中,是空闲的。用一个数字nonPooledAvailableMemory代表。它的回收是交给gc的。
    • 申请不是poolableSize大小的ByteBuffer时,调用ByteBuffer.allocate(size)
    • 归还不是poolableSize大小的ByteBuffer时,调用者解除对该ByteBuffer的引用,然后nonPooledAvailableMemory增加这个大小即可,其回收交给gc

waiters条件队列

维持了一个Condition队列,每个线程在申请内存不足时,会阻塞于生成的一个Condition并进入此队。

代码语言:javascript
复制
private final Deque<Condition> waiters;

比如此处,在allocate方法中,没有足够内存:

所以,队列中每一个Condition代表一个因内存不足而阻塞的线程,当有ByteBuffer释放时,取出队首的Condition,调用signal将对应线程唤醒即可。

allocate

根据要分配的内存大小有不同的行为。

如果要分配的内存size等于poolableSize,从free取出一块即可。如果free没有就等待。

如果size不等于poolableSize,需要从非池化内存分配:

  • 如果有足够的空闲内存,也就是空闲非池化内存+空闲链表总和 >= size[2],那就能一次性分配。
    1. 先逐个释放free中的ByteBuffer,直到有足够的nonPooledAvailableMemory为止
    2. 直接分配size大小的内存
  • 如果没有足够空闲内存,就需要边等待,边分配一部分。
    1. 执行以下循环,直到accumulated>=size:
      1. 等待一定时间(在等待期间,可能有新的内存块插入free,也可能有新的非池化内存,使nonPooledAvailableMemory增加),若超时就抛出内存不足异常,否则返回时未超时,说明有新的空闲内存了。
      2. 逐个释放free中的ByteBuffer,直到有足够nonPooledAvailableMemory为止。期间会扣除nonPooledAvailableMemory的份额,加到accumulated上。但我们并不申请内存,只是把这块份额预留出来。
    2. 循环退出时,要么预留出了足够的内存,申请即可;要么是等待超时,则归还预留的份额(这里和代码的理解不同)

对归还的份额有疑问:

归还size?

deallocate

根据要释放的内存大小有不同的行为。

  • 如果大小为poolableSize,直接挂进size即可
  • 如果不是,把大小增加到nonPooledAvailableMemory。调用者解除该ByteBuffer的引用,然后交给gc回收即可。

对代码关于非池化内存的回收行为有疑问:


  1. 代表所有剩下的空闲内存,totalMemory中,没有维持在free内的整块内存,都是未分配空闲内存。
  2. nonPooledAvailableMemory + freeListSize >= size
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 内存管理
  • waiters条件队列
  • allocate
  • deallocate
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档