Spark RDD的Transformation

RDD的Transformation是指由一个RDD生成新RDD的过程,比如前面使用的flatMapmapfilter操作都返回一个新的RDD对象,类型是MapPartitionsRDD,它是RDD的子类。

所有的RDD Transformation都只是生成了RDD之间的计算关系以及计算方法,并没有进行真正的计算。下面还是以WordCount为例进行介绍:

val textFile = sc.textFile("README.md")
val words = textFile.flatMap(line => line.split(" "))
val wordPairs = words.map(word => (word, 1))
val wordCounts = wordPairs.reduceByKey((a, b) => a + b)
wordCounts.collect()

每一个操作都会生成一个新的RDD对象(其类型为RDD子类),它们按照依赖关系串在一起,像一个链表(其实是DAG的简化形式),每个对象有一个指向父节点的指针,以及如何从父节点通过计算生成新对象的信息。下图显示了WordCount计算过程中的RDD Transformation生成的RDD对象的依赖关系。

          RDD Transformation生成的RDD对象的依赖关系

除了RDD创建过程会生成新的RDD外,RDD Transformation也会生成新的RDD,并且设置与前一个RDD的依赖关系。结合每一个RDD的数据和它们之间的依赖关系,每个RDD都可以按依赖链追溯它的祖先,这些依赖链接就是RDD重建的基础。因此,理解了RDD依赖,也就理解了RDD的重建容错机制。

下面以map为例进行介绍。实际上,这就是生成了一个新的RDD对象,其类型是MapPartitionsRDD(它是RDD的子类):

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

MapPartitionsRDD的定义如下:

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false)
    extends RDD[U](prev) {
    override val partitioner = if (preservesPartitioning)
        firstParent[T].partitioner else None
    override def getPartitions: Array[Partition] = firstParent[T].partitions
    override def compute(split: Partition, context: TaskContext): Iterator[U] =
        f(context, split.index, firstParent[T].iterator(split, context))
}

可以看到,MapPartitionsRDD最主要的工作是用变量f保存传入的计算函数,以便compute调用它来进行计算。其他4个重要属性基本保持不变:分区和优先计算位置没有重新定义,保持不变,依赖关系默认依赖调用的RDD,分区器优先使用上一级RDD的分区器,否则为None

在Spark中,RDD是有依赖关系的,这种依赖关系有两种类型。

  • 窄依赖。依赖上级RDD的部分分区。
  • Shuffle依赖。依赖上级RDD的所有分区。

对应类的关系如下图所示。

对应类的关系

之所以这么区分依赖关系,是因为它们之间有本质的区别。使用窄依赖时,可以精确知道依赖的上级RDD的分区。一般情况下,会选择与自己在同一节点的上级RDD分区,这样计算过程都在同一节点进行,没有网络IO开销,非常高效,常见的mapflatMapfilter操作都是这一类。而Shuffle依赖则不同,无法精确定位依赖的上级RDD的分区,相当于依赖所有分区(想想reduceByKey计算,需要对所有的key重新排列)。计算时涉及所有节点之间的数据传输,开销巨大。所以,以Shuffle依赖为分隔,Task被分成Stage,方便计算时的管理。

RDD仔细维护着这种依赖关系和计算方法,使得通过重新计算来恢复RDD成为可能。当然,这也不是万能的。如果依赖链条太长,那么通过计算来恢复的代价就太大了。所以,Spark又提供了一种叫检查点的机制。对于依赖链条太长的计算,对中间结果存一份快照,这样就不需要从头开始计算了。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏行者悟空

Spark核心数据结构RDD的定义

1274
来自专栏Spark生态圈

[spark] RDD解析

每个具体的RDD都得实现compute 方法,该方法接受的参数之一是一个Partition 对象,目的是计算该分区中的数据。 我们通过map方法来看具体的实现...

651
来自专栏祝威廉

Spark Streaming 误用.transform(func)函数导致的问题解析

特定情况你会发现UI 的Storage标签上有很多新的Cache RDD,然后你以为是Cache RDD 不被释放,但是通过Spark Streaming 数据...

703
来自专栏Kubernetes

Kubernetes PodGC Controller源码分析

Author: xidianwangtao@gmail.com PodGC Controller配置 关于PodGC Controller的相关配置(ku...

33612
来自专栏https://www.cnblogs.com/L

【Spark篇】---SparkSql之UDF函数和UDAF函数

* 根据UDF函数参数的个数来决定是实现哪一个UDF  UDF1,UDF2。。。。UDF1xxx     * UDF1 传一个参数  UDF2传两个参数。。...

832
来自专栏CSDN技术头条

Spark之RDD详解

RDD 概念与特性 RDD是Spark最重要的抽象。spark统一建立在抽象的RDD之上。设计一个通用的编程抽象,使得spark可以应对各种场合的大数据情景。R...

2336
来自专栏肖力涛的专栏

Spark 踩坑记:从 RDD 看集群调度

本文的思路是从spark最细节的本质,即核心的数据结构RDD出发,到整个Spark集群宏观的调度过程做一个整理归纳,从微观到宏观两方面总结,方便自己在调优过程中...

8952
来自专栏祝威廉

Spark Streaming 数据清理机制

为啥要了解机制呢?这就好比JVM的垃圾回收,虽然JVM的垃圾回收已经巨牛了,但是依然会遇到很多和它相关的case导致系统运行不正常。

1143
来自专栏大数据学习笔记

Spark2.x学习笔记:7、Spark应用程序设计

7、 Spark应用程序设计 7.1 基本流程 1.创建SparkContext对象 每个Spark应用程序有且仅有一个SparkContext对象,封装了...

2228
来自专栏Android相关

Java线程池---processWorkerExit方法解析

开始清理并且标记一个即将销毁的Worker。只有Worker所在的线程会被调用。除非,completedAbruptly被设置(为true)了,说明当前线程是意...

741

扫码关注云+社区