前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Core源码精读计划18 | 与RDD的重逢

Spark Core源码精读计划18 | 与RDD的重逢

作者头像
大数据真好玩
发布2019-08-19 23:26:37
6940
发布2019-08-19 23:26:37
举报
文章被收录于专栏:暴走大数据暴走大数据

前言

在前面的17篇文章中,我们对以SparkContext和SparkEnv为中心展开的Spark Core底层支撑组件有了比较深入的理解,当然有一些重要的组件,会随着整个系列的进行详细讲解到。按照计划,我们本应开始看Spark的存储系统结构,但是不着急,我们先花2~3篇文章的时间来重新认识一下我们的老朋友——RDD。它不仅与存储息息相关,也是Spark任务调度和计算的主要对象,现在打好基础是非常有益的。

RDD的正式名称为弹性分布式数据集(Resilient Distributed Dataset),Spark官方文档中对它的定义是:可以并行操作的、容错的元素集合。实际上,除了可并行操作、容错两点之外,RDD还具有一些其他相关的特点,如:

  • 不可变性(只能生成或转换,不能直接修改,容错时可以重算);
  • 分区性(内部数据会划分为Partition,是分布式并行的基础);
  • 名称中的“弹性”(可以灵活利用内存和外存,Spark设计思想的体现)。

RDD在Spark Core源码中的基础是o.a.s.rdd.RDD这个抽象类,本文就来对它做一些基础的了解。

RDD抽象类概述

构造方法与成员属性

代码#18.1 - o.a.s.rdd.RDD类的构造方法与成员属性

abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging{
  if (classOf[RDD[_]].isAssignableFrom(elementClassTag.runtimeClass)) {
    logWarning("Spark does not support nested RDDs (see SPARK-5063)")
  }

  def this(@transient oneParent: RDD[_]) =
    this(oneParent.context, List(new OneToOneDependency(oneParent)))

  @transient val partitioner: Option[Partitioner] = None

  val id: Int = sc.newRddId()
  @transient var name: String = _

  private var storageLevel: StorageLevel = StorageLevel.NONE
  private var dependencies_ : Seq[Dependency[_]] = _
  @transient private var partitions_ : Array[Partition] = _

  @transient private[spark] val creationSite = sc.getCallSite()

  @transient private[spark] val scope: Option[RDDOperationScope] = {
    Option(sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)).map(RDDOperationScope.fromJson)
  }

  private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
  private val checkpointAllMarkedAncestors =
    Option(sc.getLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS)).exists(_.toBoolean)
  @transient private var doCheckpointCalled = false

  // ...
}

RDD类接收两个主构造方法参数:

  • _sc:即SparkContext实例,它不会被序列化。
  • deps:Dependency的序列,它也不会被序列化。所谓Dependency,就是指当前RDD对其他RDD的依赖关系,后面会讲到Dependency相关的知识。

在构造方法中会检查RDD是否被嵌套了,Spark不支持RDD嵌套,会打印警告信息。另外,还有一个辅助构造方法,它只接收一个RDD oneParent作为参数,此时会使用oneParent对应的SparkContext和一对一依赖OneToOneDependency来构造RDD。

RDD类的主要成员属性如下。

  • partitioner:键值型RDD(即RDD[(K,V)])的分区逻辑,是Partitioner的子类,后面也会讲到与Partitioner相关的细节。
  • id:该RDD的ID,可以调用SparkContext.newRddId()方法产生。
  • name:RDD的可读名称。
  • storageLevel:RDD的持久化等级,一共有12个等级。它由StorageLevel类及其伴生对象定义。
  • dependencies_:RDD的依赖,与构造参数deps相同,但是可以序列化,并且会考虑当前RDD是否被Checkpoint。
  • partitions_:包含RDD的所有分区的数组。
  • creationSite:创建这个RDD的调用代码位置,通过SparkContext.getCallSite()方法获得。关于CallSite的简介可以参见文章#3。
  • scope:RDD的操作域,由RDDOperationScope结构来描述。所谓操作域,其实就是一个确定的产生RDD的代码块,该代码块中的所有RDD就是在相同的操作域中。
  • checkpointData:保存的RDD检查点数据,方便出错时重算。
  • checkpointAllMarkedAncestors:布尔值,表示是否要对当前RDD的所有标记需要Checkpoint的父RDD保存检查点。
  • doCheckpointCalled:布尔值,表示是否已经保存过该RDD的检查点,防止重复保存。
需要RDD子类实现的方法

RDD类中醒目地标出了4个抽象方法,它们都很重要,RDD的子类必须要提供具体实现,如下所示。

代码#18.2 - o.a.s.rdd.RDD类中的抽象方法

  @DeveloperApi
  def compute(split: Partition, context: TaskContext): Iterator[T]

  protected def getPartitions: Array[Partition]

  protected def getDependencies: Seq[Dependency[_]] = deps

  protected def getPreferredLocations(split: Partition): Seq[String] = Nil
  • compute():计算RDD的一个分区split内的数据,返回对应数据类型的迭代器。
  • getPartitions():取得RDD所有分区的数组。
  • getDependencies():取得RDD的所有依赖,默认返回的就是deps。
  • getPreferredLocations():取得计算分区split的偏好位置(如HDFS上块的位置)数组,这个是可选的。

RDD类中对Partition、Dependency和Preferred Location都提供了简单的Getter方法,它们都会先检查当前RDD的检查点,然后调用上面的三个抽象方法,其代码如下所示。

代码#18.3 - o.a.s.rdd.RDD.partitions()/dependencies()/preferredLocations()方法

  final def dependencies: Seq[Dependency[_]] = {
    checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
      if (dependencies_ == null) {
        dependencies_ = getDependencies
      }
      dependencies_
    }
  }

  final def partitions: Array[Partition] = {
    checkpointRDD.map(_.partitions).getOrElse {
      if (partitions_ == null) {
        partitions_ = getPartitions
        partitions_.zipWithIndex.foreach { case (partition, index) =>
          require(partition.index == index,
            s"partitions($index).partition == ${partition.index}, but it should equal $index")
        }
      }
      partitions_
    }
  }

  final def preferredLocations(split: Partition): Seq[String] = {
    checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
      getPreferredLocations(split)
    }
  }
RDD的五要素

通过以上的介绍,我们就可以归纳出RDD的五个组成要素了。这些内容在RDD类的ScalaDoc中其实已经有所体现:

  • 分区列表 [A list of partitions];
  • 计算每个分区的函数 [A function for computing each split];
  • 对其他RDD的依赖的列表 [A list of dependencies on other RDDs];
  • 可选的对键值型RDD的分区逻辑 [Optionally, a Partitioner for key-value RDDs];
  • 可选的计算分区的位置偏好列表 [Optionally, a list of preferred locations to compute each split on]。

RDD继承体系与算子概述

RDD的子类

RDD拥有众多的子类,这些子类都实现了上面的4个方法。大多数对RDD的操作方法(也就是算子)返回的结果都是RDD子类的实例。主要的RDD子类如下图所示,没有箭头,看官将就一下吧。

图#18.1 - RDD继承体系

由于我们之后还有很多事情要做,不可能将RDD的所有细节都分析一遍,这里暂时就不展开讲每个RDD子类的实现了。

我们已经知道,RDD的算子有两类,即转换(Transformation)算子与动作(Action)算子,这是老生常谈了。

转换算子

转换算子用于对一个RDD施加一系列逻辑,使之变成另一个RDD。在文章#0的WordCount程序中出现的flatMap()、map()、reduceByKey()都是转换算子。作为示例,我们来看看日常工作中极其常见、并且效率较高的mapPartitions()算子。

代码#18.4 - o.a.s.rdd.RDD.mapPartitions()方法

  def mapPartitions[U: ClassTag](
      f: Iterator[T] => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U] = withScope {
    val cleanedF = sc.clean(f)
    new MapPartitionsRDD(
      this,
      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
      preservesPartitioning)
  }

这个算子对RDD[T]每个分区的迭代器施加函数f的转换逻辑,返回一个MapPartitionsRDD[U],参数preservesPartitioning表示是否保留父RDD的分区。MapPartitionsRDD的具体实现如下。可以发现,getPartitions()和partitioner都直接复用了父RDD的,而compute()方法则是直接应用函数f的逻辑。

代码#18.5 - o.a.s.rdd.MapPartitionsRDD类

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false,
    isOrderSensitive: Boolean = false)
  extends RDD[U](prev) {
  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

  override def clearDependencies() {
    super.clearDependencies()
    prev = null
  }

  // ...
}

再举个例子,coalesce()算子可以将一个RDD重新分区,也是常用的转换算子之一。它最终会产生CoalescedRDD,如果中途发生Shuffle的话,也有可能会产生ShuffledRDD.

动作算子

动作算子用于触发Job的提交,真正执行RDD转换逻辑的计算,并返回其处理结果。以代码#0.1中用到的collect()以及常用的foreach()为例。

代码#18.6 - o.a.s.rdd.RDD.collect()/foreach()方法

  def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

  def foreach(f: T => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  }

代码很简单,需要注意,它们都调用了SparkContext.runJob()方法来提交一个Job。这个方法比较重要,待到之后研究Spark Core调度逻辑时,它可以称得上是一切的起点。

总结

本文通过阅读与RDD类相关的一些基础源码,复习了RDD的基本知识,另外又对RDD的子类与算子有了大致的了解。下一篇文章会专注于两个要点:Dependency与Partitioner,即RDD的依赖与分区逻辑。

— THE END —

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-08-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据真好玩 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • RDD抽象类概述
    • 构造方法与成员属性
      • 需要RDD子类实现的方法
        • RDD的五要素
        • RDD继承体系与算子概述
          • RDD的子类
            • 转换算子
              • 动作算子
              • 总结
              相关产品与服务
              文件存储
              文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档