3.3RDD的转换和DAG的生成

3.3 RDD的转换和DAG的生成

Spark会根据用户提交的计算逻辑中的RDD的转换和动作来生成RDD之间的依赖关系,同时这个计算链也就生成了逻辑上的DAG。接下来以“Word Count”为例,详细描述这个DAG生成的实现过程。

Spark Scala版本的Word Count程序如下:

1:val file = spark.textFile("hdfs://...")

2:val counts = file.flatMap(line => line.split(" "))

3:        .map(word => (word, 1))

4:        .reduceByKey(_ + _)

5:counts.saveAsTextFile("hdfs://...")

file和counts都是RDD,其中file是从HDFS上读取文件并创建了RDD,而counts是在file的基础上通过flatMap、map和reduceByKey这三个RDD转换生成的。最后,counts调用了动作saveAsTextFile,用户的计算逻辑就从这里开始提交的集群进行计算。那么上面这5行代码的具体实现是什么呢?

1)行1:spark是org.apache.spark.SparkContext的实例,它是用户程序和Spark的交互接口。spark会负责连接到集群管理者,并根据用户设置或者系统默认设置来申请计算资源,完成RDD的创建等。

spark.textFile("hdfs://...")就完成了一个org.apache.spark.rdd.HadoopRDD的创建,并且完成了一次RDD的转换:通过map转换到一个org.apache.spark.rdd.MapPartitions-RDD。也就是说,file实际上是一个MapPartitionsRDD,它保存了文件的所有行的数据内容。

2)行2:将file中的所有行的内容,以空格分隔为单词的列表,然后将这个按照行构成的单词列表合并为一个列表。最后,以每个单词为元素的列表被保存到MapPartitionsRDD。

3)行3:将第2步生成的MapPartitionsRDD再次经过map将每个单词word转为(word,1)的元组。这些元组最终被放到一个MapPartitionsRDD中。

4)行4:首先会生成一个MapPartitionsRDD,起到map端combiner的作用;然后会生成一个ShuffledRDD,它从上一个RDD的输出读取数据,作为reducer的开始;最后,还会生成一个MapPartitionsRDD,起到reducer端reduce的作用。

5)行5:首先会生成一个MapPartitionsRDD,这个RDD会通过调用org.apache. spark.rdd.PairRDDFunctions#saveAsHadoopDataset向HDFS输出RDD的数据内容。最后,调用org.apache.spark.SparkContext#runJob向集群提交这个计算任务。

RDD之间的关系可以从两个维度来理解:一个是RDD是从哪些RDD转换而来,也就是RDD的parent RDD(s)是什么;还有就是依赖于parent RDD(s)的哪些Partition(s)。这个关系,就是RDD之间的依赖,org.apache.spark.Dependency。根据依赖于parent RDD(s)的Partitions的不同情况,Spark将这种依赖分为两种,一种是宽依赖,一种是窄依赖。

3.3.1 RDD的依赖关系

RDD和它依赖的parent RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。

1)窄依赖指的是每一个parent RDD的Partition最多被子RDD的一个Partition使用,如图3-6所示。

[插图]

图3-6 RDD的窄依赖

2)宽依赖指的是多个子RDD的Partition会依赖同一个parent RDD的Partition,如图3-7所示。

[插图]

图3-7 RDD的宽依赖

接下来可以从不同类型的转换来进一步理解RDD的窄依赖和宽依赖的区别,如图3-8所示。

对于map和filter形式的转换来说,它们只是将Partition的数据根据转换的规则进行转化,并不涉及其他的处理,可以简单地认为只是将数据从一个形式转换到另一个形式。对于union,只是将多个RDD合并成一个,parent RDD的Partition(s)不会有任何的变化,可以认为只是把parent RDD的Partition(s)简单进行复制与合并。对于join,如果每个Partition仅仅和已知的、特定的Partition进行join,那么这个依赖关系也是窄依赖。对于这种有规则的数据的join,并不会引入昂贵的Shuffle。对于窄依赖,由于RDD每个Partition依赖固定数量的parent RDD(s)的Partition(s),因此可以通过一个计算任务来处理这些Partition,并且这些Partition相互独立,这些计算任务也就可以并行执行了。

[插图]

图3-8 RDD的窄依赖和宽依赖

对于groupByKey,子RDD的所有Partition(s)会依赖于parent RDD的所有Partition(s),子RDD的Partition是parent RDD的所有Partition Shuffle的结果,因此这两个R D D是不能通过一个计算任务来完成的。同样,对于需要parent RDD的所有Partition进行join的转换,也是需要Shuffle,这类join的依赖就是宽依赖而不是前面提到的窄依赖了。

所有的依赖都要实现trait Dependency[T]:

abstract class Dependency[T] extends Serializable {

def rdd: RDD[T]

}

其中rdd就是依赖的parent RDD。

对于窄依赖的实现是:

abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

//返回子RDD的partitionId依赖的所有的parent RDD的Partition(s)

def getParents(partitionId: Int): Seq[Int]

override def rdd: RDD[T] = _rdd

}

现在有两种窄依赖的具体实现,一种是一对一的依赖,即OneToOneDependency:

class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {

override def getParents(partitionId: Int) = List(partitionId)

}

通过getParents的实现不难看出,RDD仅仅依赖于parent RDD相同ID的Partition。

还有一个是范围的依赖,即RangeDependency,它仅仅被org.apache.spark.rdd. UnionRDD使用。UnionRDD是把多个RDD合成一个RDD,这些RDD是被拼接而成,即每个parent RDD的Partition的相对顺序不会变,只不过每个parent RDD在UnionRDD中的Partition的起始位置不同。因此它的getPartents如下:

override def getParents(partitionId: Int) = {

if(partitionId >= outStart && partitionId < outStart + length) {

List(partitionId - outStart + inStart)

} else {

Nil

}

}

其中,inStart是parent RDD中Partition的起始位置,outStart是在UnionRDD中的起始位置,length就是parent RDD中Partition的数量。

宽依赖的实现只有一种:ShuffleDependency。子RDD依赖于parent RDD的所有Partition,因此需要Shuffle过程:

class ShuffleDependency[K, V, C](

@transient _rdd: RDD[_ <: Product2[K, V]],

val partitioner: Partitioner,

val serializer: Option[Serializer] = None,

val keyOrdering: Option[Ordering[K]] = None,

val aggregator: Option[Aggregator[K, V, C]] = None,

val mapSideCombine: Boolean = false)

extends Dependency[Product2[K, V]] {

override def rdd = _rdd.asInstanceOf[RDD[Product2[K, V]]]

//获取新的shuffleId

val shuffleId: Int = _rdd.context.newShuffleId()

//向ShuffleManager注册Shuffle的信息

val shuffleHandle: ShuffleHandle =

_rdd.context.env.shuffleManager.registerShuffle(

shuffleId, _rdd.partitions.size, this)

_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))

}

宽依赖支持两种Shuffle Manager,即org.apache.spark.shuffle.hash.HashShuffleManager(基于Hash的Shuffle机制)和org.apache.spark.shuffle.sort.SortShuffleManager(基于排序的Shuffle机制)。

3.3.2 DAG的生成

原始的RDD(s)通过一系列转换就形成了DAG。RDD之间的依赖关系,包含了RDD由哪些Parent RDD(s)转换而来和它依赖parent RDD(s)的哪些Partitions,是DAG的重要属性。借助这些依赖关系,DAG可以认为这些RDD之间形成了Lineage (血统)。借助Lineage,能保证一个RDD被计算前,它所依赖的parent RDD都已经完成了计算;同时也实现了RDD的容错性,即如果一个RDD的部分或者全部的计算结果丢失了,那么就需要重新计算这部分丢失的数据。

那么Spark是如何根据DAG来生成计算任务呢?首先,根据依赖关系的不同将DAG划分为不同的阶段(Stage)。对于窄依赖,由于Partition依赖关系的确定性,Partition的转换处理就可以在同一个线程里完成,窄依赖被Spark划分到同一个执行阶段;对于宽依赖,由于Shuffle的存在,只能在parent RDD(s)Shuffle处理完成后,才能开始接下来的计算,因此宽依赖就是Spark划分Stage的依据,即Spark根据宽依赖将DAG划分为不同的Stage。在一个Stage内部,每个Partition都会被分配一个计算任务(Task),这些Task是可以并行执行的。Stage之间根据依赖关系变成了一个大粒度的DAG,这个DAG的执行顺序也是从前向后的。也就是说,Stage只有在它没有parent Stage或者parent Stage都已经执行完成后,才可以执行。这个过程可以查询第4章。

3.3.3 Word Count的RDD转换和DAG划分的逻辑视图

上文分析了在Word Count的RDD转换时,Spark生成了不同的RDD。这些RDD有的和用户逻辑直接显式对应,比如map操作会生成一个org.apache.spark.rdd.Map-PartitionsRDD;而有的RDD则是和Spark的实现原理相关,是Spark隐式生成的,比如org.apache.spark.rdd.ShuffledRDD,这个过程对于用户来说是透明的,用户只需要关心RDD的转换和动作即可。

RDD在创建子RDD的时候,会通过Dependency来定义它们之间的关系。通过Dependency,子RDD也可以获得它的parent RDD和parent RDD的Partition。

RDD转换的细节如图3-9所示。

[插图]

图3-9“Word Count”的RDD转换

通过图3-9,可以清晰地看到Spark对于用户提交的Application所做的处理。用户定义的RDD被系统显式和隐式地转换成多个RDD以及这些RDD之间的依赖,这些依赖构建了这些RDD的处理顺序及相互关系。关于这些RDD的转换时如何在计算节点上运行的,请参阅第4章。

为了对图3-9有更加直观的理解,图3-10以一个有五个分片的输入文件为例,详细描述了“Word Count”的逻辑执行过程。之所以称为逻辑执行过程,是因为具体的计算过程可能会有网络的交互,有频繁地将处理中间数据写入磁盘等过程。

[插图]

图3-10“Word Count”RDD的逻辑转换关系图

需要强调的一点是在转换操作reduceByKey时会触发一个Shuffle(洗牌)的过程。在Shuffle开始之前,有一个本地聚合的过程,比如第三个分片的(e,1)(e,1)聚合成了(e,2)。Shuffle的结果是为下游的Task生成了三个分片,这三个分片就构成了ShuffledRDD。之后在做了一个聚合之后,就生成了结果的RDD。关于Shuffle的具体实现过程,可以参阅第7章。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏星汉技术

原 荐 Spark框架核心概念

40580
来自专栏Albert陈凯

1.4 弹性分布式数据集

Spark大数据分析实战 1.4 弹性分布式数据集 本节将介绍弹性分布式数据集RDD。Spark是一个分布式计算框架,而RDD是其对分布式内存数据的抽象,可以...

37370
来自专栏一名叫大蕉的程序员

Spark你一定学得会(一)No.7

我是小蕉。 上一篇大家说没有干货,妈蛋回南天哪来的干货你告诉我!!!还好这几天天气还不错,干货来了。 首先祭上今天关键代码,要做的事情就是从Hive表中取得年龄...

21050
来自专栏个人分享

最最简单的~WordCount¬

步骤1:textFile先生成HadoopRDD,然后再通过map操作生成MappedRDD.

11510
来自专栏浪淘沙

SparkStreaming_Kafka_Redis整合

34530
来自专栏数据处理

提交任务到集群

16120
来自专栏行者悟空

Spark核心数据结构RDD的定义

38040
来自专栏Albert陈凯

Spark详解02Job 逻辑执行图Job 逻辑执行图

Job 逻辑执行图 General logical plan ? GeneralLogicalPlan.png 典型的 Job 逻辑执行图如上所示,经过下面四个...

389110
来自专栏鸿的学习笔记

spark的一些小总结

首先,DAG是MR的迭代模型。其中一个优点是,DAG可以做全局的优化,而Hadoop的MR没有意识到这点。

10920
来自专栏大数据

Zzreal的大数据笔记-SparkDay05

Spark Streaming SparkStreaming部分没做知识点的笔记,直接从代码上理解它的用法。后面整理Storm的时候会与SparkStreami...

23160

扫码关注云+社区

领取腾讯云代金券