前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >《深入理解Spark-核心思想与源码分析》读书笔记(2)

《深入理解Spark-核心思想与源码分析》读书笔记(2)

作者头像
用户1148523
发布2018-01-09 10:31:40
9350
发布2018-01-09 10:31:40
举报
文章被收录于专栏:FishFish

第四章 存储体系

这章主要讲的就是如何存储,包括存内存,存硬盘,还有存Tachyon

Spark存储体系架构
Spark存储体系架构

这个图写画得灰常好,下面是对其中序号的解释。 1)表示的是Executor的BlockManager与Driver的BlockManager进行消息通讯,比如注册BlockManager啊、更新BlockManager之类的。 2)表示的是对BlockManager的读操作和写操作 3)表示当MemoryStore的内存不足时,写入DiskStore。而DiskStore的是由DiskStoreManager管理的 4)表示访问远端节点的Executor的TransportServer进行Block的上传或者下载 5)为远端Executor提供Block的上传或者下载服务 6)表示当前存储体系选择Tachyon存储,对于BlockManager的读写设计上调用的是Tachyon的putBytes、putArray、getBytes、getValue等等。 后面的内容就是讲这些。主要是因为网上没找到又懒得画,so~~~

块管理器BlockManager的实现

如结构图所示,BlockManager是其他所有BlockManager的抽象类(父类?接口?原谅我JAVA学得不好)。BlockManager主要实现代码如下

代码语言:javascript
复制
  val diskBlockManager = new DiskBlockManager(this, conf)

  private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]

  private val futureExecutionContext = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))

  // Actual storage of where blocks are kept
  private var externalBlockStoreInitialized = false
  private[spark] val memoryStore = new MemoryStore(this, memoryManager)
  private[spark] val diskStore = new DiskStore(this, diskBlockManager)
  private[spark] lazy val externalBlockStore: ExternalBlockStore = {
    externalBlockStoreInitialized = true
    new ExternalBlockStore(this, executorId)
  }
  memoryManager.setMemoryStore(memoryStore)

  private val maxMemory = memoryManager.maxStorageMemory

  private[spark]
  val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)

  private val externalShuffleServicePort = {
    val tmpPort = Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt
    if (tmpPort == 0) {
      conf.get("spark.shuffle.service.port").toInt
    } else {
      tmpPort
    }
  }

  var blockManagerId: BlockManagerId = _

  private[spark] var shuffleServerId: BlockManagerId = _

  private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
    val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
    new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(),
      securityManager.isSaslEncryptionEnabled())
  } else {
    blockTransferService
  }

  // Whether to compress broadcast variables that are stored
  private val compressBroadcast = conf.getBoolean("spark.broadcast.compress", true)
  // Whether to compress shuffle output that are stored
  private val compressShuffle = conf.getBoolean("spark.shuffle.compress", true)
  // Whether to compress RDD partitions that are stored serialized
  private val compressRdds = conf.getBoolean("spark.rdd.compress", false)
  // Whether to compress shuffle output temporarily spilled to disk
  private val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", true)

  private val slaveEndpoint = rpcEnv.setupEndpoint(
    "BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next,
    new BlockManagerSlaveEndpoint(rpcEnv, this, mapOutputTracker))
  // Pending re-registration action being executed asynchronously or null if none is pending.
  // Accesses should synchronize on asyncReregisterLock.  // Pending re-registration action being executed asynchronously or null if none is pending.
  // Accesses should synchronize on asyncReregisterLock.
  private var asyncReregisterTask: Future[Unit] = null
  private val asyncReregisterLock = new Object

  private val metadataCleaner = new MetadataCleaner(
    MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf)
  private val broadcastCleaner = new MetadataCleaner(
    MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)

  // Field related to peer block managers that are necessary for block replication
  @volatile private var cachedPeers: Seq[BlockManagerId] = _
  private val peerFetchLock = new Object
  private var lastPeerFetchTime = 0L
  private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec(conf)

从代码可以看出BlockManager主要由一下几个部分构成

  • BlockManagerMaster
  • diskBlockManager
  • memoryStore
  • diskStore
  • 非广播Block清理器metadataCleaner和广播Block清理器broadcastCleaner
  • 压缩算法实现compressionCodec 然后BlockManager要生效就要进行初始化
    1. blockTransferService和shuffleClient的初始化
    2. blockManagerId和shuffleServerId的创建。
    3. 向BlockManagerMaster注册这些信息
代码语言:javascript
复制
  def initialize(appId: String): Unit = {
    blockTransferService.init(this)
    shuffleClient.init(appId)

    blockManagerId = BlockManagerId(
      executorId, blockTransferService.hostName, blockTransferService.port)

    shuffleServerId = if (externalShuffleServiceEnabled) {
      logInfo(s"external shuffle service port = $externalShuffleServicePort")
      BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
    } else {
      blockManagerId
    }

    master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)

    // Register Executors' configuration with the local shuffle service, if one should exist.
    if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
      registerWithExternalShuffleServer()
    }
  }

ShuffleClient

这个就是上边那个结构图的4和5的那个了。然后要对它进行初始化共分四步。

  1. 创建RPCServer 这个server就是为了给远方的你提供上传和下载的功能
  2. 构造传输上下文TransportContext
代码语言:javascript
复制
  public TransportContext(
      TransportConf conf,
      RpcHandler rpcHandler,
      boolean closeIdleConnections) {
    this.conf = conf;
    this.rpcHandler = rpcHandler;
    this.encoder = new MessageEncoder();
    this.decoder = new MessageDecoder();
    this.closeIdleConnections = closeIdleConnections;
  }

BlockManagerMaster对BlockManager的管理

这里有个BlockManagerMaster对存在于Executor上的BlockManager统一管理,比如更新Executor上的Block之类的。 询问Driver并获取回复方法

代码语言:javascript
复制
  def askWithRetry[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
    // TODO: Consider removing multiple attempts
    var attempts = 0
    var lastException: Exception = null
    while (attempts < maxRetries) {
      attempts += 1
      try {
        val future = ask[T](message, timeout)
        val result = timeout.awaitResult(future)
        if (result == null) {
          throw new SparkException("RpcEndpoint returned null")
        }
        return result
      } catch {
        case ie: InterruptedException => throw ie
        case e: Exception =>
          lastException = e
          logWarning(s"Error sending message [message = $message] in $attempts attempts", e)
      }

      if (attempts < maxRetries) {
        Thread.sleep(retryWaitMs)
      }
    }

    throw new SparkException(
      s"Error sending message [message = $message]", lastException)
  }

之后向BlockManagerMaster注册BlockManagerId,即Executor或者Driver自身的BlockManager在初始化时,需要向Driver的BlockManager注册BlockManager信息。

磁盘块管理器DiskBlockManager

BlockManager初始化时会创建DiskBlockManager,DiskBlockManager的构造步骤如下。

  1. 调用createLocalDirs方法创建二级目录。
  2. 添加运行环境结束时的钩子,用于进程关闭时创建线程,代码如下。
代码语言:javascript
复制
private def addShutdownHook(): AnyRef = {
    ShutdownHookManager.addShutdownHook(ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () =>
      logInfo("Shutdown hook called")
      DiskBlockManager.this.doStop()
    }
  }

private def doStop(): Unit = {
    // Only perform cleanup if an external service is not serving our shuffle files.
    // Also blockManagerId could be null if block manager is not initialized properly.
    if (!blockManager.externalShuffleServiceEnabled ||
      (blockManager.blockManagerId != null && blockManager.blockManagerId.isDriver)) {
      localDirs.foreach { localDir =>
        if (localDir.isDirectory() && localDir.exists()) {
          try {
            if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(localDir)) {
              Utils.deleteRecursively(localDir)
            }
          } catch {
            case e: Exception =>
              logError(s"Exception while deleting local spark dir: $localDir", e)
          }
        }
      }
    }
  }

创建临时Block方法createTempShuffleBlock是ShuffleMapTask运行结束需要把中间结果临时保存是调用的,用来创建临时的Block。

磁盘存储DiskStore

当MemoryStore没有足够空间时,就会使用DiskStore将块存入磁盘。DiskStore继承自BlockStore,并实现了getBytes等方法。

内存存储MemoryStore

spark内存管理模型
spark内存管理模型

这里以后会详细解释。

Tachyon存储TachyonStore

Tachyon其实就是一种分布式文件系统。和Spark一样,也使用了Master和Worker的架构。它也实现了BlockStore的get、put方法,然后这些方法又都调用了TachyonStore的方法将数据写入Tachyon的分布式内存中。

TachyonStore框架
TachyonStore框架

块管理器BlockManager

前面已经介绍了BlockManager中的主要组件,现在来看看BlockManager自身的实现。

代码语言:javascript
复制
doPut()方法的代码和解释

备份不太懂 各种获取block的方法的介绍,不要代码

metadataCleaner和broadcastCleaner

介绍下原理不要代码

缓存管理器CacheManager

代码语言:javascript
复制
getOrComput()代码

压缩算法

代码语言:javascript
复制
CompressionCodec代码

磁盘写入实现DiskBlockObjectWriter

描述主要作用即可

块索引shffle管理器 IndexShuffleManager

主要作用即可

shuffle内存管理器 ShffleMemoryManager

主要作用即可

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017年02月28日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 第四章 存储体系
    • 块管理器BlockManager的实现
      • ShuffleClient
        • BlockManagerMaster对BlockManager的管理
          • 磁盘块管理器DiskBlockManager
            • 磁盘存储DiskStore
              • 内存存储MemoryStore
                • Tachyon存储TachyonStore
                  • 块管理器BlockManager
                    • metadataCleaner和broadcastCleaner
                      • 缓存管理器CacheManager
                        • 压缩算法
                          • 磁盘写入实现DiskBlockObjectWriter
                            • 块索引shffle管理器 IndexShuffleManager
                              • shuffle内存管理器 ShffleMemoryManager
                              相关产品与服务
                              云 HDFS
                              云 HDFS(Cloud HDFS,CHDFS)为您提供标准 HDFS 访问协议,您无需更改现有代码,即可使用高可用、高可靠、多维度安全、分层命名空间的分布式文件系统。 只需几分钟,您就可以在云端创建和挂载 CHDFS,来实现您大数据存储需求。随着业务需求的变化,您可以实时扩展或缩减存储资源,CHDFS 存储空间无上限,满足您海量大数据存储与分析业务需求。此外,通过 CHDFS,您可以实现计算与存储分离,极大发挥计算资源灵活性,同时实现存储数据永久保存,降低您大数据分析资源成本。
                              领券
                              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档