Spark2.x学习笔记:11、RDD依赖关系与stage划分

11、 RDD依赖关系与stage划分

Spark中RDD的高效与DAG图有着莫大的关系,在DAG调度中需要对计算过程划分stage,而划分依据就是RDD之间的依赖关系。

11.1 窄依赖与宽依赖

针对不同的转换函数,RDD之间的依赖关系分类窄依赖(narrow dependency)和宽依赖(wide dependency, 也称 shuffle dependency)。

(1)窄依赖 窄依赖是指1个父RDD分区对应1个子RDD的分区。换句话说,一个父RDD的分区对应于一个子RDD的分区,或者多个父RDD的分区对应于一个子RDD的分区。 所以窄依赖又可以分为两种情况,

  • 1个子RDD的分区对应于1个父RDD的分区,比如map、filter、union等算子
  • 1个子RDD的分区对应于N个父RDD的分区,比如co-paritioned join

(2)宽依赖 宽依赖是指1个父RDD分区对应多个子RDD分区。宽依赖又分为两种情况

  • 1个父RDD对应非全部多个子RDD分区,比如groupByKey、reduceByKey、sortByKey
  • 1个父RDD对应所以子RDD分区,比如未经协同划分的join

总结:如果父RDD分区对应1个子RDD的分区就是窄依赖,否则就是宽依赖。

11.2 为什么Spark将依赖分为窄依赖和宽依赖?

(1) 窄依赖(narrow dependencies)可以支持在同一个集群Executor上,以pipeline管道形式顺序执行多条命令,例如在执行了map后,紧接着执行filter。分区内的计算收敛,不需要依赖所有分区的数据,可以并行地在不同节点进行计算。所以它的失败恢复也更有效,因为它只需要重新计算丢失的parent partition即可

(2)宽依赖(shuffle dependencies) 则需要所有的父分区都是可用的,必须等RDD的parent partition数据全部ready之后才能开始计算,可能还需要调用类似MapReduce之类的操作进行跨节点传递。从失败恢复的角度看,shuffle dependencies 牵涉RDD各级的多个parent partition。

11.3 DAG

RDD之间的依赖关系就形成了DAG(有向无环图) 在Spark作业调度系统中,调度的前提是判断多个作业任务的依赖关系,这些作业任务之间可能存在因果的依赖关系,也就是说有些任务必须先获得执行,然后相关的依赖任务才能执行,但是任务之间显然不应出现任何直接或间接的循环依赖关系,所以本质上这种关系适合用DAG表示

11.4 stage划分

由于shuffle依赖必须等RDD的父RDD分区数据全部可读之后才能开始计算,因此spark的设计是让父 RDD将结果写在本地,完全写完之后,通知后面的RDD。后面的RDD则首先去读之前RDD的本地数据作为输入,然后进行运算。 由于上述特性,将shuffle依赖就必须分为两个阶段(stage)去做:

  • (1)第1个阶段(stage)需要把结果shuffle到本地,例如reduceByKey,首先要聚合某个key的所有记录,才能进行下一步的reduce计算,这个汇聚的过程就是shuffle。
  • (2)第2个阶段(stage)则读入数据进行处理。

为什么要写在本地? 后面的RDD多个分区都要去读这个信息,如果放到内存,如果出现数据丢失,后面的所有步骤全部不能进行,违背了之前所说的需要父RDD分区数据全部ready的原则。

同一个stage里面的task是可以并发执行的,下一个stage要等前一个stage ready(和mapreduce的reduce需要等map过程ready 一脉相承)。

Spark 将任务以 shuffle 依赖(宽依赖)为边界打散,划分多个 Stage. 最后的结果阶段叫做 ResultStage, 其它阶段叫 ShuffleMapStage, 从后往前推导,依将计算。

1.从后往前推理,遇到宽依赖就断开,遇到窄依赖就把当前RDD加入到该Stage 2.每个Stage里面Task的数量是由该Stage中最后一个RDD的Partition的数量所决定的。 3.最后一个Stage里面的任务类型是ResultTask,前面其他所有的Stage的任务类型是ShuffleMapTask。 4.代表当前Stage的算子一定是该Stage的最后一个计算步骤

表面上看是数据在流动,实质上是算子在流动。 (1)数据不动代码动 (2)在一个Stage内部算子为何会流动(Pipeline)?首先是算子合并,也就是所谓的函数式编程的执行的时候最终进行函数的展开从而把一个Stage内部的多个算子合并成为一个大算子(其内部包含了当前Stage中所有算子对数据的计算逻辑);其次,是由于Transformation操作的Lazy特性!在具体算子交给集群的Executor计算之前首先会通过Spark Framework(DAGScheduler)进行算子的优化(基于数据本地性的Pipeline)。

11.5 Spark计算引擎原理

  • 通过RDD,创建DAG(逻辑计划)
  • 为DAG生成物理查询计划
  • 调度并执行Task
  • 分布式执行Task

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Albert陈凯

3.3RDD的转换和DAG的生成

3.3 RDD的转换和DAG的生成 Spark会根据用户提交的计算逻辑中的RDD的转换和动作来生成RDD之间的依赖关系,同时这个计算链也就生成了逻辑上的DAG...

2937
来自专栏Spark生态圈

[spark] DAGScheduler划分stage源码解析

Spark Application只有遇到action操作时才会真正的提交任务并进行计算,DAGScheduler 会根据各个RDD之间的依赖关系形成一个DAG...

462
来自专栏斑斓

MongoDB的数据建模

MongoDB是一种面向Document的NoSQL数据库,如果我们还是按照RDB的方式来思考MongoDB的数据建模,则不能有效地利用MongoDB的优势;然...

3096
来自专栏CSDN技术头条

使用hadoop进行大规模数据的全局排序

1. Hellow hadoop~~! Hadoop(某人儿子的一只虚拟大象的名字)是一个复杂到极致,又简单到极致的东西。 说它复杂,是因为一个hadoop...

2335
来自专栏C/C++基础

CUDA Study Notes

SSE(Streaming SIMD Extensions,单指令多数据流扩展)指令集是Intel在Pentium III处理器中率先推出的。其中包含70条指令...

922
来自专栏大数据和云计算技术

DAG算法在hadoop中的应用

大学里面数据结构里面有专门的一章图论,可惜当年没有认真学习,现在不得不再次捡起来。真是少壮不努力,老大徒伤悲呀! 什么是DAG(Directed Acyclic...

4208
来自专栏吉浦迅科技

DAY56:阅读Dynamic Global Memory Allocation and Operations

Dynamic global memory allocation and operations are only supported by devices of...

913
来自专栏IT派

使用 Pandas 处理亿级数据

在数据分析领域,最热门的莫过于Python和R语言,此前有一篇文章《别老扯什么Hadoop了,你的数据根本不够大》指出:只有在超过5TB数据量的规模下,Hado...

1004
来自专栏月牙寂

[以太坊源代码分析] IV. 椭圆曲线密码学和以太坊中的椭圆曲线数字签名算法应用

本文转载来源自:http://blog.csdn.net/teaspring/article/details/77834360 感谢原作者teaspring...

4844
来自专栏Albert陈凯

Spark概要掌握情况自我核查

1、Spark目前只持哪哪种语言的API? Java, Scala, Python, R. Ref: http://spark.apache.org/ 2、R...

2373

扫码关注云+社区