前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Core源码精读计划23 | 与存储相关的内存池及内存管理器的具体实现

Spark Core源码精读计划23 | 与存储相关的内存池及内存管理器的具体实现

作者头像
大数据真好玩
发布2019-08-21 15:34:40
5180
发布2019-08-21 15:34:40
举报

目录

  • 前言
  • 内存池MemoryPool
  • 存储内存池StorageMemoryPool
    • 构造与属性成员
    • 申请内存
    • 释放内存
  • 内存管理器MemoryManager
    • 构造与属性成员
    • 内存管理方法
  • 总结

前言

我们用两篇文章的时间搞清楚了Spark存储中的“块”到底是怎么一回事,接下来我们就可以放心来看Spark Core存储子系统的细节了。前面已经提到过,Spark会同时利用内存和外存,尤其是积极地利用内存作为存储媒介。这点与传统分布式计算框架(如Hadoop MapReduce)的“内存仅用于计算,外存仅用于存储”的方式是非常不同的,同时也是Spark高效设计哲学的体现。接下来一段时间内,我们先研究Spark存储中的内存部分,再研究磁盘(外存)部分。

虽然BlockManager是Spark存储子系统的司令官,但它并不会直接管理块,而会将对内存和外存的管理分别组织起来。与内存存储相关的组件包括内存池MemoryPool、内存管理器MemoryManager、内存存储器MemoryStore。本文先来探索内存池和内存管理器的大体实现。

内存池MemoryPool

MemoryPool抽象类从逻辑上非常松散地定义了Spark内存池的一些基本约定,其完整源码如下。

代码#23.1 - o.a.s.memory.MemoryPool抽象类

private[memory] abstract class MemoryPool(lock: Object) {
  @GuardedBy("lock")
  private[this] var _poolSize: Long = 0

  final def poolSize: Long = lock.synchronized {
    _poolSize
  }

  final def memoryFree: Long = lock.synchronized {
    _poolSize - memoryUsed
  }

  final def incrementPoolSize(delta: Long): Unit = lock.synchronized {
    require(delta >= 0)
    _poolSize += delta
  }

  final def decrementPoolSize(delta: Long): Unit = lock.synchronized {
    require(delta >= 0)
    require(delta <= _poolSize)
    require(_poolSize - delta >= memoryUsed)
    _poolSize -= delta
  }

  def memoryUsed: Long
}

在构造MemoryPool时,需要传入一个锁对象lock用于线程同步,该lock实际上就是后面会讲到的内存管理器MemoryManager。MemoryPool中定义了以下方法。

  • poolSize: 获得内存池的大小,单位为字节。
  • memoryUsed: 获得内存池中已占用内存的大小。 该方法未提供具体实现,需要子类实现。
  • memoryFree: 获得内存池中空闲内存的大小,就是上述poolSize减去memoryUsed。
  • incrementPoolSize(): 扩展内存池delta个字节的大小。 该方法不能被覆写。
  • decrementPoolSize(): 压缩内存池delta个字节的大小。 注意已占用的内存不能被压缩掉,并且该方法也不能被覆写。

以上所有方法(以及其实现类的大部分方法)都由MemoryManager保证线程安全性,防止多线程同时操作内存池,造成分配混乱。

MemoryPool有两个实现类:StorageMemoryPool与ExecutionMemoryPool。顾名思义,StorageMemoryPool用于存储,比如RDD数据、广播变量数据的缓存与分发;ExecutionMemoryPool用于执行,这包含Spark的计算(连接、聚合、排序等等)和Shuffle过程。ExecutionMemoryPool严格上来讲不属于存储子系统的组成部分,因此本文先来看StorageMemoryPool。

存储内存池StorageMemoryPool

构造与属性成员

代码#23.2 - o.a.s.memory.StorageMemoryPool类的构造与属性成员

private[memory] class StorageMemoryPool(
    lock: Object,
    memoryMode: MemoryMode
  ) extends MemoryPool(lock) with Logging {

  private[this] val poolName: String = memoryMode match {
    case MemoryMode.ON_HEAP => "on-heap storage"
    case MemoryMode.OFF_HEAP => "off-heap storage"
  }

  @GuardedBy("lock")
  private[this] var _memoryUsed: Long = 0L
  override def memoryUsed: Long = lock.synchronized {
    _memoryUsed
  }

  private var _memoryStore: MemoryStore = _
  def memoryStore: MemoryStore = {
    if (_memoryStore == null) {
      throw new IllegalStateException("memory store not initialized yet")
    }
    _memoryStore
  }
}

StorageMemoryPool的构造方法参数除了锁对象之外,还有一个MemoryMode,它表示该内存池会使用哪部分的内存,其定义如下。

代码#23.3 - o.a.s.memory.MemoryMode枚举

@Private
public enum MemoryMode {
  ON_HEAP,
  OFF_HEAP
}

其中,ON_HEAP表示使用堆内内存,即每个Executor JVM使用的那部分内存;OFF_HEAP表示使用堆外内存,即Worker节点上的本机内存(native memory),需要通过Unsafe API(讲解的文章见这里)来分配。

Spark堆内内存和堆外内存的关系如下面的简图所示。

图#23.1 - Spark堆内内存与堆外内存的关系

根据MemoryMode的不同,使用堆内内存时池子的名称为on-heap storage,使用堆外内存时池子的名称为off-heap storage

StorageMemoryPool使用私有变量_memoryUsed来记录使用了多少内存,并覆写memoryUsed这个Getter方法来返回之。另外,它还必须有与其关联的MemoryStore实例。MemoryStore真正地负责块在内存中的存取,下一篇文章就会讲解到它。下面来看StorageMemoryPool提供的方法。

申请内存

申请内存的逻辑由acquireMemory()方法来实现。

代码#23.4 - o.a.s.memory.StorageMemoryPool.acquireMemory()方法

def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized {
    val numBytesToFree = math.max(0, numBytes - memoryFree)
    acquireMemory(blockId, numBytes, numBytesToFree)
  }

  def acquireMemory(
      blockId: BlockId,
      numBytesToAcquire: Long,
      numBytesToFree: Long): Boolean = lock.synchronized {
    assert(numBytesToAcquire >= 0)
    assert(numBytesToFree >= 0)
    assert(memoryUsed <= poolSize)
    if (numBytesToFree > 0) {
      memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, memoryMode)
    }

    val enoughMemory = numBytesToAcquire <= memoryFree
    if (enoughMemory) {
      _memoryUsed += numBytesToAcquire
    }
    enoughMemory
  }

这两个方法的执行流程是:ID为blockId的块需要申请numBytesToAcquire字节的内存,首先检查当前的剩余内存量memoryFree是否能满足分配,如果不能,就调用MemoryStore.evictBlocksToFreeSpace()方法,释放出(numBytes - memoryFree)大小的内存。然后再次检查剩余内存量是否能满足分配,如果够,就将占用内存量增加。最终返回是否分配成功。

可见,acquireMemory()虽然名义上为申请内存,但实际上没有什么真正的内存分配操作,更多的是检查与记录而已。这是因为在Java/Scala环境中,对象内存的实际分配和释放一般都是由JVM来管理的,Spark只需要跟踪好就可以了。这也符合其父类MemoryPool类注释中的描述:“Manages bookkeeping for an adjustable-sized region of memory”,其中bookkeeping一词的含义即为“记账”,ExecutionMemoryPool也是同理。至于堆外内存的实际分配,是由基于Unsafe API的MemoryAllocator/MemoryConsumer组件来实现,这就是后话了。

释放内存

释放内存的逻辑则由releaseMemory()和releaseAllMemory()方法来实现。

代码#23.5 - o.a.s.memory.StorageMemoryPool.releaseMemory()/releaseAllMemory()方法

def releaseMemory(size: Long): Unit = lock.synchronized {
    if (size > _memoryUsed) {
      logWarning(s"Attempted to release $size bytes of storage " +
        s"memory when we only have ${_memoryUsed} bytes")
      _memoryUsed = 0
    } else {
      _memoryUsed -= size
    }
  }

  def releaseAllMemory(): Unit = lock.synchronized {
    _memoryUsed = 0
  }

这个实现非常简单,就是将memoryUsed减去要释放的量,或者直接设为0。

下面再来看一看内存管理器MemoryManager的部分细节,它直接管理着MemoryPool,是Spark作业运行时内存管理的统一入口。

内存管理器MemoryManager

MemoryManager与MemoryPool一样,也是一个抽象类。Spark环境中的每个JVM实例都会持有一个MemoryManager,先来看它的属性成员和构造方法。

构造与属性成员

代码#23.6 - o.a.s.memory.MemoryManager抽象类的属性成员和构造方法

private[spark] abstract class MemoryManager(
    conf: SparkConf,
    numCores: Int,
    onHeapStorageMemory: Long,
    onHeapExecutionMemory: Long) extends Logging {

  @GuardedBy("this")
  protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)
  @GuardedBy("this")
  protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
  @GuardedBy("this")
  protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)
  @GuardedBy("this")
  protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)

  protected[this] val maxOffHeapMemory = conf.get(MEMORY_OFFHEAP_SIZE)
  protected[this] val offHeapStorageMemory =
    (maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong

  onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory)
  onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)

  offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - offHeapStorageMemory)
  offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory) 
}

MemoryManager需要4个构造方法参数:

  • conf: 即SparkConf;
  • numCores: 分配的CPU核心数;
  • onHeapStorageMemory: 用于存储的堆内内存的大小(字节);
  • onHeapExecutionMemory: 用于执行的堆内内存的大小(字节)。

MemoryManager初始化了4个内存池,分别是堆内、堆外的存储内存池,以及堆内、堆外的执行内存池。另外,堆外内存的最大值可以由配置项spark.memory.offHeap.size来指定,默认为0。堆外存储内存占总堆外内存的比例则由配置项spark.memory.storageFraction指定,默认0.5,即50%,剩下的就是堆外执行内存。这个参数在后面还会出现。

接下来对4个内存池分别调用其incrementPoolSize()方法,设定合适的容量,初始化完毕。

内存管理方法

MemoryManager中给出了一批内存管理方法的定义,这其中有些是抽象方法,需要其子类去实现。这些方法的清单如下。

代码#23.7 - o.a.s.memory.MemoryManager定义的内存管理方法

def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean

  def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean

  private[memory]
  def acquireExecutionMemory(
      numBytes: Long,
      taskAttemptId: Long,
      memoryMode: MemoryMode): Long

  private[memory]
  def releaseExecutionMemory(
      numBytes: Long,
      taskAttemptId: Long,
      memoryMode: MemoryMode): Unit = synchronized {
    memoryMode match {
      case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
      case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
    }
  }

  private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized {
    onHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId) +
      offHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId)
  }

  def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
    memoryMode match {
      case MemoryMode.ON_HEAP => onHeapStorageMemoryPool.releaseMemory(numBytes)
      case MemoryMode.OFF_HEAP => offHeapStorageMemoryPool.releaseMemory(numBytes)
    }
  }

  final def releaseAllStorageMemory(): Unit = synchronized {
    onHeapStorageMemoryPool.releaseAllMemory()
    offHeapStorageMemoryPool.releaseAllMemory()
  }

  final def releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode): Unit = synchronized {
    releaseStorageMemory(numBytes, memoryMode)
  }

可见,acquireStorageMemory()、acquireUnrollMemory()和acquireExecutionMemory()三个用于申请内存的方法都需要子类去实现。什么是Unroll内存呢?RDD在被缓存之前,它所占用的内存空间是不连续的,而被缓存到存储内存之后,就以块的形式来存储,占用连续的内存空间了。Unroll就是这个将RDD固化在连续内存空间的过程,中文一般翻译为“展开”。Unroll过程使用的内存空间就是展开内存,它本质上是存储内存中比较特殊的一部分。

各个释放内存的方法则基本上代理了MemoryPool对应的释放方法,比较容易理解。

除此之外,MemoryManager类还提供了Tungsten机制下的一些内存管理相关的属性。现在铺开讲它还为时过早,看官目前只需知道Tungsten是DataBricks在3~4年前提出的Spark优化方案即可,前文提到的堆外内存管理即属于Tungsten机制的一部分。早在这个系列开始之前,我曾经写过一篇关于Tungsten Sort Shuffle的简单解析,可以参考这里。

总结

本文通过引入对内存池MemoryPool的介绍,搞清楚了用于存储的内存池StorageMemoryPool的基本逻辑,另外还对内存及MemoryPool的管理器——MemoryManager进行了简要的分析。由于Spark内存管理这一块知识的交叉性比较强,代码不能拆分得很开,所以难免会出现一些未接触过的概念,在之后的源码阅读过程中自然会逐渐了解。

在下一篇文章中,我们会重点解析MemoryManager的两种实现,即静态内存管理器StaticMemoryManager、统一内存管理器UnifiedMemoryManager,从而深刻理解Spark Core的内存管理模型。它也是Spark作业内存调优的基础。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-08-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据真好玩 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 内存池MemoryPool
  • 存储内存池StorageMemoryPool
    • 构造与属性成员
      • 申请内存
        • 释放内存
        • 内存管理器MemoryManager
          • 构造与属性成员
            • 内存管理方法
            • 总结
            相关产品与服务
            对象存储
            对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档