深入浅出Spark(二):血统(DAG)

专题介绍 2009 年,Spark 诞生于加州大学伯克利分校的 AMP 实验室(the Algorithms, Machines and People lab),并于 2010 年开源。2013 年,Spark 捐献给阿帕奇软件基金会(Apache Software Foundation),并于 2014 年成为 Apache 顶级项目。 如今,十年光景已过,Spark 成为了大大小小企业与研究机构的常用工具之一,依旧深受不少开发人员的喜爱。如果你是初入江湖且希望了解、学习 Spark 的“小虾米”,那么 InfoQ 与 FreeWheel 技术专家吴磊合作的专题系列文章——《深入浅出 Spark:原理详解与开发实践》一定适合你! 本文系专题系列第二篇。

书接前文,在上一篇《内存计算的由来 —— RDD》,我们从“虚”、“实”两个方面介绍了RDD的基本构成。RDD通过dependencies和compute属性首尾相连构成的计算路径,专业术语称之为Lineage —— 血统,又名DAG(Directed Acyclic Graph,有向无环图)。一个概念为什么会有两个称呼呢?这两个不同的名字又有什么区别和联系?简单地说,血统与DAG是从两个不同的视角出发,来描述同一个事物。血统,侧重于从数据的角度描述不同RDD之间的依赖关系;DAG,则是从计算的角度描述不同RDD之间的转换逻辑。如果说RDD是Spark对于分布式数据模型的抽象,那么与之对应地,DAG就是Spark对于分布式计算模型的抽象。

顾名思义,DAG是一种“图”,图计算模型的应用由来已久,早在上个世纪就被应用于数据库系统(Graph databases)的实现中。任何一个图都包含两种基本元素:节点(Vertex)和边(Edge),节点通常用于表示实体,而边则代表实体间的关系。例如,在“倚天屠龙”社交网络的好友关系中,每个节点表示一个具体的人,每条边意味着两端的实体之间建立了好友关系。

倚天屠龙社交网络

在上面的社交网络中,好友关系是相互的,如张无忌和周芷若互为好友,因此该关系图中的边是没有指向性的;另外,细心的同学可能已经发现,上面的图结构是有“环”的,如张无忌、谢逊、白眉鹰王构成的关系环,张无忌、谢逊、紫衫龙王、小昭之间的关系环,等等。像上面这样的图结构,术语称之为“无向有环图”。没有比较就没有鉴别,有向无环图(DAG)自然是一种带有指向性、不存在“环”结构的图模型。各位看官还记得土豆工坊的例子吗?

土豆工坊DAG

在上面的土豆加工DAG中,每个节点是一个个RDD,每条边代表着不同RDD之间的父子关系 —— 父子关系自然是单向的,因此整张图是有指向性的。另外我们注意到,整个图中是不存在环结构的。像这样的土豆加工流水线可以说是最简单的有向无环图,每个节点的入度(Indegree,指向自己的边)与出度(Outdegree,从自己出发的边)都是1,整个图看下来只有一条分支。

不过,工业应用中的Spark DAG要比这复杂得多,往往是由不同RDD经过关联、拆分产生多个分支的有向无环图。为了说明这一点,我们还是拿土豆工坊来举例,在将“原味”薯片推向市场一段时间后,工坊老板发现季度销量直线下滑,老板心急如焚、一筹莫展。此时有人向他建议:“何不推出更多风味的薯片,来迎合大众的多样化选择”,于是老板一声令下,工人们对流水线做了如下改动。

土豆工坊高级生产线

与之前相比,新的流程增加了3条风味流水线,用于分发不同的调料粉。新流水线上的辣椒粉被分发到收集小号薯片的流水线、孜然粉分发到中号薯片流水线,相应地,番茄粉分发到大号薯片流水线。经过改造,土豆工坊现在可以生产3种风味、不同尺寸的薯片,即麻辣味的小号薯片、孜然味的中号薯片和番茄味的大号薯片。如果我们用flavoursRDD来抽象调味品的话,那么工坊新作业流程所对应的DAG会演化为如下所示带有2个分支的有向无环图。

多个分支的DAG

在上一篇,我们探讨了Spark Core内功心法的第一要义 —— RDD,这一篇,咱们来说说内功心法的第二个秘诀 —— DAG。

RDD算子 —— DAG的边

在上一篇《内存计算的由来 —— RDD》最后,我们以WordCount为例展示不同RDD之间转换而形成的DAG计算图。通读代码,从开发的角度来看,我们发现DAG构成的关键在于RDD算子调用。不同于Hadoop MapReduce,Spark以数据为导向提供了丰富的RDD算子,供开发者灵活地排列组合,从而实现多样化的数据处理逻辑。那么问题来了,Spark都提供哪些算子呢?

数据来源:https://spark.apache.org/docs/latest/rdd-programming-guide.html

从表格中我们看到,Spark的RDD算子丰富到让人眼花缭乱的程度,对于初次接触Spark的同学来说,如果不稍加归类,面对多如繁星的算子还真是无从下手。Apache Spark官网将RDD算子归为Transformations和Actions两种类型,这也是大家在各类Spark技术博客中常见的分类方法。为了说明Transformations和Actions算子的本质区别,我们必须得提一提Spark计算模型的“惰性计算”(Lazy evaluation,又名延迟计算)特性。

掌握一个新概念最有效的方法之一就是找到与之相对的概念 —— 与“惰性计算”相对,大多数传统编程语言、编程框架的求值策略是“及早求值”(Eager evaluation)。例如,对于我们熟悉的C、C++、Java来说,每一条指令都会尝试调度CPU、占用时钟周期、触发计算的执行,同时,CPU寄存器需要与内存通信从而完成数据交换、数据缓存。在传统编程模式中,每一条指令都很“急”(Eager),都恨不得自己马上被调度到“前线”、参与战斗。

惰性计算模型则不然 —— 具体到Spark,绝大多数RDD算子都很“稳”、特别能沉得住气,他们会明确告诉DAGScheduler:“老兄,你先往前走着,不用理我,我先绷会儿、抽袋烟。队伍的前排是我们带头大哥,没有他的命令,我们不会贸然行动。”有了惰性计算和及早求值的基本了解,我们再说回Transformations和Actions的区别。在Spark的RDD算子中,Transformations算子都属于惰性求值操作,仅参与DAG计算图的构建、指明计算逻辑,并不会被立即调度、执行。惰性求值的特点是当且仅当数据需要被物化(Materialized)时才会触发计算的执行,RDD的Actions算子提供各种数据物化操作,其主要职责在于触发整个DAG计算链条的执行。当且仅当Actions算子触发计算时, DAG从头至尾的所有算子(前面用于构建DAG的Transformations算子)才会按照依赖关系的先后顺序依次被调度、执行。

说到这里,各位看官不禁要问:Spark采用惰性求值的计算模型,有什么优势吗?或者反过来问:Spark为什么没有采用传统的及早求值?不知道各位看官有没有听说过“延迟满足效应”(又名“糖果效应”),它指的是为了获取长远的、更大的利益而自愿延缓甚至放弃目前的、较小的满足。正所谓:“云想衣裳花想容,猪想发福人想红”。Spark这孩子不仅天资过人,小小年纪竟颇具城府,独创的内功心法意不在赢得眼下的一招半式,而是着眼于整个武林。扯远了,我们收回来。笼统地说,惰性计算为Spark执行引擎的整体优化提供了广阔的空间。关于惰性计算具体如何帮助Spark做全局优化 —— 说书的一张嘴表不了两家事,后文书咱们慢慢展开。

还是说回RDD算子,除了常见的按照Transformations和Actions分类的方法,笔者又从适用范围和用途两个维度为老铁们做了归类,毕竟人类的大脑喜欢结构化的知识,官网上一字长蛇阵的罗列总是让人看了昏昏欲睡。有了这个表格,我们就知道 *ByKey 的操作一定是作用在Paired RDD上的,所谓Paired RDD是指Schema明确区分(Key, Value)对的RDD,与之相对,任意RDD指的是不带Schema或带任意Schema的RDD。从用途的角度来区分RDD算子的归类相对比较分散,篇幅的原因,这里就不一一展开介绍,老铁们各取所需吧。

值得一提的是,对于相同的计算场景,采用不同算子实现带来的执行性能可能会有天壤之别,在后续的性能调优篇咱们再具体问题具体分析。好吧,坑越挖越多,列位看官您稍安勿躁,咱们按照FIFO的原则,先来说说刚刚才提到的、还热乎的DAGScheduler。

DAGScheduler —— DAG的向导官

DAGScheduler是Spark分布式调度系统的重要组件之一,其他组件还包括TaskScheduler、MapOutputTracker、SchedulerBackend等。DAGScheduler的主要职责是根据RDD依赖关系将DAG划分为Stages,以Stage为粒度提交任务(TaskSet)并跟踪任务进展。如果把DAG看作是Spark作业的执行路径或“战略地形”,那么DAGScheduler就是这块地形的向导官,这个向导官负责从头至尾将地形摸清楚,根据地形特点排兵布阵。更形象地,回到土豆工坊的例子,DAGScheduler要做的事情是把抽象的土豆加工DAG转化为工坊流水线上一个个具体的薯片加工操作任务。那么问题来了,DAGScheduler以怎样的方式摸索“地形”?如何划分Stages?划分Stages的依据是什么?更进一步,将DAG划分为Stages的收益有哪些?Spark为什么要这么做?

DAGScheduler的核心职责

为了回答这些问题,我们需要先对于DAG的“首”和“尾”进行如下定义:在一个DAG中,没有父RDD的节点称为首节点,而没有子RDD的节点称为尾节点。还是以土豆工坊为例,其中首节点有两个,分别是potatosRDD和flavoursRDD,而尾节点是flavouredBakedChipsRDD。

DAG中首与尾的定义

DAGScheduler在尝试探索DAG“地形”时,是以首尾倒置的方式从后向前进行。具体说来,对于土豆工坊的DAG,DAGScheduler会从尾节点flavouredBakedChipsRDD开始,根据RDD依赖关系依次向前遍历所有父RDD节点,在遍历的过程中以Shuffle为边界划分Stage。Shuffle的字面意思是“洗牌”,没错,就是扑克游戏中的洗牌,在大数据领域Shuffle引申为“跨节点的数据分发”,指的是为了实现某些计算逻辑需要将数据在集群范围内的不同计算节点之间定向分发。在绝大多数场景中,Shuffle都是当之无愧的“性能瓶颈担当”,毫不客气地说,有Shuffle的地方,就有性能优化的空间。关于Spark Shuffle的原理和性能优化技巧,后面我们会单独开一篇来专门探讨。在土豆工坊的DAG中,有两个地方发生了Shuffle,一个是从bakedChipsRDD到flavouredBakedChipsRDD的计算,另一个是从flavoursRDD到flavouredBakedChipsRDD的计算,如下图所示。

土豆工坊DAG中的Shuffle

各位看官不禁要问:DAGScheduler如何判断RDD之间的转换是否会发生Shuffle呢?那位看官说了:“前文书说了半天算子是RDD之间转换的关键,莫不是根据算子来判断会不会发生Shuffle?”您还真猜错了,算子与Shuffle没有对应关系。就拿join算子来说,在大部分场景下,join都会引入Shuffle;然而在collocated join中,左右表数据分布一致的情况下,是不会发生Shuffle的。所以您看,DAGScheduler还真不能依赖算子本身来判断发生Shuffle与否。要回答这个问题,咱们还是得回到前文书《内存计算的由来 —— RDD》中介绍RDD时提到的5大属性。

属性名

成员类型

属性含义

dependencies

变量

生成该RDD所依赖的父RDD

compute

方法

生成该RDD的计算接口

partitions

变量

该RDD的所有数据分片实体

partitioner

方法

划分数据分片的规则

preferredLocations

变量

数据分片的物理位置偏好

RDD的5大属性及其含义

其中第一大属性dependencies又可以细分为NarrowDependency和ShuffleDependency,NarrowDependency又名“窄依赖”,它表示RDD所依赖的数据无需分发,基于当前现有的数据分片执行compute属性封装的函数即可;ShuffleDependency则不然,它表示RDD依赖的数据分片需要先在集群内分发,然后才能执行RDD的compute函数完成计算。因此,RDD之间的转换是否发生Shuffle,取决于子RDD的依赖类型,如果依赖类型为ShuffleDependency,那么DAGScheduler判定:二者的转换会引入Shuffle。在回溯DAG的过程中,一旦DAGScheduler发现RDD的依赖类型为ShuffleDependency,便依序执行如下3项操作:

  • 沿着Shuffle边界的子RDD方向创建新的Stage对象
  • 把新建的Stage注册到DAGScheduler的 stages 系列字典中,这些字典用于存储、记录与Stage有关的状态和元信息,以备后用
  • 沿着当前RDD的父RDD遵循广度优先搜索算法继续回溯DAG

拿土豆工坊来说,其尾节点flavouredBakedChipsRDD同时依赖bakedChipsRDD和flavoursRDD两个父RDD,且依赖类型都是ShuffleDependency,那么依据DAGScheduler的执行逻辑,此时会执行如下3项具体操作:

DAGScheduler回溯DAG过程当中遇到ShuffleDependency时的主要操作流程

DAGScheduler沿着尾节点回溯并划分出stage0

在完成第一个Stage(stage0)的创建和注册之后,DAGScheduler先沿着bakedChipsRDD方向继续向前回溯。在沿着这条路向前跑的时候,我们的这位DAGScheduler向导官惊喜地发现:“我去!这一路上一马平川、风景甚好,各个驿站之间什么障碍都没有,交通甚是顺畅,真是片好地形!”—— 沿路遇到的所有RDD(bakedChipsRDD,chipsRDD,cleanedPotatosRDD,potatosRDD)的依赖类型都是NarrowDependency。

在回溯完毕时,DAGScheduler同样会重复上述3个步骤,根据DAGScheduler以Shuffle为边界划分Stage的原则,沿途的所有RDD都划归为同一个Stage,暂且记为stage1。值得一提的是,Stage对象的rdd属性对应的数据类型是RDD[],而不是List[RDD[]]。对于一个逻辑上包含多个RDD的Stage来说,其rdd属性存储的是路径末尾的RDD节点,具体到我们的案例中就是bakedChipsRDD。

DAGScheduler沿着bakedChipsRDD方向回溯并划分出stage1

勤勤恳恳的DAGScheduler在成功创建stage1之后,依然不忘初心、牢记使命,继续奔向还未探索的路线。从上图中我们清楚地看到整块地形还剩下flavoursRDD方向的路径没有纳入DAGScheduler的视野范围。咱们的这位DAGScheduler向导官记性相当得好,早在划分stage0的时候,他就用小本子(栈)记下:“此路口有分叉,先沿着bakedChipsRDD方向走,然后再回过头来沿着flavoursRDD的方向探索。切记,切记!”此时,向导大人拿出之前的小本子,用横线把bakedChipsRDD方向的路径划掉 —— 表示该方向路径已探索过,然后沿着flavoursRDD方向大踏步地走下去。一脚下去,发现:“我去!到头儿了!”,然后紧接着执行一贯的“三招一套”流程 —— 创建Stage、注册Stage、继续回溯。随着DAGScheduler创建最后一个Stage:stage2,地形上的所有路径都已探索完毕。

DAGScheduler创建最后一个Stage:stage2

到此为止,我们的向导大人几乎跑断了腿、以首尾倒置的顺序对整片地形进行了地毯式搜查,最终将地形划分为3块战略区域(Stage)。那么问题来了,向导大人划分出的3块区域,有啥用呢?DAGScheduler他老人家马不停蹄地这么跑,到底图啥?前面我们提到,DAGScheduler的核心职责,是将抽象的DAG计算图转换为具体的、可并行计算的分布式任务。回溯DAG、创建Stage,只是这个核心职责的第一步,DAGScheduler以Stage(TaskSet)为粒度进行任务调度,伙同TaskScheduler、SchedulerBackend等一众大佬运筹帷幄、调兵遣将。不过,毕竟本篇的主题是DAG,到Spark调度系统的核心还有些距离,因此这里咱们暂且挖个坑,后面再单独开篇(Spark调度系统)专门讲述几位大佬之间的趣事逸闻。填坑之路漫漫其修远兮,吾将上下而挖坑。

咱们来回顾一下向导大人的心路历程,首先,DAGScheduler沿着DAG的尾节点一路北上,并沿途判断每一个RDD节点的dependencies属性。之后,如果判定RDD的dependencies属性是NarrowDependency,则DAGScheduler继续向前回溯;若RDD的依赖是ShuffleDependency,DAGScheduler便开启“三招一套”的招式,创建Stage、注册Stage并继续向前回溯。由此可见,何时切割DAG并生成新的Stage由RDD的依赖类型决定,当且仅当RDD的依赖是ShuffleDependency时,DAGScheduler才会新建Stage。

喜欢刨根问底的您一定会问:“DAGScheduler怎么知道RDD的依赖类型到底是哪一个?他怎么判别RDD的依赖是窄依赖还是ShuffleDependency?”要回答这个问题,我们就还得回到RDD的5大属性上,不过这次出场的是partitioner。还记得这个属性吗?partitioner是RDD的分区器、定义了RDD数据分片的分区规则,它决定了RDD的数据分片在分布式集群中如何分布,这个属性至关重要,后面介绍Shuffle的时候我们还会提到它。DAGScheduler正是通过partitioner来判定每个RDD的依赖类型,具体来说,如果子RDD的partitioner与父RDD的partitioner一致,那么DAGScheduler判定子RDD对父RDD的依赖属于窄依赖;相反,如果两者partitioner不一致,也即分区规则不同(分区规则不同则意味着一定存在数据的“重洗牌”,即Shuffle),那么DAGScheduler判定子对父的依赖关系是ShuffleDependency。到此,DAGScheduler对于DAG的划分逻辑可以暂且告一段落。原理说了,例子举了,还缺啥?对!代码。

Show me the code

古人云:“光说不练假把式”,我们用一个小例子来展示一下DAG与Stage的关系。还是用上篇《内存计算的由来 —— RDD》中的WordCount依样画葫芦,文件内容如下。

示例文件内容

代码也没变:

WordCount示例代码

虽然文件内容和代码都没变,但是我们观察问题的视角变了,这次我们关心的是DAG中Stage的划分以及Stage之间的关系。RDD的toDebugString函数让我们可以一览DAG的构成以及Stage的划分,如下图所示。

DAG构成及Stage划分

在上图中,从第3行往下,每一行表示一个RDD,很显然,第3行的ShuffledRDD是DAG的尾节点,而第7行的HadoopRDD是首节点。我们来观察每一行字符串打印的特点,首先最明显地,第4、5、6、7行的前面都有个制表符(Tab),与第3行有个明显的错位,这表示第3行的ShuffledRDD被划分到了一个Stage(记为stage0),而第4、5、6、7行的其他RDD被划分到了另外一个Stage(记为stage1),且stage0对stage1有依赖关系。假设第7行下面的RDD字符串打印有两个制表符,即与第7行产生错位,那么第7行下面的RDD则被划到了新的Stage,以此类推。

由此可见,通过RDD的toDebugString观察DAG的Stage划分时,制表符是个重要的指示牌。另外,我们看到第3、4行的开头都有个括号,括号里面是数字,这个数字标记的是RDD的partitions大小。当然了,观察RDD、DAG、Stage还有更直观的方式,Spark的Web UI提供了更加丰富的可视化信息,不过Spark的Web UI面板繁多,对于新同学来说一眼望去反而容易不知所措,也许后面时间允许的话我们单开一篇Spark Web UI的串讲。

Postscript

本篇是《Spark分布式计算科普专栏》的第二篇,笔者学浅才疏、疏漏难免。如果您有任何疑问,或是觉得文章中的描述有所遗漏或不妥,欢迎在评论区留言、讨论。掌握一门技术,书本中的知识往往只占两成,三成靠讨论,五成靠实践。更多的讨论能激发更多的观点、视角与洞察,也只有这样,对于一门技术的认知与理解才能更深入、牢固。

在本篇博文中,我们从DAG的边 —— Spark RDD算子入手,介绍了衔接RDD的两大类算子:Transformations和Actions,并对惰性计算有了初步的认知。然后,还是以土豆工坊为例,介绍DAGScheduler切割DAG、生成Stage的流程和步骤,尤其需要注意的是DAGScheduler以Shuffle为边界划分Stage。

最后,用上一篇的WordCount简单展示了DAG与Stage的关系。细心的读者可能早已发现,文中多次提及“后文书再展开”、“后面再单开一篇”,Spark是一个精妙而复杂的分布式计算引擎,在本篇博文中我们不得不对Spark中的许多概念都进行了“前置引用”。换句话说,有些概念还没来得及解释(如惰性计算、Shuffle、TaskScheduler、TaskSet、Spark调度系统),就已经被引入到了本篇博文中。这样的叙述方法也许会给您带来困惑,毕竟,用一个还未说清楚的概念,去解释另一个新概念,总是感觉没那么牢靠。

常言道:“杀人偿命、欠债还钱”,在后续的专栏文章中,我们会继续对Spark的核心概念与原理进行探讨,慢慢地把欠您的技术债还上,尽可能地还原Spark分布式内存计算引擎的全貌。毕竟Spark调度系统为何方神圣,DAGScheduler伙同TaskScheduler、SchedulerBackend、TaskSetManager等一众大佬如何演绎权利的游戏,且听下回分解。

作者简介

吴磊,Spark Summit China 2017 讲师、World AI Conference 2020 讲师,曾任职于 IBM、联想研究院、新浪微博,具备丰富的数据库、数据仓库、大数据开发与调优经验,主导基于海量数据的大规模机器学习框架的设计与实现。现担任 Comcast Freewheel 机器学习团队负责人,负责计算广告业务中机器学习应用的实践、落地与推广。热爱技术分享,热衷于从生活的视角解读技术,曾于《IBM developerWorks》和《程序员》杂志发表多篇技术文章。

  • 发表于:
  • 本文为 InfoQ 中文站特供稿件
  • 首发地址https://www.infoq.cn/article/LBzKJPoaFAre5c0cI4ur
  • 如有侵权,请联系 yunjia_community@tencent.com 删除。

扫码关注云+社区

领取腾讯云代金券