前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Core源码精读计划#28:磁盘存储DiskStore

Spark Core源码精读计划#28:磁盘存储DiskStore

作者头像
大数据真好玩
发布2019-08-21 15:31:47
6170
发布2019-08-21 15:31:47
举报
文章被收录于专栏:暴走大数据

目录

  • 前言
  • 磁盘存储DiskStore
    • 构造方法与属性成员
    • 写入块
    • 写入字节
    • 读取字节
  • 磁盘块数据DiskBlockData
    • 转化为ChunkedByteBuffer
    • 转化为ByteBuffer
  • 总结

前言

在上一篇文章中,我们认识了Spark管理磁盘块的组件DiskBlockManager,本文接着来看真正负责磁盘存储的组件DiskStore,以及与它相关的BlockData。这部分内容会涉及到一点与Java NIO相关的东西,看官需要稍微注意一下。

磁盘存储DiskStore

构造方法与属性成员

代码#28.1 - o.a.s.storage.DiskStore类的构造方法与属性成员

代码语言:javascript
复制
代码语言:javascript
复制
private[spark] class DiskStore(
    conf: SparkConf,
    diskManager: DiskBlockManager,
    securityManager: SecurityManager) extends Logging {

  private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
  private val maxMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests",
    Int.MaxValue.toString)
  private val blockSizes = new ConcurrentHashMap[BlockId, Long]() 
}

DiskStore接受3个构造方法参数,分别是SparkConf、DiskBlockManager和SecurityManager的实例,其中SecurityManager用于提供对数据加密的支持。3个属性字段的含义如下:

  • minMemoryMapBytes:使用内存映射(memory map)读取文件的最小阈值,由配置项spark.storage.memoryMapThreshold指定,默认值2M。当磁盘中的文件大小超过该值时,就不会直接读取,而用内存映射文件来读取,提高效率。
  • maxMemoryMapBytes:使用内存映射读取文件的最大阈值,由配置项spark.storage.memoryMapLimitForTests指定。它是个测试参数,默认值为不限制。
  • blockSizes:维护块ID与其对应大小之间的映射关系的ConcurrentHashMap。
写入块

写入块的逻辑由put()方法来实现。

代码#28.2 - o.a.s.storage.DiskStore.put()/contains()方法

代码语言:javascript
复制
  def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit = {
    if (contains(blockId)) {
      throw new IllegalStateException(s"Block $blockId is already present in the disk store")
    }
    logDebug(s"Attempting to put block $blockId")
    val startTime = System.currentTimeMillis
    val file = diskManager.getFile(blockId)
    val out = new CountingWritableChannel(openForWrite(file))
    var threwException: Boolean = true
    try {
      writeFunc(out)
      blockSizes.put(blockId, out.getCount)
      threwException = false
    } finally {
      try {
        out.close()
      } catch {
        case ioe: IOException =>
          if (!threwException) {
            threwException = true
            throw ioe
          }
      } finally {
         if (threwException) {
          remove(blockId)
        }
      }
    }
    val finishTime = System.currentTimeMillis
    logDebug("Block %s stored as %s file on disk in %d ms".format(
      file.getName,
      Utils.bytesToString(file.length()),
      finishTime - startTime))
  }

  def contains(blockId: BlockId): Boolean = {
    val file = diskManager.getFile(blockId.name)
    file.exists()
  }

put()方法首先调用contains()方法检查块是否已经以文件的形式写入了,只有没有写入才会继续操作。然后,调用DiskBlockManager.getFile()方法打开块ID对应的文件,然后获取该文件的WritableByteChannel(NIO中的写通道,表示可以通过调用write()方法向文件写入数据)。最后,调用参数中传入的writeFunc函数,操作WritableByteChannel将数据写入,并将块ID与其对应的字节数加入blockSizes映射。

接下来看一看代码#28.2中调用的openForWrite()方法。

代码#28.3 - o.a.s.storage.DiskStore.openForWrite()方法

代码语言:javascript
复制
  private def openForWrite(file: File): WritableByteChannel = {
    val out = new FileOutputStream(file).getChannel()
    try {
      securityManager.getIOEncryptionKey().map { key =>
        CryptoStreamUtils.createWritableChannel(out, conf, key)
      }.getOrElse(out)
    } catch {
      case e: Exception =>
        Closeables.close(out, true)
        file.delete()
        throw e
    }
  }

可见,该方法就是通过文件对象构造了文件输出流FileOutputStream,然后获取它对应的Channel对象用于写数据。特别地,如果I/O需要加密,就需要另外调用CryptoStreamUtils.createWritableChannel()方法包装,本文就不涉及了。至于CountingWritableChannel,也只是基于WritableByteChannel接口扩展出来的一个简单类,增加了统计字节数的方法,代码也就不再列出。

写入字节

代码#28.4 - o.a.s.storage.DiskStore.putBytes()方法 def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {

代码语言:javascript
复制
    put(blockId) { channel =>
      bytes.writeFully(channel)
    }
  }
代码语言:javascript
复制
可见,该方法除了块ID外,还需要传入封装在ChunkedByteBuffer中的数据。调用上述put()方法时,传入的writeFunc函数调用了ChunkedByteBuffer.writeFully()方法,负责将数据以一定的Chunk大小分块写入WritableByteChannel。
读取字节

代码#28.5 - o.a.s.storage.DiskStore.getBytes()方法

代码语言:javascript
复制
  def getBytes(blockId: BlockId): BlockData = {
    val file = diskManager.getFile(blockId.name)
    val blockSize = getSize(blockId)

    securityManager.getIOEncryptionKey() match {
      case Some(key) =>
        new EncryptedBlockData(file, blockSize, conf, key)
      case _ =>
        new DiskBlockData(minMemoryMapBytes, maxMemoryMapBytes, file, blockSize)
    }
  }

这段代码很简单,但可以注意到,在加密环境下和非加密环境下返回的结果是不同的,前者是EncryptedBlockData对象,后者是DiskBlockData对象,而它们都是BlockData的子类。顾名思义,BlockData就是对磁盘块数据的具体封装,下面选择最常见的DiskBlockData来看一看。

磁盘块数据DiskBlockData

这个类是定义在DiskStore下方的私有类,比较短,因此直接全贴在下面。

代码#28.6 - o.a.s.storage.DiskBlockData类

代码语言:javascript
复制
private class DiskBlockData(
    minMemoryMapBytes: Long,
    maxMemoryMapBytes: Long,
    file: File,
    blockSize: Long) extends BlockData {
  override def toInputStream(): InputStream = new FileInputStream(file)

  override def toNetty(): AnyRef = new DefaultFileRegion(file, 0, size)

  override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): ChunkedByteBuffer = {
    Utils.tryWithResource(open()) { channel =>
      var remaining = blockSize
      val chunks = new ListBuffer[ByteBuffer]()
      while (remaining > 0) {
        val chunkSize = math.min(remaining, maxMemoryMapBytes)
        val chunk = allocator(chunkSize.toInt)
        remaining -= chunkSize
        JavaUtils.readFully(channel, chunk)
        chunk.flip()
        chunks += chunk
      }
      new ChunkedByteBuffer(chunks.toArray)
    }
  }

  override def toByteBuffer(): ByteBuffer = {
    require(blockSize < maxMemoryMapBytes,
      s"can't create a byte buffer of size $blockSize" +
      s" since it exceeds ${Utils.bytesToString(maxMemoryMapBytes)}.")
    Utils.tryWithResource(open()) { channel =>
      if (blockSize < minMemoryMapBytes) {
        val buf = ByteBuffer.allocate(blockSize.toInt)
        JavaUtils.readFully(channel, buf)
        buf.flip()
        buf
      } else {
        channel.map(MapMode.READ_ONLY, 0, file.length)
      }
    }
  }

  override def size: Long = blockSize

  override def dispose(): Unit = {}

  private def open() = new FileInputStream(file).getChannel
}

很久之前也已经大概说过,BlockData特征只是定义了块数据的转化方式,具体的细节则留给各个实现类。我们具体看看toChunkedByteBuffer()和toByteBuffer()这两个方法。

转化为ChunkedByteBuffer

Utils.tryWithResource()方法实际上就是Java中try-with-resources的Scala实现,因为Scala中并没有这个语法糖。

toChunkedByteBuffer()方法会将文件转化为输入流FileInputStream,并获取其ReadableFileChannel,再调用JavaUtils.readFully()方法将从Channel中取得的数据填充到ByteBuffer中。每个ByteBuffer即为一个Chunk,所有Chunk的数组形成最终的ChunkedByteBuffer。关于ChunkedByteBuffer在文章#21简要提到过,之后会很快写一篇番外文章专门讲解它,因为有点意思。

转化为ByteBuffer

toByteBuffer()方法会检查块大小是否小于spark.storage.memoryMapThreshold(终于出现了)。如果小于的话,就会采用与toChunkedByteBuffer()相同的方式直接填充ByteBuffer。反之,就调用ReadableFileChannel.map()方法将数据映射到MappedByteBuffer中,即进程的虚拟内存中。不过,考虑到内存映射的应用场景的话,2MB的阈值可能有点小(保守)了,一点碎碎念,请勿在意。

总结

本文研究了Spark磁盘存储类DiskStore的具体实现,主要是写入块/字节以及读取字节的方法。另外,DiskStore读取的字节会用BlockData来封装,因此也顺便了解了一下DiskBlockData的一点细节。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-08-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据真好玩 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 磁盘存储DiskStore
    • 构造方法与属性成员
      • 写入块
        • 写入字节
          • 读取字节
          • 磁盘块数据DiskBlockData
            • 转化为ChunkedByteBuffer
              • 转化为ByteBuffer
              • 总结
              相关产品与服务
              对象存储
              对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档