前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Core源码精读计划21 | Spark Block的基本实现

Spark Core源码精读计划21 | Spark Block的基本实现

作者头像
大数据真好玩
发布2019-08-21 15:28:58
6270
发布2019-08-21 15:28:58
举报
文章被收录于专栏:暴走大数据暴走大数据

目录

  • 前言
  • 块ID:BlockId
  • 块数据:BlockData
    • BlockData特征
    • ByteBufferBlockData
    • ChunkedByteBuffer简介
  • 块元信息:BlockInfo
  • 总结

前言

前面我们用3篇文章的时间讲解了RDD的基础知识,包括其五要素、算子、依赖、分区以及检查点。实际上,与RDD相关的细节还有很多,渗透在之后的研究过程中。在时机合适时,会再拨出专门的时间更深入地讲解RDD。从本篇开始,进入Spark Core存储子系统。

提起“存储”这个词,自然就包括内部存储(内存)与外部存储(磁盘等)。Spark的存储子系统会同时对内存和外存进行管理,这些管理组件的名称本身就很容易理解,如MemoryManager、DiskBlockManager、MemoryStore、DiskStore等,我们会逐渐接触到它们。

前文已经多次提到过,Spark存储子系统的“司令官”是BlockManager,即块管理器,用主从架构实现。由此可见,“块”(Block)是Spark存储的基本单位,看官如果学过操作系统理论,对这个词应该已经非常熟悉了。不过这里的块与操作系统和JVM都无关,只是Spark体系内的概念而已。

本文先来探索与块相关的基本实现,包括块的ID、实际数据与元信息的封装。

块ID:BlockId

与RDD类似,块也需要一个ID来表明它的身份。不过RDD的ID只是一个整形值而已,块ID包含的东西稍微多点。BlockId抽象类的定义如下。

代码#21.1 - o.a.s.storage.BlockId抽象类

代码语言:javascript
复制
sealed abstract class BlockId {
  def name: String

  def asRDDId: Option[RDDBlockId] = if (isRDD) Some(asInstanceOf[RDDBlockId]) else None
  def isRDD: Boolean = isInstanceOf[RDDBlockId]
  def isShuffle: Boolean = isInstanceOf[ShuffleBlockId]
  def isBroadcast: Boolean = isInstanceOf[BroadcastBlockId]

  override def toString: String = name
}

这个类本身已经很清楚了,name方法返回该BlockId的唯一名称。三个以is为前缀的布尔方法分别判断当前BlockId是否为RDDBlockId、ShuffleBlockId和BroadcastBlockId。这三个实现类(当然还有一些其他的实现)都是BlockId的子类,以下是类图。

图#21.1 - BlockId的子类

代码示例如下。

代码#21.2 - o.a.s.storage.RDDBlockId/ShuffleBlockId/BroadcastBlockId类

代码语言:javascript
复制
@DeveloperApi
case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
  override def name: String = "rdd_" + rddId + "_" + splitIndex
}

@DeveloperApi
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
  override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
}

@DeveloperApi
case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId {
  override def name: String = "broadcast_" + broadcastId + (if (field == "") "" else "_" + field)
}

可见,它们都是简单的样例类,覆写了name方法,并且命名都符合一定的规则。比如RDD数据块ID的命名为rdd_[RDD ID]_[分区号],Shuffle数据块ID的命名为shuffle_[Shuffle过程ID]_[Map任务ID]_[Reduce任务ID]等。在BlockId类的伴生对象中,也有所有命名的正则表示。

代码#21.3 - BlockId命名的正则表示

代码语言:javascript
复制
  val RDD = "rdd_([0-9]+)_([0-9]+)".r
  val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
  val SHUFFLE_DATA = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).data".r
  val SHUFFLE_INDEX = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).index".r
  val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r
  val TASKRESULT = "taskresult_([0-9]+)".r
  val STREAM = "input-([0-9]+)-([0-9]+)".r
  val TEMP_LOCAL = "temp_local_([-A-Fa-f0-9]+)".r
  val TEMP_SHUFFLE = "temp_shuffle_([-A-Fa-f0-9]+)".r
  val TEST = "test_(.*)".r

块的ID已经有了,接下来看具体的数据如何封装。

块数据:BlockData

BlockData特征

BlockData是一个松散的Scala特征,其源码如下。

代码#21.4 - o.a.s.storage.BlockData特征

代码语言:javascript
复制
private[spark] trait BlockData {
  def toInputStream(): InputStream
  def toNetty(): Object
  def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer
  def toByteBuffer(): ByteBuffer

  def size: Long
  def dispose(): Unit
}

其中定义的都是虚方法,它们的含义分别是:

  • toInputStream():将块数据转化为java.io.InputStream。
  • toNetty():将块数据转化为适合Netty上传输的对象格式。
  • toChunkedByteBuffer():将块数据转化为o.a.s.util.io.ChunkedByteBuffer。ChunkedByteBuffer在文章#11讲解广播变量时已出现过,是对多个java.nio.ByteBuffer的封装,表示多个不连续的内存缓冲区中的数据。虽然Chunk这个词在中文中一般也翻译作“块”,但它与上面的Block相比,更是一个逻辑概念而非物理概念。
  • toByteBuffer():将块数据转化为单个java.nio.ByteBuffer。
  • size():返回这个BlockData的长度。
  • dispose():销毁BlockData。

可见,BlockData只是定义了数据转化的规范,并没有涉及具体的存储格式和读写流程,实现起来比较自由,所以前面说它是个松散的特征。BlockData目前有3个实现类:基于内存和ChunkedByteBuffer的ByteBufferBlockData、基于磁盘和File的DiskBlockData,以及加密的EncryptedBlockData。下面来看看最简单的ByteBufferBlockData实现。

ByteBufferBlockData

以下是ByteBufferBlockData类的源码,可见它是直接代理了ChunkedByteBuffer的各种方法。

代码#21.5 - o.a.s.storage.ByteBufferBlockData类

代码语言:javascript
复制
private[spark] class ByteBufferBlockData(
    val buffer: ChunkedByteBuffer,
    val shouldDispose: Boolean) extends BlockData {
  override def toInputStream(): InputStream = buffer.toInputStream(dispose = false)

  override def toNetty(): Object = buffer.toNetty

  override def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer = {
    buffer.copy(allocator)
  }

  override def toByteBuffer(): ByteBuffer = buffer.toByteBuffer

  override def size: Long = buffer.size

  override def dispose(): Unit = {
    if (shouldDispose) {
      buffer.dispose()
    }
  }
}

ChunkedByteBuffer实际上就是定义了对Array[ByteBuffer]类型的各种操作,它在Spark存储中是个很常用的类,下面来看一下。

ChunkedByteBuffer简介

ChunkedByteBuffer的构造方法参数是一个名为chunks的Array[ByteBuffer]类型对象,也就是说一个ByteBuffer就是一个Chunk。该类的成员属性如下。

代码#21.6 - o.a.s.util.io.ChunkedByteBuffer类的属性成员

代码语言:javascript
复制
  private val bufferWriteChunkSize =
    Option(SparkEnv.get).map(_.conf.get(config.BUFFER_WRITE_CHUNK_SIZE))
      .getOrElse(config.BUFFER_WRITE_CHUNK_SIZE.defaultValue.get).toInt

  private[this] var disposed: Boolean = false

  val size: Long = chunks.map(_.limit().asInstanceOf[Long]).sum
  • bufferWriteChunkSize:将缓存数据写出时的Chunk大小,由spark.buffer.write.chunkSize配置项来确定,默认值64MB。
  • disposed:该ChunkedByteBuffer是否已销毁。
  • size:该ChunkedByteBuffer的大小,通过调用ByteBuffer.limit()方法获取每个Chunk的大小并累加而来。

它提供了一个writeFully()方法,用来将缓存块数据以bufferWriteChunkSize的大小写入NIO Channel。

代码#21.7 - o.a.s.util.io.ChunkedByteBuffer.writeFully()方法

代码语言:javascript
复制
  def writeFully(channel: WritableByteChannel): Unit = {
    for (bytes <- getChunks()) {
      val curChunkLimit = bytes.limit()
      while (bytes.hasRemaining) {
        try {
          val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
          bytes.limit(bytes.position() + ioSize)
          channel.write(bytes)
        } finally {
          bytes.limit(curChunkLimit)
        }
      }
    }
  }

关于它的其他方法,我们会在今后的讲解过程中逐渐接触到,不难。

块元信息:BlockInfo

为了方便跟踪块的一些基本数据,需要用一个专门的数据结构BlockInfo来维护。其完整代码如下所示。

代码#21.8 - o.a.s.storage.BlockInfo类

代码语言:javascript
复制
private[storage] class BlockInfo(
    val level: StorageLevel,
    val classTag: ClassTag[_],
    val tellMaster: Boolean) {
  def size: Long = _size
  def size_=(s: Long): Unit = {
    _size = s
    checkInvariants()
  }
  private[this] var _size: Long = 0

  def readerCount: Int = _readerCount
  def readerCount_=(c: Int): Unit = {
    _readerCount = c
    checkInvariants()
  }
  private[this] var _readerCount: Int = 0

  def writerTask: Long = _writerTask
  def writerTask_=(t: Long): Unit = {
    _writerTask = t
    checkInvariants()
  }
  private[this] var _writerTask: Long = BlockInfo.NO_WRITER

  private def checkInvariants(): Unit = {
    assert(_readerCount >= 0)
    assert(_readerCount == 0 || _writerTask == BlockInfo.NO_WRITER)
  }

  checkInvariants()
}

该类有三个构造方法参数:

  • level:块的期望存储等级,不代表实际的存储情况。例如,如果设定为StorageLevel.MEMORY_AND_DISK,那么这个块有可能只在内存而不在磁盘中,反之同理。
  • classTag:块的类标签。
  • tellMaster:是否要将该块的元信息告知Master。

BlockInfo内定义了3对Getter/Setter:

  • size:块的大小,以字节为单位。
  • readerCount:该块被读取的次数。因为读取块时需要上锁,因此也就相当于加读锁的次数。
  • writerTask:当前持有该块写锁的Task ID。

虽然上面提到了读锁和写锁,但BlockInfo本身并没有提供任何锁机制,而是藉由BlockInfo的管理器BlockInfoManager来实现。关于BlockInfoManager的细节将在下一篇文章讨论。

总结

本文研究了与块相关的三大基本组件:BlockId、BlockData与BlockInfo,它们三者合起来就可以基本完整地描述Spark中的一个块了。理解了它们,我们就可以继续研究块在内存与外存中是分别如何管理的。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-08-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据真好玩 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 块ID:BlockId
  • 块数据:BlockData
    • BlockData特征
      • ByteBufferBlockData
        • ChunkedByteBuffer简介
        • 块元信息:BlockInfo
        • 总结
        相关产品与服务
        对象存储
        对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档