Spark 的惰性运算

今天在检视项目代码的时候,无意中发现了下面一段代码:

class RddTransformer{ def doTransform(data: RDD[Data]): RDD[NewData]={ val newDataRdd = data.flatmap(DataTransformer.doTransform) if(DataTransformer.exceptionCount > 0) { logger.error(s"There are some illegal data, count: ${DataTransformer.exceptionCount}") } newDataRdd }}object DataTransformer{ var exceptionCount:Int = 0 def doTransform(data: Data): Option[NewData]={ if(data.isIllegal){ exceptionCount += 1 None }else{ // do something transform data to new data ..... Some(newData) } }}

作者的意图很简单,就是将RDD中的数据转换为新的数据格式,并统计非法数据的个数。咋一看代码,似乎没有什么问题,可是,这段代码真的能得到正确的结果么?答案是否定的,事实上,不管RDD中包含多少非法数据,if(DataTransformer.exceptionCount > 0)这个条件永远都不会为真。为什么?你现在肯定充满了疑惑,让我们先来看看 Spark 的文档上对 RDD 操作的解释:

All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. (RDD Operations)

在 Spark 中,所有的 transformation() 类型操作都是延迟计算的,Spark 只是记录了将要对数据集进行的操作。只有需要数据集将数据返回到 Driver 程序时(即触发 Action 类型操作),所有已记录的 transformation() 才会执行。

回到上面的代码,由于针对RDD[Data]flatmap操作属于 transformation() 类型操作,所以val newDataRdd = data.flatmap(DataTransformer.doTransform)这段代码只是记录了一下对 RDD 的操作,并没有真正的去执行DataTransformer.doTransform方法中的代码。我们可以尝试在 Spark Shell 中实验一下:

scala> var counter = 0counter: Int = 0scala> var rdd = sc.parallelize(Seq(1,2,3,4,5,6)).map(x => counter += x)rdd: spark.RDD[Int] = spark.MappedRDD@2ee9b6e3scala> countercounter: Int = 0

显然累加操作并没有被执行,根据 Shell 终端的输出,Spark 似乎只是记录了一下我们的操作,并返回了一个新的 RDD。当对 RDD 进行 transformation() 操作的时候,在 Spark 内部究竟发生了什么?在解释这个问题之前,先来看看 Spark 作业的执行逻辑。

Spark Job 执行逻辑

典型的 Spark Job 逻辑执行图如下所示,Spark Job 经过下面四个步骤可以得到最终执行结果:

  • 从数据源(可以是本地 file,内存数据结构, HDFS,HBase 等)读取数据创建最初的 RDD。上一段代码中的 parallelize() 相当于 createRDD()。
  • 对 RDD 进行一系列的 transformation() 操作,每一个 transformation() 会产生一个或多个包含不同类型 T 的 RDD[T]。T 可以是 Scala 里面的基本类型或数据结构,不限于 (K, V)。但如果是 (K, V),K 不能是 Array 等复杂类型(因为难以在复杂类型上定义 partition 函数)。
  • 对最后的 final RDD 进行 action() 操作,每个 partition 计算后产生结果 result。
  • 将 result 回送到 driver 端,进行最后的 f(list[result]) 计算。例子中的 count() 实际包含了action() 和 sum() 两步计算。

Spark 在每次 transformation() 的时候使用了新产生的 RDD 来记录计算逻辑,这样就把作用在 RDD 上的所有计算逻辑串起来形成了一个链条,逻辑执行图上表示的实际上就是是 Spark Job 的计算链。当然某些 transformation() 比较复杂,会包含多个子 transformation(),因而会生成多个 RDD。这就是实际 RDD 个数会比我们想象的多一些的原因。当对 RDD 进行 action() 时,Spark 会调用在计算链条末端最后一个 RDD 的compute()方法,这个方法会接收它上一个 RDD 或者数据源的 input records,并执行自身定义的计算逻辑,从而输出结果。一句话总结 Spark 执行 action() 的流程就是:从计算链的最后一个 RDD 开始,依次从上一个 RDD 获取数据并执行计算逻辑,最后输出结果。

数据计算过程

下面的代码段,展现了RDD.flatmap()MapPartitionsRDD的实现,在代码中,我们看到,当调用RDDmap并传入一个函数f的时候,Spark 并没有做什么运算,而是用f作为一个入参创建了一个叫MapPartitionsRDD的对象并返回给调用者。而在MapPartitionsRDD.scala中,我们也看到只有当compute方法被调用的时候,我们之前传入的函数f才会真正的被执行

// RDD.scala ... /** * Return a new RDD by applying a function to all elements of this RDD. */ def flatmap[U: ClassTag](f: T => U): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF)) } // MapPartitionsRDD.scala private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag]( var prev: RDD[T], f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator) preservesPartitioning: Boolean = false) extends RDD[U](prev) { override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None override def getPartitions: Array[Partition] = firstParent[T].partitions override def compute(split: Partition, context: TaskContext): Iterator[U] = f(context, split.index, firstParent[T].iterator(split, context)) override def clearDependencies() { super.clearDependencies() prev = null }}

实际计算过程大概是这样的:

  1. 根据动作操作来将一个应用程序划分成多个作业。
  2. 一个作业经历 DAG 调度和任务调度之后,被划分成一个一个的任务,对应 Task 类。
  3. 任务被分配到不同核心去执行,执行 Task.run。
  4. Task.run 会调用阶段末 RDD 的 iterator 方法,获取该 RDD 某个分区内的数据记录,而 iterator 方法有可能会调用 RDD 类的 compute 方法来负责父 RDD 与子 RDD 之间的计算逻辑。

整个过程会比较复杂,在此不进行展开,我们只需要知道 Apache Spark 最终会调用 RDD 的 iterator 和 compute 方法来计算分区数据即可。

compute 方法

在 RDD 中,compute()被定义为抽象方法,要求其所有子类都必须实现,该方法接受的参数之一是一个Partition对象,目的是计算该分区中的数据。以之前flatmap操作生成得到的MapPartitionsRDD类为例。

override def compute(split: Partition, context: TaskContext): Iterator[U] = f(context, split.index, firstParent[T].iterator(split, context))

其中,firstParent在 RDD 中定义。

/** Returns the first parent RDD */protected[spark] def firstParent[U: ClassTag] = { dependencies.head.rdd.asInstanceOf[RDD[U]]}

MapPartitionsRDD类的compute方法调用当前 RDD 内的第一个父 RDD 的iterator方法,该方的目的是拉取父 RDD 对应分区内的数据,它返回一个迭代器对象,迭代器内部存储的每个元素即父 RDD 对应分区内已经计算完毕的数据记录。得到的迭代器作为f方法的一个参数。compute方法会将迭代器中的记录一一输入f方法,得到的新迭代器即为所求分区中的数据。

iterator方法

iterator方法的实现在 RDD 类中。

/** * Internal method to this RDD; will read from cache if applicable, or otherwise compute it. * This should ''not'' be called by users directly, but is available for implementors of custom * subclasses of RDD. */final def iterator(split: Partition, context: TaskContext): Iterator[T] = { if (storageLevel != StorageLevel.NONE) { SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel) } else { computeOrReadCheckpoint(split, context) }}

iterator方法首先检查当前 RDD 的存储级别,如果存储级别不为None,说明分区的数据要么已经存储在文件系统当中,要么当前 RDD 曾经执行过cachepersise等持久化操作,因此需要想办法把数据从存储介质中提取出来。iterator方法继续调用CacheManagergetOrCompute方法。

/** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */ def getOrCompute[T]( rdd: RDD[T], partition: Partition, context: TaskContext, storageLevel: StorageLevel): Iterator[T] = { val key = RDDBlockId(rdd.id, partition.index) blockManager.get(key) match { case Some(blockResult) => // Partition is already materialized, so just return its values context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics) new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]]) case None => // 省略部分源码 val computedValues = rdd.computeOrReadCheckpoint(partition, context) val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks) new InterruptibleIterator(context, cachedValues) } // 省略部分源码}

getOrCompute方法会根据 RDD 编号与分区编号计算得到当前分区在存储层对应的块编号,通过存储层提供的数据读取接口提取出块的数据。这时候会有两种可能情况发生:

  • 数据之前已经存储在存储介质当中,可能是数据本身就在存储介质(如读取 HDFS 中的文件创建得到的 RDD)当中,也可能是 RDD 经过持久化操作并经历了一次计算过程。这时候就能成功提取得到数据并将其返回。
  • 数据不在存储介质当中,可能是数据已经丢失,或者 RDD 经过持久化操作,但是是当前分区数据是第一次被计算,因此会出现拉取得到数据为 None 的情况。这就意味着我们需要计算分区数据,继续调用 RDD 类 computeOrReadCheckpoint 方法来计算数据,并将计算得到的数据缓存到存储介质中,下次就无需再重复计算。
  • 如果当前RDD的存储级别为 None,说明为未经持久化的 RDD,需要重新计算 RDD 内的数据,这时候调用 RDD 类的 computeOrReadCheckpoint 方法,该方法也在持久化 RDD 的分区获取数据失败时被调用。

/** * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing. */private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = { if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)}

computeOrReadCheckpoint方法会检查当前 RDD 是否已经被标记成检查点,如果未被标记成检查点,则执行自身的compute方法来计算分区数据,否则就直接拉取父 RDD 分区内的数据。

如何正确的获取计算结果

说了那么多理论,我们回到问题本身,怎么才是获取运算结果的正确方法?你也许会说,既然 transformation() 操作是惰性的,那么在之后马上触发一个 action() 操作就 OK 了。但这也是不正确的,这就涉及到了 Spark 的另外一个重要概念:分布式,在这里就不展开讲了,有兴趣可以参考官方文档:Understanding closures

下面是一个正确的实现:

class RddTransformer{ def doTransform(data: RDD[Data]): RDD[NewData]={ val newDataRdd = data.flatmap(DataTransformer.doTransform).cache() val exceptionCount = newDataRdd.filter(_.isEmpty).count() if(exceptionCount > 0) { logger.error(s"There are some illegal data, count: ${DataTransformer.exceptionCount}") } newDataRdd }}object DataTransformer{ def doTransform(data: Data): Option[NewData]={ if(data.isIllegal){ None }else{ // do something transform data to new data ..... Some(newData) } }}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏LhWorld哥陪你聊算法

【Spark篇】---Spark中Action算子

Action类算子也是一类算子(函数)叫做行动算子,如foreach,collect,count等。Transformations类算子是延迟执行,Action...

1212
来自专栏跟着阿笨一起玩NET

.Net 2.0中使用扩展方法

1272
来自专栏ml

Redis学习笔记二

  学习Redis添加Object时,由于Redis只能存取字符串String,对于其它数据类型形容:Int,long,double,Date等不提供支持,因而...

3349
来自专栏个人分享

SparkContext源码阅读

SparkContext是spark的入口,通过它来连接集群、创建RDD、广播变量等等。

1972
来自专栏Jed的技术阶梯

Spark常用Transformations算子(二)

介绍以下Transformations算子: aggregateByKey join cogroup cartesian pipe repartit...

1144
来自专栏LhWorld哥陪你聊算法

【Spark篇】---Spark中transformations算子二

coalesce常用来减少分区,第二个参数是减少分区的过程中是否产生shuffle。

811
来自专栏人工智能LeadAI

Spark常用的算子以及Scala函数总结

上海站 | 高性能计算之GPU CUDA培训 4月13-15日 ? 三天密集式学习 快速带你晋级 阅读全文 > 正文共11264个字,7张图,预计阅读时间28...

56812
来自专栏编程

MapReduce编程模型

通过WordCount程序理解MapReduce编程模型 WordCount,名为单词统计,功能是统计文本文件中每个单词出现的次数。例如下图中,有两个文本(蓝色...

2148
来自专栏浪淘沙

Spark实现排序

question: 用spark对数据进行排序,首先按照颜值的从高到低进行排序,如果颜值相等,在根据年龄的升序排序

801
来自专栏行者悟空

利用Spark RDD实现分组并排序

4983

扫码关注云+社区

领取腾讯云代金券