首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Core源码精读计划20 | RDD检查点的具体实现

Spark Core源码精读计划20 | RDD检查点的具体实现

作者头像
大数据真好玩
发布2019-08-21 15:29:41
5910
发布2019-08-21 15:29:41
举报
文章被收录于专栏:暴走大数据暴走大数据

目录

  • 前言
  • RDD类中的检查点方法
  • 检查点数据的包装
    • RDDCheckpointData
    • ReliableRDDCheckpointData
  • 检查点RDD
    • CheckpointRDD
    • ReliableCheckpointRDD
  • 总结

前言

RDD检查点(Checkpoint)是Spark Core计算过程中的容错机制。通过将RDD的数据与状态持久化,一旦计算过程出错,就可以从之前的状态直接恢复现场,而不必从头重算,大大提高了效率与可靠性。本文从之前已经研究过的RDD类入手,探索一下检查点的具体实现。

RDD类中的检查点方法

在RDD类中,对外提供了两个方法可以将RDD做Checkpoint,分别为checkpoint()方法和localCheckpoint()方法。还有一个对内的doCheckpoint()方法,它在调度模块中提交Job时使用,并且可以递归地对父RDD做Checkpoint,这里暂时不提。

代码#20.1 - o.a.s.rdd.RDD.checkpoint()/localCheckpoint()方法

  def checkpoint(): Unit = RDDCheckpointData.synchronized {
    if (context.checkpointDir.isEmpty) {
      throw new SparkException("Checkpoint directory has not been set in the SparkContext")
    } else if (checkpointData.isEmpty) {
      checkpointData = Some(new ReliableRDDCheckpointData(this))
    }
  }

  def localCheckpoint(): this.type = RDDCheckpointData.synchronized {
    if (conf.getBoolean("spark.dynamicAllocation.enabled", false) &&
        conf.contains("spark.dynamicAllocation.cachedExecutorIdleTimeout")) {
      logWarning(/*本地检查点不适用于Executor动态分配的情况...*/)
    }

    if (storageLevel == StorageLevel.NONE) {
      persist(LocalRDDCheckpointData.DEFAULT_STORAGE_LEVEL)
    } else {
      persist(LocalRDDCheckpointData.transformStorageLevel(storageLevel), allowOverride = true)
    }

    if (isCheckpointedAndMaterialized) {
      logWarning("Not marking RDD for local checkpoint because it was already " +
        "checkpointed and materialized")
    } else {
      checkpointData match {
        case Some(_: ReliableRDDCheckpointData[_]) => logWarning(
          "RDD was already marked for reliable checkpointing: overriding with local checkpoint.")
        case _ =>
      }
      checkpointData = Some(new LocalRDDCheckpointData(this))
    }
    this
  }

这两个方法最终都是将RDD的checkpointData属性赋值,对应的是检查点数据抽象类RDDCheckpointData的两种实现:ReliableRDDCheckpointData与LocalRDDCheckpointData。

它们两个的区别正如名称的区别:ReliableRDDCheckpointData是将检查点数据保存在可靠的外部存储(HDFS)的文件中,需要重算时从文件读取数据。LocalRDDCheckpointData则将其保存在Executor节点本地,默认存储等级DEFAULT_STORAGE_LEVEL是StorageLevel.MEMORY_AND_DISK,也就是保存在内存与磁盘上。很显然,LocalRDDCheckpointData不如ReliableRDDCheckpointData可靠,一旦Executor失败,检查点数据就会丢失。但它相当于牺牲了可靠性换来了速度,在那些RDD Lineage过长的场景很有效。

在本文中,我们研究的主要对象是ReliableRDDCheckpointData。需要注意的是,必须先设定Checkpoint目录(通过调用SparkContext.setCheckpointDir()方法)才能启用可靠的检查点。

检查点数据的包装

在看ReliableRDDCheckpointData之前,我们先来看看它的父类RDDCheckpointData。

RDDCheckpointData

代码#20.2 - o.a.s.rdd.RDDCheckpointData抽象类

private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
  extends Serializable {
  import CheckpointState._

  protected var cpState = Initialized
  private var cpRDD: Option[CheckpointRDD[T]] = None

  def isCheckpointed: Boolean = RDDCheckpointData.synchronized {
    cpState == Checkpointed
  }

  final def checkpoint(): Unit = {
    RDDCheckpointData.synchronized {
      if (cpState == Initialized) {
        cpState = CheckpointingInProgress
      } else {
        return
      }
    }

    val newRDD = doCheckpoint()

    RDDCheckpointData.synchronized {
      cpRDD = Some(newRDD)
      cpState = Checkpointed
      rdd.markCheckpointed()
    }
  }

  protected def doCheckpoint(): CheckpointRDD[T]

  def checkpointRDD: Option[CheckpointRDD[T]] = RDDCheckpointData.synchronized { cpRDD }

  def getPartitions: Array[Partition] = RDDCheckpointData.synchronized {
    cpRDD.map(_.partitions).getOrElse { Array.empty }
  }
}

RDDCheckpointData类的构造参数rdd表示当前检查点数据与该RDD相关。cpRDD则表示一个CheckpointRDD实例,它是一个特殊的RDD实现,用于保存检查点,以及从检查点数据恢复现场。cpState是当前检查点进行的状态,由CheckpointState对象定义,实际上是个枚举,分为三个阶段:初始化、正在Checkpoint、Checkpoint完成。

代码#20.3 - o.a.s.rdd.CheckpointState对象

private[spark] object CheckpointState extends Enumeration {
  type CheckpointState = Value
  val Initialized, CheckpointingInProgress, Checkpointed = Value
}

checkpoint()方法包含了保存检查点的逻辑,注意它由final关键词修饰,子类不可以覆写。它的执行流程是:在检查点状态是Initialized的情况下,将其置为CheckpointingInProgress,然后调用doCheckpoint()方法生成CheckpointRDD。注意doCheckpoint()是个抽象方法,由ReliableRDDCheckpointData与LocalRDDCheckpointData分别实现。最后将生成的CheckpointRDD赋值给cpRDD,将状态置为Checkpointed,并调用RDD.markCheckpointed()方法标记检查点已经保存完毕。

markCheckpointed()方法的源码如下。

代码#20.4 - o.a.s.rdd.RDD.markCheckpointed()方法

  private[spark] def markCheckpointed(): Unit = {
    clearDependencies()
    partitions_ = null
    deps = null
  }

  protected def clearDependencies(): Unit = {
    dependencies_ = null
  }

可见是将RDD原先持有的分区和依赖信息清除了。很显然,这些东西都已经保存在了检查点里,不需要再保留一份。下面来读读ReliableRDDCheckpointData是如何实现的。

ReliableRDDCheckpointData

ReliableRDDCheckpointData类没有很特殊的逻辑,下面是doCheckpoint()方法的实现。

代码#20.5 - o.a.s.rdd.ReliableRDDCheckpointData.doCheckpoint()方法

  protected override def doCheckpoint(): CheckpointRDD[T] = {
    val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)

    if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
      rdd.context.cleaner.foreach { cleaner =>
        cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
      }
    }

    logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
    newRDD
  }

可见,CheckpointRDD是通过调用ReliableCheckpointRDD.writeRDDToCheckpointDirectory()方法生成的。另外,在其伴生对象中还提供了两个方法,分别用来返回RDD检查点的路径,以及删除检查点数据。

代码#20.6 - o.a.s.rdd.ReliableRDDCheckpointData.checkpointPath()/cleanCheckpoint()方法

  def checkpointPath(sc: SparkContext, rddId: Int): Option[Path] = {
    sc.checkpointDir.map { dir => new Path(dir, s"rdd-$rddId") }
  }

  def cleanCheckpoint(sc: SparkContext, rddId: Int): Unit = {
    checkpointPath(sc, rddId).foreach { path =>
      path.getFileSystem(sc.hadoopConfiguration).delete(path, true)
    }
  }

然后来看CheckpointRDD的相关细节,通过它,我们就可以真正地创建检查点,以及从检查点数据恢复现场了。

检查点RDD

CheckpointRDD

CheckpointRDD实际上也是个抽象类,继承自RDD。

代码#20.7 - o.a.s.rdd.CheckpointRDD抽象类

private[spark] abstract class CheckpointRDD[T: ClassTag](sc: SparkContext)
  extends RDD[T](sc, Nil) {
  override def doCheckpoint(): Unit = { }
  override def checkpoint(): Unit = { }
  override def localCheckpoint(): this.type = this

  // scalastyle:off
  protected override def getPartitions: Array[Partition] = ???
  override def compute(p: Partition, tc: TaskContext): Iterator[T] = ???
  // scalastyle:on
}

可见,它将RDD类中doCheckpoint()、checkpoint()和localCheckpoint()三个方法都覆写成了空的,因为CheckpointRDD本身并不需要再次被Checkpoint。另外它也覆写了文章#18中提到的getPartitions()和compute()方法,看官可能对三个问号比较好奇,实际上它是在scala.Predef中定义的:

def ??? : Nothing = throw new NotImplementedError

相当于没有实现,而把具体工作下放给子类去做。要使用???,也必须像上面代码一样,用scalastyle:off关闭静态检查。

普通RDD的compute()方法用于计算分区数据,在CheckpointRDD中,它的作用就是从检查点恢复数据了。如同RDDCheckpointData一样,CheckpointRDD也有两个子类,即ReliableCheckpointRDD和LocalCheckpointRDD。下面来看ReliableCheckpointRDD。

ReliableCheckpointRDD

ReliableCheckpointRDD是一个相对复杂的实现,并且其大多数方法都在伴生对象中。我们就不按部就班地阅读代码了,而直接从代码#20.5中调用的 writeRDDToCheckpointDirectory()方法入手,看看检查点数据是如何写入的。

代码#20.8 - o.a.s.rdd.ReliableCheckpointRDD.writeRDDToCheckpointDirectory()方法

  def writeRDDToCheckpointDirectory[T: ClassTag](
      originalRDD: RDD[T],
      checkpointDir: String,
      blockSize: Int = -1): ReliableCheckpointRDD[T] = {
    val checkpointStartTimeNs = System.nanoTime()

    val sc = originalRDD.sparkContext

    val checkpointDirPath = new Path(checkpointDir)
    val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
    if (!fs.mkdirs(checkpointDirPath)) {
      throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
    }

    val broadcastedConf = sc.broadcast(
      new SerializableConfiguration(sc.hadoopConfiguration))
    sc.runJob(originalRDD,
      writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)

    if (originalRDD.partitioner.nonEmpty) {
      writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
    }

    val checkpointDurationMs =
      TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - checkpointStartTimeNs)
    logInfo(s"Checkpointing took $checkpointDurationMs ms.")

    val newRDD = new ReliableCheckpointRDD[T](
      sc, checkpointDirPath.toString, originalRDD.partitioner)
    if (newRDD.partitions.length != originalRDD.partitions.length) {
      throw new SparkException(
        "Checkpoint RDD has a different number of partitions from original RDD. Original " +
          s"RDD [ID: ${originalRDD.id}, num of partitions: ${originalRDD.partitions.length}]; " +
          s"Checkpoint RDD [ID: ${newRDD.id}, num of partitions: " +
          s"${newRDD.partitions.length}].")
    }
    newRDD
  }

该方法的执行流程是:调用HDFS相关的API创建检查点的目录,然后调用SparkContext.runJob()方法起一个Job,该Job执行writePartitionToCheckpointFile()方法的逻辑,将RDD的分区数据写入检查点目录。再检查原RDD是否定义了分区器,如有,就调用writePartitionerToCheckpointDir()方法将分区器的逻辑写入检查点目录。最后创建ReliableCheckpointRDD实例,并检查它的分区数是否与原RDD的分区数相同,相同则成功返回。

上面涉及到的两个写入方法代码比较多,但是理解起来很容易,故不再贴出来。那么如何读取检查点的数据呢?来看compute()方法的实现。

代码#20.9 - o.a.s.rdd.ReliableCheckpointRDD.compute()方法

  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    val file = new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index))
    ReliableCheckpointRDD.readCheckpointFile(file, broadcastedConf, context)
  }

可见是调用了readCheckpointFile()方法,其代码如下。

代码#20.10 - o.a.s.rdd.ReliableCheckpointRDD.compute()方法

  def readCheckpointFile[T](
      path: Path,
      broadcastedConf: Broadcast[SerializableConfiguration],
      context: TaskContext): Iterator[T] = {
    val env = SparkEnv.get
    val fs = path.getFileSystem(broadcastedConf.value.value)
    val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
    val fileInputStream = {
      val fileStream = fs.open(path, bufferSize)
      if (env.conf.get(CHECKPOINT_COMPRESS)) {
        CompressionCodec.createCodec(env.conf).compressedInputStream(fileStream)
      } else {
        fileStream
      }
    }
    val serializer = env.serializer.newInstance()
    val deserializeStream = serializer.deserializeStream(fileInputStream)

    context.addTaskCompletionListener(context => deserializeStream.close())
    deserializeStream.asIterator.asInstanceOf[Iterator[T]]
  }

该方法仍然使用HDFS API打开检查点目录下的文件,并用SparkEnv中初始化的JavaSerializer反序列化,最终返回数据的迭代器,整个现场就恢复了。

总结

本文研究了与Spark RDD检查点相关的重要组件——RDDCheckpointData和CheckpointRDD,并且以可靠版本的实现——ReliableRDDCheckpointData和ReliableCheckpointRDD为例,详细解析了检查点数据从写入到读取的整个流程。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-08-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据真好玩 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • RDD类中的检查点方法
  • 检查点数据的包装
    • ReliableRDDCheckpointData
    • 检查点RDD
      • CheckpointRDD
        • ReliableCheckpointRDD
        • 总结
        相关产品与服务
        大数据
        全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档