专栏首页暴走大数据Spark Core源码精读计划18 | 与RDD的重逢

Spark Core源码精读计划18 | 与RDD的重逢

前言

在前面的17篇文章中,我们对以SparkContext和SparkEnv为中心展开的Spark Core底层支撑组件有了比较深入的理解,当然有一些重要的组件,会随着整个系列的进行详细讲解到。按照计划,我们本应开始看Spark的存储系统结构,但是不着急,我们先花2~3篇文章的时间来重新认识一下我们的老朋友——RDD。它不仅与存储息息相关,也是Spark任务调度和计算的主要对象,现在打好基础是非常有益的。

RDD的正式名称为弹性分布式数据集(Resilient Distributed Dataset),Spark官方文档中对它的定义是:可以并行操作的、容错的元素集合。实际上,除了可并行操作、容错两点之外,RDD还具有一些其他相关的特点,如:

  • 不可变性(只能生成或转换,不能直接修改,容错时可以重算);
  • 分区性(内部数据会划分为Partition,是分布式并行的基础);
  • 名称中的“弹性”(可以灵活利用内存和外存,Spark设计思想的体现)。

RDD在Spark Core源码中的基础是o.a.s.rdd.RDD这个抽象类,本文就来对它做一些基础的了解。

RDD抽象类概述

构造方法与成员属性

代码#18.1 - o.a.s.rdd.RDD类的构造方法与成员属性

abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging{
  if (classOf[RDD[_]].isAssignableFrom(elementClassTag.runtimeClass)) {
    logWarning("Spark does not support nested RDDs (see SPARK-5063)")
  }

  def this(@transient oneParent: RDD[_]) =
    this(oneParent.context, List(new OneToOneDependency(oneParent)))

  @transient val partitioner: Option[Partitioner] = None

  val id: Int = sc.newRddId()
  @transient var name: String = _

  private var storageLevel: StorageLevel = StorageLevel.NONE
  private var dependencies_ : Seq[Dependency[_]] = _
  @transient private var partitions_ : Array[Partition] = _

  @transient private[spark] val creationSite = sc.getCallSite()

  @transient private[spark] val scope: Option[RDDOperationScope] = {
    Option(sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)).map(RDDOperationScope.fromJson)
  }

  private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
  private val checkpointAllMarkedAncestors =
    Option(sc.getLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS)).exists(_.toBoolean)
  @transient private var doCheckpointCalled = false

  // ...
}

RDD类接收两个主构造方法参数:

  • _sc:即SparkContext实例,它不会被序列化。
  • deps:Dependency的序列,它也不会被序列化。所谓Dependency,就是指当前RDD对其他RDD的依赖关系,后面会讲到Dependency相关的知识。

在构造方法中会检查RDD是否被嵌套了,Spark不支持RDD嵌套,会打印警告信息。另外,还有一个辅助构造方法,它只接收一个RDD oneParent作为参数,此时会使用oneParent对应的SparkContext和一对一依赖OneToOneDependency来构造RDD。

RDD类的主要成员属性如下。

  • partitioner:键值型RDD(即RDD[(K,V)])的分区逻辑,是Partitioner的子类,后面也会讲到与Partitioner相关的细节。
  • id:该RDD的ID,可以调用SparkContext.newRddId()方法产生。
  • name:RDD的可读名称。
  • storageLevel:RDD的持久化等级,一共有12个等级。它由StorageLevel类及其伴生对象定义。
  • dependencies_:RDD的依赖,与构造参数deps相同,但是可以序列化,并且会考虑当前RDD是否被Checkpoint。
  • partitions_:包含RDD的所有分区的数组。
  • creationSite:创建这个RDD的调用代码位置,通过SparkContext.getCallSite()方法获得。关于CallSite的简介可以参见文章#3。
  • scope:RDD的操作域,由RDDOperationScope结构来描述。所谓操作域,其实就是一个确定的产生RDD的代码块,该代码块中的所有RDD就是在相同的操作域中。
  • checkpointData:保存的RDD检查点数据,方便出错时重算。
  • checkpointAllMarkedAncestors:布尔值,表示是否要对当前RDD的所有标记需要Checkpoint的父RDD保存检查点。
  • doCheckpointCalled:布尔值,表示是否已经保存过该RDD的检查点,防止重复保存。

需要RDD子类实现的方法

RDD类中醒目地标出了4个抽象方法,它们都很重要,RDD的子类必须要提供具体实现,如下所示。

代码#18.2 - o.a.s.rdd.RDD类中的抽象方法

  @DeveloperApi
  def compute(split: Partition, context: TaskContext): Iterator[T]

  protected def getPartitions: Array[Partition]

  protected def getDependencies: Seq[Dependency[_]] = deps

  protected def getPreferredLocations(split: Partition): Seq[String] = Nil
  • compute():计算RDD的一个分区split内的数据,返回对应数据类型的迭代器。
  • getPartitions():取得RDD所有分区的数组。
  • getDependencies():取得RDD的所有依赖,默认返回的就是deps。
  • getPreferredLocations():取得计算分区split的偏好位置(如HDFS上块的位置)数组,这个是可选的。

RDD类中对Partition、Dependency和Preferred Location都提供了简单的Getter方法,它们都会先检查当前RDD的检查点,然后调用上面的三个抽象方法,其代码如下所示。

代码#18.3 - o.a.s.rdd.RDD.partitions()/dependencies()/preferredLocations()方法

  final def dependencies: Seq[Dependency[_]] = {
    checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
      if (dependencies_ == null) {
        dependencies_ = getDependencies
      }
      dependencies_
    }
  }

  final def partitions: Array[Partition] = {
    checkpointRDD.map(_.partitions).getOrElse {
      if (partitions_ == null) {
        partitions_ = getPartitions
        partitions_.zipWithIndex.foreach { case (partition, index) =>
          require(partition.index == index,
            s"partitions($index).partition == ${partition.index}, but it should equal $index")
        }
      }
      partitions_
    }
  }

  final def preferredLocations(split: Partition): Seq[String] = {
    checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
      getPreferredLocations(split)
    }
  }

RDD的五要素

通过以上的介绍,我们就可以归纳出RDD的五个组成要素了。这些内容在RDD类的ScalaDoc中其实已经有所体现:

  • 分区列表 [A list of partitions];
  • 计算每个分区的函数 [A function for computing each split];
  • 对其他RDD的依赖的列表 [A list of dependencies on other RDDs];
  • 可选的对键值型RDD的分区逻辑 [Optionally, a Partitioner for key-value RDDs];
  • 可选的计算分区的位置偏好列表 [Optionally, a list of preferred locations to compute each split on]。

RDD继承体系与算子概述

RDD的子类

RDD拥有众多的子类,这些子类都实现了上面的4个方法。大多数对RDD的操作方法(也就是算子)返回的结果都是RDD子类的实例。主要的RDD子类如下图所示,没有箭头,看官将就一下吧。

图#18.1 - RDD继承体系

由于我们之后还有很多事情要做,不可能将RDD的所有细节都分析一遍,这里暂时就不展开讲每个RDD子类的实现了。

我们已经知道,RDD的算子有两类,即转换(Transformation)算子与动作(Action)算子,这是老生常谈了。

转换算子

转换算子用于对一个RDD施加一系列逻辑,使之变成另一个RDD。在文章#0的WordCount程序中出现的flatMap()、map()、reduceByKey()都是转换算子。作为示例,我们来看看日常工作中极其常见、并且效率较高的mapPartitions()算子。

代码#18.4 - o.a.s.rdd.RDD.mapPartitions()方法

  def mapPartitions[U: ClassTag](
      f: Iterator[T] => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U] = withScope {
    val cleanedF = sc.clean(f)
    new MapPartitionsRDD(
      this,
      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
      preservesPartitioning)
  }

这个算子对RDD[T]每个分区的迭代器施加函数f的转换逻辑,返回一个MapPartitionsRDD[U],参数preservesPartitioning表示是否保留父RDD的分区。MapPartitionsRDD的具体实现如下。可以发现,getPartitions()和partitioner都直接复用了父RDD的,而compute()方法则是直接应用函数f的逻辑。

代码#18.5 - o.a.s.rdd.MapPartitionsRDD类

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false,
    isOrderSensitive: Boolean = false)
  extends RDD[U](prev) {
  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

  override def clearDependencies() {
    super.clearDependencies()
    prev = null
  }

  // ...
}

再举个例子,coalesce()算子可以将一个RDD重新分区,也是常用的转换算子之一。它最终会产生CoalescedRDD,如果中途发生Shuffle的话,也有可能会产生ShuffledRDD.

动作算子

动作算子用于触发Job的提交,真正执行RDD转换逻辑的计算,并返回其处理结果。以代码#0.1中用到的collect()以及常用的foreach()为例。

代码#18.6 - o.a.s.rdd.RDD.collect()/foreach()方法

  def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

  def foreach(f: T => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  }

代码很简单,需要注意,它们都调用了SparkContext.runJob()方法来提交一个Job。这个方法比较重要,待到之后研究Spark Core调度逻辑时,它可以称得上是一切的起点。

总结

本文通过阅读与RDD类相关的一些基础源码,复习了RDD的基本知识,另外又对RDD的子类与算子有了大致的了解。下一篇文章会专注于两个要点:Dependency与Partitioner,即RDD的依赖与分区逻辑。

— THE END —

本文分享自微信公众号 - 暴走大数据(zhouqiantanxi)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-08-14

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 50年前的登月程序和程序员有多硬核

    2019年7月20日,是有纪念意义的一天,这天不是因为广大网民帮周杰伦在新浪微博上的超话刷到第一,而是阿波罗登月的50周年的纪念日。早在几年前,在Github上...

    Linux阅码场
  • 爬虫学习开篇

    在这个大数据时代,尤其是人工浪潮兴起的时代,不论是工程领域还是研究领域,数据已经成为必不可少的一部分,而数据的获取很大程度上依赖于爬虫的爬取,所以爬虫也逐渐变得...

    stormwen
  • 用实战题目学习Python

    昨天在公众号发了第一个广告,是商家主动找的我,考虑到自己现在的粉丝比较少,我没有收取任何广告费。这篇关于Python的广告,大家还是结合自身实际再去买课,互联网...

    stormwen
  • Appium+python自动化(二十六)- 烟花一瞬,昙花一现 -Toast提示(超详解)

      今天宏哥在这里首先给小伙伴们和童鞋们分享一个有关昙花的小典故:话说昙花原是一位花神,她每天都开花,四季都灿烂。她还爱上了每天给她浇水除草的年轻人。后来,此事...

    北京-宏哥
  • 零基础入门微信小程序开发

    最近一个小游戏“跳一跳”火得不得了,相信即使您自己没有玩过,身边的亲戚朋友也一定玩过。画面大致如下:

    Java编程指南
  • 微信小程序与云开发

    Java、NodeJS、JavaScript、HTML5、CSS3、VueJs、ReactJs、前端工程化、前端架构

    达达前端
  • 比较好用的移动端适配的两种方案及flexible和px2rem-loader在webpack下的配置

    https://www.w3cplus.com/mobile/lib-flexible-for-html5-layout.html

    蓓蕾心晴
  • 一份真实的Python面试题

    自学Python已有一段时间了,就想着找份面试题来检验一下自己的学习情况,今天就和大家分享一份自己从网上找到的货真价实的Python面试题,每道题目看似简单,但...

    stormwen
  • Greenplum数据库使用总结(干货满满)--初级使用

    psql / clusterdb / createdb / dropdb / dropuser / gpbackup / gpcheck / gpcopy / ...

    小徐
  • SpringBoot入门建站全系列(四)Mybatis使用进阶篇:动态SQL与分页

    上一篇介绍了Mybatis的配置和基本用法《SpringBoot入门建站全系列(三)Mybatis操作数据库》

    品茗IT

扫码关注云+社区

领取腾讯云代金券