专栏首页暴走大数据Spark Core源码精读计划24 | StaticMemoryManager——静态内存管理机制

Spark Core源码精读计划24 | StaticMemoryManager——静态内存管理机制

目录

  • 前言
  • MemoryManager的初始化
  • 静态内存管理器StaticMemoryManager
    • 构造方法
    • 计算堆内存储/执行内存总量
    • 内存申请方法
    • 静态内存管理布局图解
  • 总结

前言

在上一篇文章的最后,我们阅读了内存管理器MemoryManager抽象类的源码,并且提到它有两种实现:静态内存管理器StaticMemoryManager、统一内存管理器UnifiedMemoryManager。其中,StaticMemoryManager是随着Spark诞生就存在的,UnifiedMemoryManager则是从Spark 1.6版本开始服役,并且后者是目前Spark Core中的默认内存管理器,前者已经标记为过时。虽然StaticMemoryManager已经不怎么用了,但它的逻辑相对简单,适合用来开胃,本文先来研究它。看官也可以先复习一下上篇文章关于MemoryManager的部分。

前面也已说过,了解内存管理机制是Spark内存调优的基础,因此本篇及下一篇内容都非常重要,同样也可以当做面试知识点手册来读。

MemoryManager的初始化

在之前讲SparkEnv时,略去了MemoryManager相关的初始化代码,它位于SparkEnv.create()方法中,如下。

代码#24.1 - SparkEnv.create()方法中初始化MemoryManager

    val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
    val memoryManager: MemoryManager =
      if (useLegacyMemoryManager) {
        new StaticMemoryManager(conf, numUsableCores)
      } else {
        UnifiedMemoryManager(conf, numUsableCores)
      }

可见,如果SparkConf中spark.memory.useLegacyMode配置项为true,则使用较老的StaticMemoryManager,反之默认使用较新的UnifiedMemoryManager。

静态内存管理器StaticMemoryManager

构造方法

代码#24.2 - o.a.s.memory.StaticMemoryManager类的构造方法与属性成员

private[spark] class StaticMemoryManager(
    conf: SparkConf,
    maxOnHeapExecutionMemory: Long,
    override val maxOnHeapStorageMemory: Long,
    numCores: Int)
  extends MemoryManager(
    conf,
    numCores,
    maxOnHeapStorageMemory,
    maxOnHeapExecutionMemory) {

  def this(conf: SparkConf, numCores: Int) {
    this(
      conf,
      StaticMemoryManager.getMaxExecutionMemory(conf),
      StaticMemoryManager.getMaxStorageMemory(conf),
      numCores)
  }

  offHeapExecutionMemoryPool.incrementPoolSize(offHeapStorageMemoryPool.poolSize)
  offHeapStorageMemoryPool.decrementPoolSize(offHeapStorageMemoryPool.poolSize)

  private val maxUnrollMemory: Long = {
    (maxOnHeapStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
  } 
}

SparkEnv初始化MemoryManager时,都不是直接调用主构造方法,而是调用辅助构造方法。也就是说,堆内存储内存总量maxOnHeapStorageMemory通过调用StaticMemoryManager伴生对象中的getMaxStorageMemory()方法来计算,堆内执行内存总量maxOnHeapExecutionMemory则通过调用getMaxExecutionMemory()方法来计算。

在构造方法中还有两句代码,它负责把原本属于堆外存储池的空间转移到堆外执行池。也就是说StaticMemoryManager只有堆外执行内存,没有堆外存储内存。至于maxUnrollMemory,稍后再说。

计算堆内存储/执行内存总量

这两个方法位于伴生对象中,代码如下。

代码#24.3 - o.a.s.memory.StaticMemoryManager.getMaxStorageMemory()/getMaxExecutionMemory()方法

  
private def getMaxStorageMemory(conf: SparkConf): Long = {
    val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
    val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
    val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
    (systemMaxMemory * memoryFraction * safetyFraction).toLong
  }

  private def getMaxExecutionMemory(conf: SparkConf): Long = {
    val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)

    if (systemMaxMemory < MIN_MEMORY_BYTES) {
      throw new IllegalArgumentException(s"System memory $systemMaxMemory must " +
        s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the --driver-memory " +
        s"option or spark.driver.memory in Spark configuration.")
    }
    if (conf.contains("spark.executor.memory")) {
      val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
      if (executorMemory < MIN_MEMORY_BYTES) {
        throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
          s"$MIN_MEMORY_BYTES. Please increase executor memory using the " +
          s"--executor-memory option or spark.executor.memory in Spark configuration.")
      }
    }
    val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
    val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
    (systemMaxMemory * memoryFraction * safetyFraction).toLong
  }

getMaxStorageMemory()方法的执行流程为:

  • 通过Runtime.maxMemory()这个native方法取得当前JVM可用的最大内存(堆内存)。spark.testing.memory参数是测试参数,几乎不用。
  • 根据spark.storage.memoryFraction配置项,取得存储内存占堆内内存的比例,默认0.6(60%)。
  • 根据spark.storage.safetyFraction配置项,取得存储内存的安全比例,默认0.9(90%)。
  • 将1~3步中的三个量相乘并取整,即得到堆内存储内存的总量。

前面也已经提过,存储内存中有一块特殊用途的区域,叫展开内存。它占存储内存的比例由spark.storage.unrollFraction配置项指定,默认值0.2(20%)。

getMaxExecutionMemory()方法的执行流程类似,不过它会预先校验Driver和Executor的内存量,确保有32MB以上。另外,执行内存占堆内内存的比例由配置项spark.shuffle.memoryFraction指定,默认0.2(20%);执行内存的安全比例则由spark.shuffle.safetyFraction指定,默认0.8(80%)。

除了上边的两块内存之外,堆内内存还会剩余(1 - spark.storage.memoryFraction - spark.shuffle.memoryFraction)比例的空间,默认占20%,它就用来存储用户代码中自定义的数据结构,以及Spark内部的一些元数据。

看官可能会问,为什么除了实际占比之外,还会有一个安全比例呢?我们已经知道,Spark中的对象可以序列化存储,也可以非序列化存储。对于序列化对象,可以通过其字节流的长度获知其大小。而对于非序列化对象,其占用的内存就只能通过估算得到,与实际情况可能出入较大。另外,MemoryManager申请的内存空间可能还未实际分配,而标记为释放的内存空间也可能并未被JVM实际GC掉,存在滞后性。总之,Spark并不能准确地跟踪堆内内存的占用量,为了防止偏差过大出现OOM,就必须预留一些缓冲空间了。默认会预留10%的存储内存、20%的执行内存作为缓冲。

内存申请方法

StaticMemoryManager覆写了MemoryManager中定义的申请内存的3个方法,其源码如下。

代码#24.4 - o.a.s.memory.StaticMemoryManager中的内存申请方法

 
 override def acquireStorageMemory(
      blockId: BlockId,
      numBytes: Long,
      memoryMode: MemoryMode): Boolean = synchronized {
    require(memoryMode != MemoryMode.OFF_HEAP,
      "StaticMemoryManager does not support off-heap storage memory")
    if (numBytes > maxOnHeapStorageMemory) {
      logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
        s"memory limit ($maxOnHeapStorageMemory bytes)")
      false
    } else {
      onHeapStorageMemoryPool.acquireMemory(blockId, numBytes)
    }
  }

  override def acquireUnrollMemory(
      blockId: BlockId,
      numBytes: Long,
      memoryMode: MemoryMode): Boolean = synchronized {
    require(memoryMode != MemoryMode.OFF_HEAP,
      "StaticMemoryManager does not support off-heap unroll memory")
    val currentUnrollMemory = onHeapStorageMemoryPool.memoryStore.currentUnrollMemory
    val freeMemory = onHeapStorageMemoryPool.memoryFree
    val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
    val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
    onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
  }

  private[memory]
  override def acquireExecutionMemory(
      numBytes: Long,
      taskAttemptId: Long,
      memoryMode: MemoryMode): Long = synchronized {
    memoryMode match {
      case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
      case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
    }
  }

可见,它们基本上是代理了各个MemoryPool的acquireMemory()方法,并且存储内存只在堆内申请,执行内存可以根据MemoryMode的不同在堆内或堆外申请。

对于展开内存还有一些特殊处理:由于将RDD展开为块需要占用连续的存储空间,在必要的情况下需要释放其他缓存的空间,以放下这个块。释放空间的上限为“最大展开内存 - 现占用的展开内存 - 空闲存储内存”,之所以要规定这个上限,是为了防止展开一个超大的块导致所有缓存都阵亡(blow away the entire cache)。

静态内存管理布局图解

只用文字描述过于抽象,所以用下图来形象地说明Spark的静态内存管理布局。

图#24.1 - Spark的静态内存管理布局

总结

经过上面的分析,我们可以看出,StaticMemoryManager之所以名为“静态”,是因为它的内存区域都是由各个比例参数(fraction)规定好的。这样实现起来简单,但是在复杂业务场景或者参数设定不当时,它容易造成“冰火两重天”的情况,即一方内存过剩而另一方内存紧张。在Spark 1.6版本提出的UnifiedMemoryManager致力于解决这个问题,并且是现在的事实标准,当然它也更加复杂。下一篇文章再来仔细地探索它,晚安。

本文分享自微信公众号 - 暴走大数据(zhouqiantanxi),作者:LittleMagic

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-08-19

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Spark Core源码精读计划23 | 与存储相关的内存池及内存管理器的具体实现

    我们用两篇文章的时间搞清楚了Spark存储中的“块”到底是怎么一回事,接下来我们就可以放心来看Spark Core存储子系统的细节了。前面已经提到过,Spark...

    暴走大数据
  • Spark Core源码精读计划25 | UnifiedMemoryManager——统一内存管理机制

    在前文的末尾,我们分析了静态内存管理器StaticMemoryManager的优缺点,并指出统一内存管理器UnifiedMemoryManager能够弥补它的缺...

    暴走大数据
  • 关于Redis的几件小事 | Redis的数据类型/过期策略/内存淘汰

    这个是类似map的一种结构,这个一般就是可以将结构化的数据,比如一个对象(前提是这个对象没嵌套其他的对象)给缓存在redis里,然后每次读写缓存的时候,可以就操...

    暴走大数据
  • 【转】Linux内存管理(最透彻的一篇)

    摘要:本章首先以应用程序开发者的角度审视Linux的进程内存管理,在此基础上逐步深入到内核中讨论系统物理内存管理和内核内存的使用方法。力求从外到内、水到渠成地引...

    用户3033338
  • Linux内存管理(最透彻的一篇)【转】

    转自:https://www.cnblogs.com/ralap7/p/9184773.html

    用户3033338
  • Android 内存暴减的秘密?!

    在 我这样减少了26.5M Java内存!中内存优化一期已经告一段落,主要做的事情是,造了几个分析内存问题的轮子,定位进程各种类型内存占用情况,分析了线程创建O...

    WeTest质量开放平台团队
  • iOS开发中内存泄漏检测工具--MLeaksFinder

    版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u010105969/article/details/...

    用户1451823
  • 调整JVM内存大小

    JAVA程序启动时JVM都会分配一个初始内存和最大内存给这个应用程序。这个初始内存和最大内存在一定程度都会影响程序的性能;Tomcat默认可以使用的内存为128...

    MonroeCode
  • Android 性能测试之内存性能及内存泄漏篇

    APP 占用内存的测试,要比 CPU 的更为简单。App memory 数据来源是 dumpsysmeminfo 。Android 程序内存主要是两部分:nat...

    陈帅
  • Spark面对OOM问题的解决方法及优化总结

    map执行中内存溢出代表了所有map类型的操作,包括:flatMap,filter,mapPatitions等。shuffle后内存溢出的shuffle操作包括...

    王知无

扫码关注云+社区

领取腾讯云代金券