专栏首页暴走大数据Spark Core源码精读计划23 | 与存储相关的内存池及内存管理器的具体实现

Spark Core源码精读计划23 | 与存储相关的内存池及内存管理器的具体实现

目录

  • 前言
  • 内存池MemoryPool
  • 存储内存池StorageMemoryPool
    • 构造与属性成员
    • 申请内存
    • 释放内存
  • 内存管理器MemoryManager
    • 构造与属性成员
    • 内存管理方法
  • 总结

前言

我们用两篇文章的时间搞清楚了Spark存储中的“块”到底是怎么一回事,接下来我们就可以放心来看Spark Core存储子系统的细节了。前面已经提到过,Spark会同时利用内存和外存,尤其是积极地利用内存作为存储媒介。这点与传统分布式计算框架(如Hadoop MapReduce)的“内存仅用于计算,外存仅用于存储”的方式是非常不同的,同时也是Spark高效设计哲学的体现。接下来一段时间内,我们先研究Spark存储中的内存部分,再研究磁盘(外存)部分。

虽然BlockManager是Spark存储子系统的司令官,但它并不会直接管理块,而会将对内存和外存的管理分别组织起来。与内存存储相关的组件包括内存池MemoryPool、内存管理器MemoryManager、内存存储器MemoryStore。本文先来探索内存池和内存管理器的大体实现。

内存池MemoryPool

MemoryPool抽象类从逻辑上非常松散地定义了Spark内存池的一些基本约定,其完整源码如下。

代码#23.1 - o.a.s.memory.MemoryPool抽象类

private[memory] abstract class MemoryPool(lock: Object) {
  @GuardedBy("lock")
  private[this] var _poolSize: Long = 0

  final def poolSize: Long = lock.synchronized {
    _poolSize
  }

  final def memoryFree: Long = lock.synchronized {
    _poolSize - memoryUsed
  }

  final def incrementPoolSize(delta: Long): Unit = lock.synchronized {
    require(delta >= 0)
    _poolSize += delta
  }

  final def decrementPoolSize(delta: Long): Unit = lock.synchronized {
    require(delta >= 0)
    require(delta <= _poolSize)
    require(_poolSize - delta >= memoryUsed)
    _poolSize -= delta
  }

  def memoryUsed: Long
}

在构造MemoryPool时,需要传入一个锁对象lock用于线程同步,该lock实际上就是后面会讲到的内存管理器MemoryManager。MemoryPool中定义了以下方法。

  • poolSize: 获得内存池的大小,单位为字节。
  • memoryUsed: 获得内存池中已占用内存的大小。 该方法未提供具体实现,需要子类实现。
  • memoryFree: 获得内存池中空闲内存的大小,就是上述poolSize减去memoryUsed。
  • incrementPoolSize(): 扩展内存池delta个字节的大小。 该方法不能被覆写。
  • decrementPoolSize(): 压缩内存池delta个字节的大小。 注意已占用的内存不能被压缩掉,并且该方法也不能被覆写。

以上所有方法(以及其实现类的大部分方法)都由MemoryManager保证线程安全性,防止多线程同时操作内存池,造成分配混乱。

MemoryPool有两个实现类:StorageMemoryPool与ExecutionMemoryPool。顾名思义,StorageMemoryPool用于存储,比如RDD数据、广播变量数据的缓存与分发;ExecutionMemoryPool用于执行,这包含Spark的计算(连接、聚合、排序等等)和Shuffle过程。ExecutionMemoryPool严格上来讲不属于存储子系统的组成部分,因此本文先来看StorageMemoryPool。

存储内存池StorageMemoryPool

构造与属性成员

代码#23.2 - o.a.s.memory.StorageMemoryPool类的构造与属性成员

private[memory] class StorageMemoryPool(
    lock: Object,
    memoryMode: MemoryMode
  ) extends MemoryPool(lock) with Logging {

  private[this] val poolName: String = memoryMode match {
    case MemoryMode.ON_HEAP => "on-heap storage"
    case MemoryMode.OFF_HEAP => "off-heap storage"
  }

  @GuardedBy("lock")
  private[this] var _memoryUsed: Long = 0L
  override def memoryUsed: Long = lock.synchronized {
    _memoryUsed
  }

  private var _memoryStore: MemoryStore = _
  def memoryStore: MemoryStore = {
    if (_memoryStore == null) {
      throw new IllegalStateException("memory store not initialized yet")
    }
    _memoryStore
  }
}

StorageMemoryPool的构造方法参数除了锁对象之外,还有一个MemoryMode,它表示该内存池会使用哪部分的内存,其定义如下。

代码#23.3 - o.a.s.memory.MemoryMode枚举

@Private
public enum MemoryMode {
  ON_HEAP,
  OFF_HEAP
}

其中,ON_HEAP表示使用堆内内存,即每个Executor JVM使用的那部分内存;OFF_HEAP表示使用堆外内存,即Worker节点上的本机内存(native memory),需要通过Unsafe API(讲解的文章见这里)来分配。

Spark堆内内存和堆外内存的关系如下面的简图所示。

图#23.1 - Spark堆内内存与堆外内存的关系

根据MemoryMode的不同,使用堆内内存时池子的名称为on-heap storage,使用堆外内存时池子的名称为off-heap storage

StorageMemoryPool使用私有变量_memoryUsed来记录使用了多少内存,并覆写memoryUsed这个Getter方法来返回之。另外,它还必须有与其关联的MemoryStore实例。MemoryStore真正地负责块在内存中的存取,下一篇文章就会讲解到它。下面来看StorageMemoryPool提供的方法。

申请内存

申请内存的逻辑由acquireMemory()方法来实现。

代码#23.4 - o.a.s.memory.StorageMemoryPool.acquireMemory()方法

def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized {
    val numBytesToFree = math.max(0, numBytes - memoryFree)
    acquireMemory(blockId, numBytes, numBytesToFree)
  }

  def acquireMemory(
      blockId: BlockId,
      numBytesToAcquire: Long,
      numBytesToFree: Long): Boolean = lock.synchronized {
    assert(numBytesToAcquire >= 0)
    assert(numBytesToFree >= 0)
    assert(memoryUsed <= poolSize)
    if (numBytesToFree > 0) {
      memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, memoryMode)
    }

    val enoughMemory = numBytesToAcquire <= memoryFree
    if (enoughMemory) {
      _memoryUsed += numBytesToAcquire
    }
    enoughMemory
  }

这两个方法的执行流程是:ID为blockId的块需要申请numBytesToAcquire字节的内存,首先检查当前的剩余内存量memoryFree是否能满足分配,如果不能,就调用MemoryStore.evictBlocksToFreeSpace()方法,释放出(numBytes - memoryFree)大小的内存。然后再次检查剩余内存量是否能满足分配,如果够,就将占用内存量增加。最终返回是否分配成功。

可见,acquireMemory()虽然名义上为申请内存,但实际上没有什么真正的内存分配操作,更多的是检查与记录而已。这是因为在Java/Scala环境中,对象内存的实际分配和释放一般都是由JVM来管理的,Spark只需要跟踪好就可以了。这也符合其父类MemoryPool类注释中的描述:“Manages bookkeeping for an adjustable-sized region of memory”,其中bookkeeping一词的含义即为“记账”,ExecutionMemoryPool也是同理。至于堆外内存的实际分配,是由基于Unsafe API的MemoryAllocator/MemoryConsumer组件来实现,这就是后话了。

释放内存

释放内存的逻辑则由releaseMemory()和releaseAllMemory()方法来实现。

代码#23.5 - o.a.s.memory.StorageMemoryPool.releaseMemory()/releaseAllMemory()方法

def releaseMemory(size: Long): Unit = lock.synchronized {
    if (size > _memoryUsed) {
      logWarning(s"Attempted to release $size bytes of storage " +
        s"memory when we only have ${_memoryUsed} bytes")
      _memoryUsed = 0
    } else {
      _memoryUsed -= size
    }
  }

  def releaseAllMemory(): Unit = lock.synchronized {
    _memoryUsed = 0
  }

这个实现非常简单,就是将memoryUsed减去要释放的量,或者直接设为0。

下面再来看一看内存管理器MemoryManager的部分细节,它直接管理着MemoryPool,是Spark作业运行时内存管理的统一入口。

内存管理器MemoryManager

MemoryManager与MemoryPool一样,也是一个抽象类。Spark环境中的每个JVM实例都会持有一个MemoryManager,先来看它的属性成员和构造方法。

构造与属性成员

代码#23.6 - o.a.s.memory.MemoryManager抽象类的属性成员和构造方法

private[spark] abstract class MemoryManager(
    conf: SparkConf,
    numCores: Int,
    onHeapStorageMemory: Long,
    onHeapExecutionMemory: Long) extends Logging {

  @GuardedBy("this")
  protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)
  @GuardedBy("this")
  protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
  @GuardedBy("this")
  protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)
  @GuardedBy("this")
  protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)

  protected[this] val maxOffHeapMemory = conf.get(MEMORY_OFFHEAP_SIZE)
  protected[this] val offHeapStorageMemory =
    (maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong

  onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory)
  onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)

  offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - offHeapStorageMemory)
  offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory) 
}

MemoryManager需要4个构造方法参数:

  • conf: 即SparkConf;
  • numCores: 分配的CPU核心数;
  • onHeapStorageMemory: 用于存储的堆内内存的大小(字节);
  • onHeapExecutionMemory: 用于执行的堆内内存的大小(字节)。

MemoryManager初始化了4个内存池,分别是堆内、堆外的存储内存池,以及堆内、堆外的执行内存池。另外,堆外内存的最大值可以由配置项spark.memory.offHeap.size来指定,默认为0。堆外存储内存占总堆外内存的比例则由配置项spark.memory.storageFraction指定,默认0.5,即50%,剩下的就是堆外执行内存。这个参数在后面还会出现。

接下来对4个内存池分别调用其incrementPoolSize()方法,设定合适的容量,初始化完毕。

内存管理方法

MemoryManager中给出了一批内存管理方法的定义,这其中有些是抽象方法,需要其子类去实现。这些方法的清单如下。

代码#23.7 - o.a.s.memory.MemoryManager定义的内存管理方法

def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean

  def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean

  private[memory]
  def acquireExecutionMemory(
      numBytes: Long,
      taskAttemptId: Long,
      memoryMode: MemoryMode): Long

  private[memory]
  def releaseExecutionMemory(
      numBytes: Long,
      taskAttemptId: Long,
      memoryMode: MemoryMode): Unit = synchronized {
    memoryMode match {
      case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
      case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
    }
  }

  private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized {
    onHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId) +
      offHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId)
  }

  def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
    memoryMode match {
      case MemoryMode.ON_HEAP => onHeapStorageMemoryPool.releaseMemory(numBytes)
      case MemoryMode.OFF_HEAP => offHeapStorageMemoryPool.releaseMemory(numBytes)
    }
  }

  final def releaseAllStorageMemory(): Unit = synchronized {
    onHeapStorageMemoryPool.releaseAllMemory()
    offHeapStorageMemoryPool.releaseAllMemory()
  }

  final def releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
    releaseStorageMemory(numBytes, memoryMode)
  }

可见,acquireStorageMemory()、acquireUnrollMemory()和acquireExecutionMemory()三个用于申请内存的方法都需要子类去实现。什么是Unroll内存呢?RDD在被缓存之前,它所占用的内存空间是不连续的,而被缓存到存储内存之后,就以块的形式来存储,占用连续的内存空间了。Unroll就是这个将RDD固化在连续内存空间的过程,中文一般翻译为“展开”。Unroll过程使用的内存空间就是展开内存,它本质上是存储内存中比较特殊的一部分。

各个释放内存的方法则基本上代理了MemoryPool对应的释放方法,比较容易理解。

除此之外,MemoryManager类还提供了Tungsten机制下的一些内存管理相关的属性。现在铺开讲它还为时过早,看官目前只需知道Tungsten是DataBricks在3~4年前提出的Spark优化方案即可,前文提到的堆外内存管理即属于Tungsten机制的一部分。早在这个系列开始之前,我曾经写过一篇关于Tungsten Sort Shuffle的简单解析,可以参考这里。

总结

本文通过引入对内存池MemoryPool的介绍,搞清楚了用于存储的内存池StorageMemoryPool的基本逻辑,另外还对内存及MemoryPool的管理器——MemoryManager进行了简要的分析。由于Spark内存管理这一块知识的交叉性比较强,代码不能拆分得很开,所以难免会出现一些未接触过的概念,在之后的源码阅读过程中自然会逐渐了解。

在下一篇文章中,我们会重点解析MemoryManager的两种实现,即静态内存管理器StaticMemoryManager、统一内存管理器UnifiedMemoryManager,从而深刻理解Spark Core的内存管理模型。它也是Spark作业内存调优的基础。

本文分享自微信公众号 - 暴走大数据(zhouqiantanxi),作者:LittleMagic

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-08-19

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Spark Core源码精读计划24 | StaticMemoryManager——静态内存管理机制

    在上一篇文章的最后,我们阅读了内存管理器MemoryManager抽象类的源码,并且提到它有两种实现:静态内存管理器StaticMemoryManager、统一...

    暴走大数据
  • Spark Core源码精读计划25 | UnifiedMemoryManager——统一内存管理机制

    在前文的末尾,我们分析了静态内存管理器StaticMemoryManager的优缺点,并指出统一内存管理器UnifiedMemoryManager能够弥补它的缺...

    暴走大数据
  • 关于Redis的几件小事 | Redis的数据类型/过期策略/内存淘汰

    这个是类似map的一种结构,这个一般就是可以将结构化的数据,比如一个对象(前提是这个对象没嵌套其他的对象)给缓存在redis里,然后每次读写缓存的时候,可以就操...

    暴走大数据
  • SQL server 2014 内存表特性概述

    Leshami
  • 疑案追踪:Spring Boot内存泄露排查记

    在项目迁移到Spring Boot之后,发生内存使用量过高的问题。本文介绍了整个排查过程以及使用到的工具,也非常适用于其他堆外内存排查。

    美团技术团队
  • Linux 内存管理初探

    linux 内存是后台开发人员,需要深入了解的计算机资源。合理的使用内存,有助于提升机器的性能和稳定性。本文主要介绍 linux 内存组织结构和页面布局,内存碎...

    用户6543014
  • 内存溢出和内存泄露

    内存溢出 out of memory,是指程序在申请内存时,没有足够的内存空间供其使用,出现out of memory;比如申请了一个integer,但给它存了...

    Demo_Yang
  • Zephyr 内存分配

    int k_mem_pool_alloc(struct k_mem_pool *p, struct k_mem_block *block, size_t si...

    无限之生
  • 一图解千愁,jvm内存从来没有这么简单过!

    看到这张图的同学,千万不要到处分享。我们仅限于小范围讨论,因为这张图威力很大,是我花了10年时间才画出来的!

    xjjdog
  • 故障分析 | MySQL OOM 故障应如何下手

    前阵子处理这样一个案例,某客户的实例 mysqld 进程内存经常持续增加导致最终被 OOM killer。作为 DBA 肯定想知道有哪些原因可能会导致 OOM(...

    爱可生开源社区

扫码关注云+社区

领取腾讯云代金券