前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Core源码精读计划26 | 内存存储MemoryStore的具体实现

Spark Core源码精读计划26 | 内存存储MemoryStore的具体实现

作者头像
大数据真好玩
发布2019-08-21 15:32:52
7090
发布2019-08-21 15:32:52
举报
文章被收录于专栏:暴走大数据暴走大数据

目录

  • 前言
  • MemoryEntry
  • MemoryStore
    • 构造与属性成员
    • 直接写入字节
    • 写入迭代器化的数据
    • 读取字节与迭代器化的数据
    • 淘汰缓存块
  • 总结

前言

前面我们已经对内存池MemoryPool、内存管理器MemoryManager有了比较深入的了解,接下来要介绍的就是MemoryStore,它负责Spark内存存储的具体事项,将内存管理机制与存储块联系起来。本文先介绍与MemoryStore相关的MemoryEntry,然后详细分析MemoryStore的主要源码。

MemoryEntry

顾名思义,MemoryEntry就是内存中的一个“项”,或者说是块在内存中的抽象表示。它由一个特征定义。

代码#26.1 - o.a.s.memory.MemoryEntry特征

代码语言:javascript
复制
代码语言:javascript
复制
private sealed trait MemoryEntry[T] {
  def size: Long
  def memoryMode: MemoryMode
  def classTag: ClassTag[T]
}

其中,size表示该MemoryEntry代表的块大小,memoryMode表示块存储在堆内内存还是堆外内存,classTag则是该块所存储的对象的类型标记。MemoryEntry有序列化和反序列化的两种实现,如下所示。

代码#26.2 - o.a.s.memory.SerializedMemoryEntry/DeserializedMemoryEntry类

代码语言:javascript
复制
代码语言:javascript
复制
private case class DeserializedMemoryEntry[T](
    value: Array[T],
    size: Long,
    classTag: ClassTag[T]) extends MemoryEntry[T] {
  val memoryMode: MemoryMode = MemoryMode.ON_HEAP
}

private case class SerializedMemoryEntry[T](
    buffer: ChunkedByteBuffer,
    memoryMode: MemoryMode,
    classTag: ClassTag[T]) extends MemoryEntry[T] {
  def size: Long = buffer.size
}

可见,反序列化的DeserializedMemoryEntry只能用堆内内存存储,其数据是T类型的对象的数组。序列化的SerializedMemoryEntry能用堆内和堆外内存存储,数据用之前讲过的字节缓存ChunkedByteBuffer包装,并且其长度就是该SerializedMemoryEntry的大小。

MemoryStore

MemoryStore的内容比较多,仍然分块来看。

构造与属性成员

代码#26.3 - o.a.s.memory.MemoryStore类的构造与属性成员

代码语言:javascript
复制
代码语言:javascript
复制
private[spark] class MemoryStore(
    conf: SparkConf,
    blockInfoManager: BlockInfoManager,
    serializerManager: SerializerManager,
    memoryManager: MemoryManager,
    blockEvictionHandler: BlockEvictionHandler)
  extends Logging {

  private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)

  private val onHeapUnrollMemoryMap = mutable.HashMap[Long, Long]()
  private val offHeapUnrollMemoryMap = mutable.HashMap[Long, Long]()

  private val unrollMemoryThreshold: Long =
    conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)

  private def maxMemory: Long = {
    memoryManager.maxOnHeapStorageMemory + memoryManager.maxOffHeapStorageMemory
  }

  if (maxMemory < unrollMemoryThreshold) {
    logWarning(s"Max memory ${Utils.bytesToString(maxMemory)} is less than the initial memory " +
      s"threshold ${Utils.bytesToString(unrollMemoryThreshold)} needed to store a block in " +
      s"memory. Please configure Spark with more memory.")
  }

  logInfo("MemoryStore started with capacity %s".format(Utils.bytesToString(maxMemory)))

  private def memoryUsed: Long = memoryManager.storageMemoryUsed

  def currentUnrollMemory: Long = memoryManager.synchronized {
    onHeapUnrollMemoryMap.values.sum + offHeapUnrollMemoryMap.values.sum
  }

  private def blocksMemoryUsed: Long = memoryManager.synchronized {
    memoryUsed - currentUnrollMemory
  }

可见,MemoryStore需要5个构造方法参数。前4个参数我们已经很熟悉了,不再多说。第5个参数是BlockEvictionHandler类型的,它实际上也是个特征,实现了该特征的类的作用就是将块从内存中淘汰掉。目前只有BlockManager实现了该特征,所以等到讲BlockManager时,再回头看它。

以下是MemoryStore类的属性成员:

  • entries:块ID与对应的MemoryEntry的映射关系,用LinkedHashMap结构存储,初始容量为32,负载因子0.75。
  • onHeapUnrollMemoryMap/offHeapUnrollMemoryMap:分别存储TaskAttempId与该Task在堆内、堆外内存占用的展开内存大小映射关系。
  • unrollMemoryThreshold:在展开块之前申请的初始展开内存大小,由spark.storage.unrollMemoryThreshold配置项来控制,默认1MB。

除此之外,还有四个Getter方法,它们负责返回对应内存的量:

  • maxMemory:堆内与堆外存储内存之和。如果内存管理器为StaticMemoryManager,该值为定值;如果内存管理器为UnifiedMemoryManager,该值会浮动。
  • memoryUsed:已经使用了的堆内与堆外存储内存之和。
  • currentUnrollMemory:当前展开内存占用的大小,由上面的onHeapUnrollMemoryMap/offHeapUnrollMemoryMap统计而来。
  • blocksMemoryUsed:当前除展开内存之外的存储内存(即真正存储块的内存)大小,即memoryUsed与currentUnrollMemory之差。

下面我们分别来看向MemoryStore写入以及从MemoryStore读取数据的方法。

直接写入字节

该方法名为putBytes(),代码如下。

代码#26.4 - o.a.s.memory.MemoryStore.putBytes()方法

代码语言:javascript
复制
代码语言:javascript
复制
  def putBytes[T: ClassTag](
      blockId: BlockId,
      size: Long,
      memoryMode: MemoryMode,
      _bytes: () => ChunkedByteBuffer): Boolean = {
    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
    if (memoryManager.acquireStorageMemory(blockId, size, memoryMode)) {
      
      val bytes = _bytes()
      assert(bytes.size == size)
      val entry = new SerializedMemoryEntry[T](bytes, memoryMode, implicitly[ClassTag[T]])
      entries.synchronized {
        entries.put(blockId, entry)
      }
      logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format(
        blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
      true
    } else {
      false
    }
  }

该方法的实现比较简单:首先调用MemoryManager.acquireStorageMemory()方法申请所需的内存,然后调用参数中传入的偏函数_bytes,获取已经转化为ChunkedByteBuffer的数据。再创建出对应的SerializedMemoryEntry,并将该MemoryEntry放入entries映射。注意LinkedHashMap本身不是线程安全的,因此对其并发访问都要加锁。

写入迭代器化的数据

所谓迭代器化的数据,就是指用Iterator[T]形式表示的块数据。之所以会这样表示,是因为有时单个块对应的数据可能过大,不能一次性存入内存。为了避免造成OOM,就可以一边遍历迭代器,一边周期性地写内存,并检查内存是否够用,就像翻书一样。“展开”(Unroll)这个词形象地说明了该过程,其对应的方法是putIteratorAsValues()与putIteratorAsBytes(),分别产生DeserializedMemoryEntry与SerializedMemoryEntry。由于两个方法的逻辑类似,因此我们只以putIteratorAsValues()来讲解。

代码很长,但我还是不“Unroll”了,全部放在下面。

代码#26.5 - o.a.s.memory.MemoryStore.putIteratorAsValues()

代码语言:javascript
复制
代码语言:javascript
复制
  private[storage] def putIteratorAsValues[T](
      blockId: BlockId,
      values: Iterator[T],
      classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")

    var elementsUnrolled = 0
    var keepUnrolling = true
    val initialMemoryThreshold = unrollMemoryThreshold
    val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)
    var memoryThreshold = initialMemoryThreshold
    val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
    var unrollMemoryUsedByThisBlock = 0L
    var vector = new SizeTrackingVector[T]()(classTag)

    keepUnrolling =
      reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, MemoryMode.ON_HEAP)

    if (!keepUnrolling) {
      logWarning(s"Failed to reserve initial memory threshold of " +
        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
    } else {
      unrollMemoryUsedByThisBlock += initialMemoryThreshold
    }

    while (values.hasNext && keepUnrolling) {
      vector += values.next()
      if (elementsUnrolled % memoryCheckPeriod == 0) {
        val currentSize = vector.estimateSize()
        if (currentSize >= memoryThreshold) {
          val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
          keepUnrolling =
            reserveUnrollMemoryForThisTask(blockId, amountToRequest, MemoryMode.ON_HEAP)
          if (keepUnrolling) {
            unrollMemoryUsedByThisBlock += amountToRequest
          }
          memoryThreshold += amountToRequest
        }
      }
      elementsUnrolled += 1
    }

    if (keepUnrolling) {
      val arrayValues = vector.toArray
      vector = null
      val entry =
        new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag)
      val size = entry.size

      def transferUnrollToStorage(amount: Long): Unit = {
        memoryManager.synchronized {
          releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
          val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP)
          assert(success, "transferring unroll memory to storage memory failed")
        }
      }

      val enoughStorageMemory = {
        if (unrollMemoryUsedByThisBlock <= size) {
          val acquiredExtra =
            memoryManager.acquireStorageMemory(
              blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP)
          if (acquiredExtra) {
            transferUnrollToStorage(unrollMemoryUsedByThisBlock)
          }
          acquiredExtra
        } else {
          val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
          releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory)
          transferUnrollToStorage(size)
          true
        }
      }

      if (enoughStorageMemory) {
        entries.synchronized {
          entries.put(blockId, entry)
        }
        logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(
          blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
        Right(size)
      } else {
        assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock,
          "released too much unroll memory")
        Left(new PartiallyUnrolledIterator(
          this,
          MemoryMode.ON_HEAP,
          unrollMemoryUsedByThisBlock,
          unrolled = arrayValues.toIterator,
          rest = Iterator.empty))
      }
    } else {
      logUnrollFailureMessage(blockId, vector.estimateSize())
      Left(new PartiallyUnrolledIterator(
        this,
        MemoryMode.ON_HEAP,
        unrollMemoryUsedByThisBlock,
        unrolled = vector.iterator,
        rest = values))
    }
  }

在具体看逻辑之前,先弄明白两个配置项:

  • UNROLL_MEMORY_CHECK_PERIOD,对应参数spark.storage.unrollMemoryCheckPeriod,表示在迭代过程中检查内存是否够用的周期,默认值16,即每16个元素检查一次。
  • UNROLL_MEMORY_GROWTH_FACTOR,对应参数spark.storage.unrollMemoryGrowthFactor,表示申请新的展开内存时扩展的倍数,默认值1.5。

然后就可以具体探究该方法的执行流程了:

  • 调用reserveUnrollMemoryForThisTask(),申请初始的展开内存,并随时记录该块使用了多少展开内存。
  • 循环迭代块的数据,将其放入一个SizeTrackingVector中。该数据结构可以动态估算其中存储的元素的大小,后面会详细分析。
  • 每当到了检查的时机,如果已经展开的数据大小超过了当前的展开内存阈值,就再次调用reserveUnrollMemoryForThisTask()方法,试图申请新的展开内存(注意上面的扩展倍数的用法)。申请到之后,同时更新阈值。
  • 所有数据都展开之后,标志keepUnrolling为真,表示展开成功。将SizeTrackingVector中的数据封装为DeserializedMemoryEntry。
  • 检查申请到的展开内存是否比实际大小还大。如果是,就调用嵌套定义的transferUnrollToStorage()方法(实际又调用了releaseUnrollMemoryForThisTask()方法),释放掉多余的展开内存,并将它们返还给存储内存。
  • 一切成功,将块ID与DeserializedMemoryEntry的映射放入entries,并返回Right。注意这个方法返回值的类型是Either类型,它在Scala中表示不相交的两个结果的集合,即可能返回错误的结果(Left),或者正确的结果(Right)。
  • 如果没有足够的展开内存,或者展开所有数据后keepUnrolling标志为假,都表示这次写入不成功,返回Left,其中又包含PartiallyUnrolledIterator,表示一个没有完全展开的迭代器。

对于这种又臭又长的单个方法,多读几遍自然就能通顺。下面贴出它调用的申请与释放展开内存的方法,与上面的一大坨相比已经是毛毛雨了,不再赘述了。

代码#26.6 - o.a.s.memory.MemoryStore.reserveUnrollMemoryForThisTask()/releaseUnrollMemoryForThisTask()方法

代码语言:javascript
复制
代码语言:javascript
复制
  def reserveUnrollMemoryForThisTask(
      blockId: BlockId,
      memory: Long,
      memoryMode: MemoryMode): Boolean = {
    memoryManager.synchronized {
      val success = memoryManager.acquireUnrollMemory(blockId, memory, memoryMode)
      if (success) {
        val taskAttemptId = currentTaskAttemptId()
        val unrollMemoryMap = memoryMode match {
          case MemoryMode.ON_HEAP => onHeapUnrollMemoryMap
          case MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap
        }
        unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory
      }
      success
    }
  }

  def releaseUnrollMemoryForThisTask(memoryMode: MemoryMode, memory: Long = Long.MaxValue): Unit = {
    val taskAttemptId = currentTaskAttemptId()
    memoryManager.synchronized {
      val unrollMemoryMap = memoryMode match {
        case MemoryMode.ON_HEAP => onHeapUnrollMemoryMap
        case MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap
      }
      if (unrollMemoryMap.contains(taskAttemptId)) {
        val memoryToRelease = math.min(memory, unrollMemoryMap(taskAttemptId))
        if (memoryToRelease > 0) {
          unrollMemoryMap(taskAttemptId) -= memoryToRelease
          memoryManager.releaseUnrollMemory(memoryToRelease, memoryMode)
        }
        if (unrollMemoryMap(taskAttemptId) == 0) {
          unrollMemoryMap.remove(taskAttemptId)
        }
      }
    }
  }

读取字节与迭代器化的数据

前者对应的是SerializedMemoryEntry,由getBytes()方法实现。后者对应的是DeserializedMemoryEntry,由getValues()方法实现。

代码#26.7 - o.a.s.memory.MemoryStore.getBytes()/getValues()方法

代码语言:javascript
复制
代码语言:javascript
复制
  def getBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
    val entry = entries.synchronized { entries.get(blockId) }
    entry match {
      case null => None
      case e: DeserializedMemoryEntry[_] =>
        throw new IllegalArgumentException("should only call getBytes on serialized blocks")
      case SerializedMemoryEntry(bytes, _, _) => Some(bytes)
    }
  }

  def getValues(blockId: BlockId): Option[Iterator[_]] = {
    val entry = entries.synchronized { entries.get(blockId) }
    entry match {
      case null => None
      case e: SerializedMemoryEntry[_] =>
        throw new IllegalArgumentException("should only call getValues on deserialized blocks")
      case DeserializedMemoryEntry(values, _, _) =>
        val x = Some(values)
        x.map(_.iterator)
    }
  }
淘汰缓存块

该方法名为evictBlocksToFreeSpace(),用途为淘汰现有的一些块,以为新的块腾出空间。它在StorageMemoryPool.acquireMemory()方法(代码#23.4)中调用,如果忘记了的话,可以返回去看看。这个方法的代码也比较长,但稍微容易理解一些。

代码#26.8 - o.a.s.memory.MemoryStore.evictBlocksToFreeSpace()方法

代码语言:javascript
复制
代码语言:javascript
复制
  private[spark] def evictBlocksToFreeSpace(
      blockId: Option[BlockId],
      space: Long,
      memoryMode: MemoryMode): Long = {
    assert(space > 0)
    memoryManager.synchronized {
      var freedMemory = 0L
      val rddToAdd = blockId.flatMap(getRddId)
      val selectedBlocks = new ArrayBuffer[BlockId]

      def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = {
        entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))
      }

      entries.synchronized {
        val iterator = entries.entrySet().iterator()
        while (freedMemory < space && iterator.hasNext) {
          val pair = iterator.next()
          val blockId = pair.getKey
          val entry = pair.getValue
          if (blockIsEvictable(blockId, entry)) {
            if (blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) {
              selectedBlocks += blockId
              freedMemory += pair.getValue.size
            }
          }
        }
      }

      def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {
        val data = entry match {
          case DeserializedMemoryEntry(values, _, _) => Left(values)
          case SerializedMemoryEntry(buffer, _, _) => Right(buffer)
        }
        val newEffectiveStorageLevel =
          blockEvictionHandler.dropFromMemory(blockId, () => data)(entry.classTag)
        if (newEffectiveStorageLevel.isValid) {
          blockInfoManager.unlock(blockId)
        } else {
          blockInfoManager.removeBlock(blockId)
        }
      }

      if (freedMemory >= space) {
        var lastSuccessfulBlock = -1
        try {
          logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
            s"(${Utils.bytesToString(freedMemory)} bytes)")
          (0 until selectedBlocks.size).foreach { idx =>
            val blockId = selectedBlocks(idx)
            val entry = entries.synchronized {
              entries.get(blockId)
            }

            if (entry != null) {
              dropBlock(blockId, entry)
              afterDropAction(blockId)
            }
            lastSuccessfulBlock = idx
          }
          logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
            s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
          freedMemory
        } finally {
          if (lastSuccessfulBlock != selectedBlocks.size - 1) {
            (lastSuccessfulBlock + 1 until selectedBlocks.size).foreach { idx =>
              val blockId = selectedBlocks(idx)
              blockInfoManager.unlock(blockId)
            }
          }
        }
      } else {
        blockId.foreach { id =>
          logInfo(s"Will not store $id")
        }
        selectedBlocks.foreach { id =>
          blockInfoManager.unlock(id)
        }
        0L
      }
    }
  }

方法参数中的space就表示需要腾出多大的空间。其执行流程如下:

  • 循环遍历entries映射中的块,找出其中能够被淘汰的块。所谓能够被淘汰,是指MemoryMode相同(即堆内对堆内,堆外对堆外,不能交叉),并且块ID对应的块数据不属于RDD。
  • 为这些块加写锁,保证当前正在被读取的块不会被淘汰掉。记录将要被淘汰的块ID。
  • 如果腾出的空间已经达到了目标值,就调用嵌套定义的dropBlock()方法真正地移除这些块,最终仍然调用了BlockManager.dropFromMemory()方法。该方法会产生两种结果:一是块仍然存在,只是StorageLevel发生变化(比如转存到了磁盘),就只需解开它的写锁;二是块被彻底地移除,就得调用BlockInfoManager.remove()方法删掉它。最后将剩余未处理的块解锁。
  • 如果腾出的空间最终仍然不能达到目标值,就不会执行淘汰动作,新的块也不会被存入。

总结

本文首先简要介绍了MemoryEntry的作用,然后详细阅读了MemoryStore的源码,了解了序列化数据和反序列化数据在Spark内存中的读写流程。信息量确实很大,也比较枯燥,但到此为止,我们总算对内存在Spark存储体系中的作用有了较为全面的认识。下一篇文章就会进入磁盘存储的领域。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-08-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据真好玩 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • MemoryEntry
  • MemoryStore
    • 构造与属性成员
      • 直接写入字节
        • 写入迭代器化的数据
          • 读取字节与迭代器化的数据
            • 淘汰缓存块
            • 总结
            相关产品与服务
            对象存储
            对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档