前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark 控制算子源码解析

Spark 控制算子源码解析

作者头像
Tim在路上
发布2022-03-23 14:14:58
3380
发布2022-03-23 14:14:58
举报
文章被收录于专栏:后台技术底层理解

Spark 控制算子源码解析

RDD

  • persist() 算子

使用指定的level来标记RDD进行存储。

代码语言:javascript
复制
private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
  //TODO: Handle changes of StorageLevel
// 如果该RDD已经有存储level, 同时不允许覆盖,则新设置存储level会报错。
if (storageLevel!= StorageLevel.NONE&& newLevel !=storageLevel&& !allowOverride) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDD after it was already assigned a level")
  }
  // If this is the first time this RDD is marked for persisting, register it
  // with the SparkContext for cleanups and accounting. Do this only once.
  // 第一次设置,则进行注册
  if (storageLevel== StorageLevel.NONE) {
    sc.cleaner.foreach(_.registerRDDForCleanup(this))
    sc.persistRDD(this)
  }
storageLevel= newLevel
  this
}

可以看出注册的就是一个ConcurrentMap

代码语言:javascript
复制
private val referenceBuffer =
    Collections.newSetFromMap[CleanupTaskWeakReference](new ConcurrentHashMap)

private[spark] valpersistentRdds= {
  val map: ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]()
  map.asScala
}

从源码中可以看出,执行persist仅仅是设置了StorageLevel, 同时在sc的Map中注册RDD id, 来标记该RDD的存储等级。所以说persist并不是一个action算子,只有真正执行时存储进行存储。

下面我们来看下源码中是如何调用的:

代码语言:javascript
复制
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: Iterator[T] => U,
    partitions: Seq[Int]): Array[U] = {
  // 在clean 函数时的一个作用就是会将对象按照level序列化写出
  val cleanedFunc = clean(func)
  runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}
代码语言:javascript
复制
// 验证是否在cleaning 后可序列化
if (checkSerializable) {
ensureSerializable(func)
}
// 执行java的序列化,并将func对象进行写出
override def serialize[T: ClassTag](t: T): ByteBuffer = {
    val bos = new ByteBufferOutputStream()
    val out = serializeStream(bos)
    out.writeObject(t)
    out.close()
    bos.toByteBuffer
  }
// 调用writeObject0
private void writeObject0(Object obj, boolean unshared){
...
} else if (obj instanceof Serializable) {
                writeOrdinaryObject(obj, desc, unshared);
} else {
...
}
// 如果对象开启可外部写
if (desc.isExternalizable() && !desc.isProxy()) {
                writeExternalData((Externalizable) obj);
} else {

// 最终调用StorageLevel, 这里的Toint, 是将用户的选择是否开启内存,磁盘等方式转换为了数字。
class StorageLevel private(...{
override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
    out.writeByte(toInt)
    out.writeByte(_replication)
  }
}
// toInt 的实现,相当与使用二进制数的每一位,来表示存储的状态。
def toInt: Int = {
    var ret = 0
    if (_useDisk) {
      ret |= 8
    }
    if (_useMemory) {
      ret |= 4
    }
    if (_useOffHeap) {
      ret |= 2
    }
    if (_deserialized) {
      ret |= 1
    }
    ret
  }

实现存储写出的方法是在StorageLevel对象中进行封装, 实现将标志的记录写入外部存储。最后在ShuffleMapTask的反序列化的时候将其连带RDD进行读出val (rdd, dep) = ser.deserialize,在Worker节点实现写入的时候完成数据的存储设置。

  • cache 算子

使用内存存储作为存储等级,底层是调用了persist() 算子。

代码语言:javascript
复制
def cache(): this.type = persist()
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

同上,cache也不是action算子,只有在action的runJob中的clean函数时才会进行写出。cache算子和persist算子是必须进行返回的。使用时需要val rdd1 = rdd.cache()

ReliableCheckpointRDD

  • checkpoint 算子

将此RDD设置为检查点,它将会被保存到检查文件内。

可以通过SparkContext#setCheckpointDir 设置检查点存放的目录,其父RDDs将被删除。强烈建议使用前先将RDD使用persist存储于内存,否则会重新进行计算。

代码语言:javascript
复制
def checkpoint(): Unit = RDDCheckpointData.synchronized {
  // NOTE: we use a global lock here due to complexities downstream with ensuring
  // children RDD partitions point to the correct parent partitions. In the future
  // we should revisit this consideration.
  if (context.checkpointDir.isEmpty) {
    throw new SparkException("Checkpoint directory has not been set in the SparkContext")
  } else if (checkpointData.isEmpty) {
// 创建checkpointData
checkpointData= Some(new ReliableRDDCheckpointData(this))
  }
}

这里仅仅进行了的创建ReliableRDDCheckpointData,并为执行其中的方法,在ReliableRDDCheckpointData中会实现一个doCheckpoint()的方法,接下来会进行介绍如何调用,所以这里也只是创建和标注的作用。

调用是在SparkContext类的runJob方法中的最后,可以看出会调用每一个RDD的doCheckpoint方法,如果前面有创建Checkpoint的实现,默认最后一次判断是否创建checkpointData。

代码语言:javascript
复制
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  ...
  rdd.doCheckpoint()
}
代码语言:javascript
复制
doCheckpointCalled = false;
private[spark] def doCheckpoint(): Unit = {
  RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
    // 第一个一定会进入这个方法的,之后会向前调用每一个RDD, 
    if (!doCheckpointCalled) {
doCheckpointCalled= true
      if (checkpointData.isDefined) {
        if (checkpointAllMarkedAncestors) {
          //TODO We can collect all the RDDs that needs to be checkpointed, and then checkpoint
// them in parallel.
          // Checkpoint parents first because our lineage will be truncated after we
          // checkpoint ourselves
          dependencies.foreach(_.rdd.doCheckpoint())
        }
checkpointData.get.checkpoint()
      } else {
        dependencies.foreach(_.rdd.doCheckpoint())
      }
    }
  }
}

那么我们来看下checkpointData的实现类ReliableCheckpointRDD 和 ReliableRDDCheckpointData 是如何实现的。

  1. 执行rdd. doCheckpoint() 方法

从上面的源码可以看出,执行doCheckpoint方法的条件是checkpointData.isDefined,checkpointData被定义,而我们在执行checkpoint()方法是,最后的实现就是创建了一个new ReliableRDDCheckpointData(this)对象。所以最终会执行ReliableRDDCheckpointData的doCheckpoint()方法。

代码语言:javascript
复制
protected override def doCheckpoint(): CheckpointRDD[T] = {
// 调用写RDD到checkpoint目录的方法
  val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd,cpDir)

  // Optionally clean our checkpoint files if the reference is out of scope
  if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
    rdd.context.cleaner.foreach { cleaner =>
      cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
    }
  }

  logInfo(s"Done checkpointing RDD${rdd.id} to$cpDir, new parent is RDD${newRDD.id}")
  newRDD
}
代码语言:javascript
复制
def writeRDDToCheckpointDirectory[T: ClassTag](
    originalRDD: RDD[T],
    checkpointDir: String,
    blockSize: Int = -1): ReliableCheckpointRDD[T] = {
  val checkpointStartTimeNs = System.nanoTime()

  val sc = originalRDD.sparkContext

  // Create the output path for the checkpoint
  // 获取写出目录
  val checkpointDirPath = new Path(checkpointDir)
  val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
  if (!fs.mkdirs(checkpointDirPath)) {
    throw new SparkException(s"Failed to create checkpoint path$checkpointDirPath")
  }

  // Save to file, and reload it as an RDD
  val broadcastedConf = sc.broadcast(
    new SerializableConfiguration(sc.hadoopConfiguration))
  //TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
// 提交runJob, 可以看出这里相当于重新提交任务,会重新走一遍计算,这也是为什么推荐在checkpoint前进行persist
sc.runJob(originalRDD,
writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)
  // 调用writeObject(partitioner)方法将数据写出到目录
  if (originalRDD.partitioner.nonEmpty) {
writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
  }

  val checkpointDurationMs =
    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - checkpointStartTimeNs)
  logInfo(s"Checkpointing took$checkpointDurationMs ms.")
  // 最终执行创建ReliableCheckpointRDD
  val newRDD = new ReliableCheckpointRDD[T](
    sc, checkpointDirPath.toString, originalRDD.partitioner)
  if (newRDD.partitions.length != originalRDD.partitions.length) {
    throw new SparkException(
      "Checkpoint RDD has a different number of partitions from original RDD. Original " +
        s"RDD [ID:${originalRDD.id}, num of partitions:${originalRDD.partitions.length}]; " +
        s"Checkpoint RDD [ID:${newRDD.id}, num of partitions: " +
        s"${newRDD.partitions.length}].")
  }
  newRDD
}

那么数据是如何从checkpoint目录中读出的?这涉及到了ReliableCheckpointRDD。

  1. 获取分区
代码语言:javascript
复制
protected override def getPartitions: Array[Partition] = {
  // listStatus can throw exception if path does not exist.
  val inputFiles =fs.listStatus(cpath)
    .map(_.getPath)
    .filter(_.getName.startsWith("part-"))
    .sortBy(_.getName.stripPrefix("part-").toInt)
  // Fail fast if input files are invalid
  inputFiles.zipWithIndex.foreach { case (path, i) =>
    if (path.getName != ReliableCheckpointRDD.checkpointFileName(i)) {
      throw new SparkException(s"Invalid checkpoint file:$path")
    }
  }
  Array.tabulate(inputFiles.length)(i => new CheckpointRDDPartition(i))
}

遍历分区文件夹,并对文件名进行排序。

  1. 计算compute
代码语言:javascript
复制
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
  val file = new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index))
  ReliableCheckpointRDD.readCheckpointFile(file,broadcastedConf, context)
}

执行readCheckpointFile函数,通过指定文件夹,和读取的缓存大小进行读取。

最后,checkpoint是新提交一个job进行重新执行,和原任务没有依赖关系,所以调用checkpoint也不需要进行返回一个新的RDD。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022.02.08 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Spark 控制算子源码解析
    • RDD
      • ReliableCheckpointRDD
      相关产品与服务
      对象存储
      对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档