Spark设计理念和基本架构

Spark是一个通用的并行计算框架,由加州伯克利大学(UC Berkeley) 的AMP实验室开发于2009年,并于2010年开源,2013年成长为Apache旗下在大数据领域最活跃的开源项目之一。

虽然Spark是一个通用的并行计算框架,但是Spark本质上也是一个基于map-reduce算法模型实现的分布式计算框架,Spark不仅拥有了Hadoop MapReduce的能力和优点,还解决了Hadoop MapReduce中的诸多性能缺陷。

HadoopMapReduce的问题与演进

早期的Hadoop MapReduce采用的是MRv1版本的MapReduce编程模型。MRv1具体实现可以参考org.apache.hadoop.mapred包。MRv1的Map和Reduce都是通过接口实现的。MRv1主要包括以下三个部分:

1)运行时环境(JobTracker和TaskTracker)

2)编程模型(MapReduce)

3)数据处理引擎(Map任务和Reduce任务)

MRv1将集群管理功能和数据处理能力紧耦合在一起,如下图所示:

这种紧耦合的设计会导致以下问题:

1)可扩展性差:在运行时,JobTracker既负责资源管理,又负责任务调度,当集群繁忙时,JobTracker很容易成为瓶颈,最终导致它的可扩展性问题。

2)可用性差:采用了单节点的Master,没有备用Master及选举操作,这导致一旦Master出现故障,整个集群将不可用。

3)资源利用率低:TaskTracker使用slot等量划分本节点上的资源量。slot代表计算资源(CPU、内存等)。任务需要获取到slot后才能运行,Hadoop调度器负责将各个TaskTracker上的空闲slot分配给Task使用。即使一些Task不能充分利用slot所代表的资源,其他Task也无法使用这些空闲的资源。在MRv1中,slot分为Map slot和Reduce slot两种,分别供MapTask和Reduce Task使用。有时会出现因为作业刚刚启动等原因导致MapTask很多,而Reduce Task任务还没有调度的情况,这时Reduce slot就会被闲置。

4)无法支持多种MapReduce框架:无法通过可插拔方式将自身的MapReduce框架替换为其他实现,如Spark、Storm等。

Apache社区为了解决上述问题,对Hadoop MRv1进行改造,将集群管理和数据处理进行解耦,演进出MRv2。在MRv2中,MRv1所包含的两大功能-集群管理和数据处理被解耦。负责集群管理的JobTracker和TaskTracker被重构为通用的资源管理器(资源调度平台)ResourceManager(RM)、节点管理器NodeManager(NM)和负责各个计算框架的任务调度模型ApplicationMaster(AM)。在MRv2中,资源调度采用两级调度方案,ResourceManager负责整个集群的资源管理,并将NodeManager汇报的空闲资源封装成container提供给ApplicationMaster完成第一级调度。而负责计算框架任务调度的ApplicationMaster则根据实际应用的具体情况进行第二级资源调度。二级调度的设计大大减少了ResourceManager的压力。NodeManager负责对单个节点的资源管理,并将资源信息、Container运行状态、健康状况等信息上报给ResourceManager。ResourceManager为了保证Container的利用率,会监控Container,如果Container未在有限的时间内使用,ResourceManager将命令NodeManager“杀死”Container,以便将资源分配给其他任务。经过将集群资源管理和数据处理解耦后,MRv2的核心不再是MapReduce框架,而是YARN集群管理器。因为在以YARN为核心的MRv2中,MapReduce框架是可插拔的,完全可以替换为其他MapReduce实现,比如Spark、Storm等。MRv2的示意图如下所示:

虽然Hadoop MRv2解决了MRv1中的一些问题,但是由于对HDFS的频繁操作(包括计算结果持久化、数据备份、资源下载及shuffle等),导致磁盘I/O成为系统性能的瓶颈,因此只适用于离线数据处理或批处理,而不能支持对迭代式、流式(实时式)数据的处理。

Spark对Hadoop的优化与改进

Spark的作者看到了MRv1的问题,并对MapReduce做了大量的改进和优化,主要包括以下5个方面:

1)减少磁盘I/O:

  1. 中间结果缓存在内存中:随着实时大数据应用越来越多,Hadoop作为离线的高吞吐、低响应框架已不能满足这类需求。Hadoop MapReduce的map端将中间输出和结果存储在磁盘中,reduce端又需要从磁盘读写中间结果,从而造成磁盘I/O成为瓶颈。Spark则允许将map端的中间输出和结果缓存在内存中,从而使得reduce端在拉取中间结果时避免了大量的磁盘I/O。
  2. 应用程序上传的资源文件缓存在Driver本地文件服务的内存中:Hadoop YARN中的ApplicationMaster申请到Container后,具体任务需要利用NodeManager从HDFS的不同节点下载任务所需的资源(如Jar包),增加了磁盘I/O。Spark则将应用程序上传的资源文件缓存在Driver本地文件服务的内存中,当Executor执行任务时直接从Driver的内存中读取,从而节省了大量的磁盘I/O。

2)增加任务并行度:由于将中间结果写到磁盘与从磁盘读取中间结果属于不同的环节,Hadoop将它们简单地通过串行执行衔接起来。而Spark则把不同的环节抽象为Stage,允许多个Stage既可以串行执行,又可以并行执行。

3)避免重新计算:当Stage中某个分区的Task执行失败后,会重新对此Stage调度,但在重新调度的时候会过滤已经执行成功的分区任务,所以不会造成重复计算和资源浪费。

4)可选的Shuffle排序:Hadoop MapReduce在Shuffle之前会将中间结果按key的hash值和key值大小进行两层排序,确保分区内部的有序性。而Spark则可以根据不同场景选择在map端排序还是reduce端排序。

5)灵活的内存管理策略:Spark将内存分为堆上的存储内存、堆外的存储内存、堆上的执行内存、堆外的执行内存4个部分。Spark既提供了执行内存和存储内存之间固定边界的实现,又提供了执行内存和存储内存之间“软”边界的实现。Spark默认使用“软”边界的实现,执行内存或存储内存中的任意一方在资源不足时都可以借用另一方的内存,最大限度地提高资源的利用率,减少对资源的浪费。Spark由于对内存使用的偏好,内存资源的多寡和使用率就显得尤为重要,为此Spark的内存管理器提供的Tungsten实现了一种与操作系统的内存Page非常相似的数据结构,用于直接操作操作系统内存,节省了创建的Java对象在堆中占用的内存,使得Spark对内存的使用效率更加接近硬件。Spark会给每个Task分配一个配套的任务内存管理器,对Task粒度的内存进行管理。Task的内存可以被多个内部的消费者消费,任务内存管理器对每个消费者进行Task内存的分配与管理,因此Spark对内存有着更细粒度的管理。

除了上述的改进外,Spark还具有以下特点:

1)检查点支持:Spark的RDD之间维护了血缘关系(lineage),一旦某个RDD失败了,则可以由父RDD重建。虽然lineage可用于错误后RDD的恢复,但对于很长的lineage来说,恢复过程非常耗时。如果应用启用了检查点,那么在Stage中的Task都执行成功后,SparkContext将把RDD计算的结果保存到检查点,这样当某个RDD执行失败后,再由父RDD重建时就不需要重新计算,而直接从检查点恢复数据。

2)易于使用。Spark现在支持Java、Scala、Python和R等语言编写应用程序,大大降低了使用者的门槛。除此之外,还自带了80多个高等级操作符,允许在Scala、Python、R的shell中进行交互式查询。

3)支持交互式:Spark使用Scala开发,并借助于Scala类库中的Iloop实现交互式shell,提供对REPL(Read-eval-print-loop)的实现。

4)支持SQL查询。在数据查询方面,Spark支持SQL及Hive SQL,这极大地方便了传统SQL开发和数据仓库的使用者。

5)支持流式计算:与MapReduce只能处理离线数据相比,Spark还支持实时的流计算。Spark依赖Spark Streaming对数据进行实时的处理,其流式处理能力还要强于Storm。

6)高可用:Spark自身实现了Standalone部署模式,此模式下的Master可以有多个,解决了单点故障问题。Spark也完全支持使用外部的部署模式,比如YARN、Mesos、EC2等。

7)丰富的数据源支持:Spark除了可以访问操作系统自身的文件系统和HDFS之外,还可以访问Kafka、Socket、Cassandra、HBase、Hive、Alluxio(Tachyon)及任何Hadoop的数据源。

8)丰富的文件格式支持:Spark支持文本文件格式、CSV文件格式、JSON文件格式、ORC文件格式、Parquet文件格式、Libsvm文件格式,有利于Spark与其他数据处理平台的对接。

Spark基础概念

RDD(resillient distributed dataset):弹性分布式数据集。Spark应用程序通过使用Spark的转换API,可以将RDD封装为一系列具有血缘关系的RDD,也就是DAG。只有通过Spark的动作API才会将RDD及其DAG提交到DAGScheduler。RDD的祖先一定是一个跟数据源相关的RDD,负责从数据源迭代读取数据。

DAG(Directed Acycle graph):有向无环图。在图论中,如果一个有向图无法从某个顶点出发经过若干条边回到该点,则这个图是一个有向无环图(DAG图)。Spark使用DAG来反映各RDD之间的依赖或血缘关系。 Partition:数据分区,即一个RDD的数据可以划分为多少个分区。Spark根据Partition的数量来确定Task的数量。

NarrowDependency:窄依赖,即子RDD依赖于父RDD中固定的Partition。Narrow-Dependency分为OneToOneDependency和RangeDependency两种。

ShuffleDependency:Shuffle依赖,也称为宽依赖,即子RDD对父RDD中的所有Partition都可能产生依赖。子RDD对父RDD各个Partition的依赖将取决于分区计算器(Partitioner)的算法。

Job:用户提交的作业。当RDD及其DAG被提交给DAGScheduler调度后,DAGScheduler会将所有RDD中的转换及动作视为一个Job。一个Job由一到多个Task组成。

Stage:Job的执行阶段。DAGScheduler按照ShuffleDependency作为Stage的划分节点对RDD的DAG进行Stage划分(上游的Stage将为ShuffleMapStage)。因此一个Job可能被划分为一到多个Stage。Stage分为ShuffleMapStage和ResultStage两种。

Task:具体执行任务。一个Job在每个Stage内都会按照RDD的Partition数量,创建多个Task。Task分为ShuffleMapTask和ResultTask两种。ShuffleMapStage中的Task为ShuffleMapTask,而ResultStage中的Task为ResultTask。ShuffleMapTask和ResultTask类似于Hadoop中的Map任务和Reduce任务。

Shuffle:Shuffle是所有MapReduce计算框架的核心执行阶段,Shuffle用于打通map任务(在Spark中就是ShuffleMapTask)的输出与reduce任务(在Spark中就是ResultTask)的输入,map任务的中间输出结果按照指定的分区策略(例如,按照key值哈希)分配给处理某一个分区的reduce任务。

Spark基本组成与架构

Apache Spark由SparkCore、Spark SQL、Spark Streaming、GraphX、MLlib等模块组成。模块间的整体关系如下图所示:

其中Spark Core是Apache Spark的核心,是其他扩展模块的基础运行时环境。下面我们简要描述SparkCore的功能和其他扩展模块的功能。

  • Spark Core,主要提供Spark应用的运行时环境,包括以下功能:
    • 基础设施:
      • SparkConf:用于管理Spark应用程序的各种配置信息;
      • 内置的基于Netty的RPC框架,包括同步和异步的多种实现。RCP框架时Spark各组件间通信的基础;
      • 事件总线: SparkContext内部各组件间使用事件—监听器模式异步调用的实现;
      • 度量系统:由Spark中的多种度量源(Source)和多种度量输出(Sink)构成,完成对整个Spark集群中各组件运行期状态的监控;
    • SparkContext:通常而言,用户开发的Spark应用程序的提交与执行都离不开SparkContex的支持。在正式提交应用程序之前,首先需要初始化SparkContext。SparkContext隐藏了网络通信、分布式部署、消息通信、存储体系、计算引擎、度量系统、文件服务、Web UI等内容,应用程序开发者只需要使用SparkContext提供的API完成功能开发。
    • SparkEnv:Spark执行环境SparkEnv是Spark中的Task运行所必需的组件。SparkEnv内部封装了RPC环境(RpcEnv)、序列化管理器、广播管理器(BroadcastManager)、map任务输出跟踪器(MapOutputTracker)、存储体系、度量系统(MetricsSystem)、输出提交协调器(OutputCommitCoordinator)等Task运行所需的各种组件。
    • 存储体系:Spark优先考虑使用各节点的内存作为存储,当内存不足时才会考虑使用磁盘,这极大地减少了磁盘I/O,提升了任务执行的效率,使得Spark适用于实时计算、迭代计算、流式计算等场景。在实际场景中,有些Task是存储密集型的,有些则是计算密集型的,所以有时候会造成存储空间很空闲,而计算空间的资源又很紧张。Spark的内存存储空间与执行存储空间之间的边界可以是“软”边界,因此资源紧张的一方可以借用另一方的空间,这既可以有效利用资源,又可以提高Task的执行效率。此外,Spark的内存空间还提供了Tungsten的实现,直接操作操作系统的内存。由于Tungsten省去了在堆内分配Java对象,因此能更加有效地利用系统的内存资源,并且因为直接操作系统内存,空间的分配和释放也更迅速。
    • 调度系统:调度系统主要由DAGSchedulerTaskScheduler组成,它们都内置在SparkContext中。DAGScheduler负责创建Job、将DAG中的RDD划分到不同的Stage、给Stage创建对应的Task、批量提交Task等功能。TaskScheduler负责按照FIFO或者FAIR等调度算法对批量Task进行调度;为Task分配资源;将Task发送到集群管理器的当前应用的Executor上,由Executor负责执行等工作。即使现在Spark增加了SparkSession和DataFrame等新的API,但这些新API的底层实际依然依赖于SparkContext。
    • 计算引擎:计算引擎由内存管理器(MemoryManager)Tungsten任务内存管理器(TaskMemory-Manager)Task外部排序器(ExternalSorter)Shuffle管理器(ShuffleManager)等组成。
      • MemoryManager除了对存储体系中的存储内存提供支持和管理外,还为计算引擎中的执行内存提供支持和管理。
      • Tungsten除用于存储外,也可以用于计算或执行。
      • TaskMemoryManager对分配给单个Task的内存资源进行更细粒度的管理和控制。
      • ExternalSorter用于在map端或reduce端对ShuffleMapTask计算得到的中间结果进行排序、聚合等操作。
      • ShuffleManager用于将各个分区对应的ShuffleMapTask产生的中间结果持久化到磁盘,并在reduce端按照分区远程拉取ShuffleMapTask产生的中间结果。
  • Spark SQL: 由于SQL具有普及率高、学习成本低等特点,为了扩大Spark的应用面,还增加了对SQL及Hive的支持。Spark SQL的过程可以总结为:首先使用SQL语句解析器(SqlParser)将SQL转换为语法树(Tree),并且使用规则执行器(RuleExecutor)将一系列规则(Rule)应用到语法树,最终生成物理执行计划并执行的过程。其中,规则包括语法分析器(Analyzer)和优化器(Optimizer)。Hive的执行过程与SQL类似。
  • Spark Streaming: Spark Streaming与Apache Storm类似,也用于流式计算。Spark Streaming支持Kafka、Flume、Kinesis和简单的TCP套接字等多种数据输入源。输入流接收器(Receiver)负责接入数据,是接入数据流的接口规范。Dstream是Spark Streaming中所有数据流的抽象,Dstream可以被组织为DStream Graph。Dstream本质上由一系列连续的RDD组成。
  • GraphX: Spark提供的分布式图计算框架。GraphX主要遵循整体同步并行计算模式(Bulk SynchronousParallell,BSP)下的Pregel模型实现。GraphX提供了对图Graph的抽象,Graph由顶点(Vertex)、边(Edge)及继承了Edge的EdgeTriplet(添加了srcAttr和dstAttr,用来保存源顶点和目的顶点的属性)三种结构组成。GraphX目前已经封装了最短路径、网页排名、连接组件、三角关系统计等算法的实现,用户可以选择使用。
  • MLlib: Spark提供的机器学习框架。机器学习是一门涉及概率论、统计学、逼近论、凸分析、算法复杂度理论等多领域的交叉学科。MLlib目前已经提供了基础统计、分类、回归、决策树、随机森林、朴素贝叶斯、保序回归、协同过滤、聚类、维数缩减、特征提取与转型、频繁模式挖掘、预言模型标记语言、管道等多种数理统计、概率论、数据挖掘方面的数学算法。

Spark编程模型

Spark应用程序从编写到提交、执行、输出的整个过程如下图所示:

步骤如下:

1)用户使用SparkContext提供的API编写Driver应用程序。有时我们会使用SparkSession、DataFrame、SQLContext、HiveContext以及StreamingContext等提供的API编写Driver应用程序,其实SparkSession、DataFrame、SQLContext、HiveContext以及StreamingContext都对SparkContext进行了封装,并提供了DataFrame、SQL、Hive以及流式计算相关的API。

2)使用SparkContext提交的用户应用程序:

  1. 首先会通过RpcEnv向集群管理器(Cluster Manager)注册应用(Application)并且告知集群管理器需要的资源数量。
  2. 集群管理器根据Application的需求,给Application分配Executor资源,并在Worker上启动CoarseGrainedExecutorBackend进程(该进程内部将创建Executor)。
  3. Executor所在的CoarseGrainedExecutorBackend进程在启动的过程中将通过RpcEnv直接向Driver注册Executor的资源信息,TaskScheduler将保存已经分配给应用的Executor资源的地址、大小等相关信息。
  4. SparkContext根据各种转换API,构建RDD之间的血缘关系和DAG,RDD构成的DAG将最终提交给DAGScheduler。
  5. DAGScheduler给提交的DAG创建Job,并根据RDD的依赖性质将DAG划分为不同的Stage。DAGScheduler根据Stage内RDD的Partition数量创建多个Task并批量提交给TaskScheduler。
  6. TaskScheduler对批量的Task按照FIFO或FAIR调度算法进行调度,然后给Task分配Executor资源
  7. 最后将Task发送给Executor由Executor执行。此外,SparkContext还会在RDD转换开始之前使用BlockManager和BroadcastManager将任务的Hadoop配置进行广播。

3)集群管理器(Cluster Manager)会根据应用的需求,给应用分配资源,即将具体任务分配到不同Worker节点上的多个Executor来处理任务的运行。Standalone、YARN、Mesos、EC2等都可以作为Spark的集群管理器。

4)Task在运行的过程中需要对一些数据(如中间结果、检查点等)进行持久化,Spark支持选择HDFS、Amazon S3、Alluxio(原名叫Tachyon)等作为存储。

Spark集群架构

从集群部署的角度看,Spark集群由集群管理器(Cluster Manager)、工作节点(Worker)、执行器(Executor)、驱动器(Driver)、应用程序(Application)等部分组成,其整体关系如下图所示:

1)Cluster Manager:Spark的集群管理器,主要负责对整个集群资源的分配与管理。ClusterManager在YARN部署模式下为ResourceManager;在Mesos部署模式下为Mesos Master;在Standalone部署模式下为Master。Cluster Manager分配的资源属于一级分配,它将各个Worker上的内存、CPU等资源分配给Application,但是并不负责对Executor的资源分配。Standalone部署模式下的Master会直接给Application分配内存、CPU及Executor等资源。目前,Standalone、YARN、Mesos、EC2等都可以作为Spark的集群管理器。

2)Worker:Spark的工作节点。在YARN部署模式下实际由NodeManager替代。Worker节点主要负责以下工作:将自己的内存、CPU等资源通过注册机制告知Cluster Manager;创建Executor;将资源和任务进一步分配给Executor;同步资源信息、Executor状态信息给Cluster Manager等。在Standalone部署模式下,Master将Worker上的内存、CPU及Executor等资源分配给Application后,将命令Worker启动CoarseGrainedExecutorBackend进程(此进程会创建Executor实例)。

3)Executor:主要负责任务的执行及与Worker、Driver的信息同步。

4)Driver: Application的驱动程序,Application通过Driver与Cluster Manager、Executor进行通信。Driver可以运行在Application中,也可以由Application提交给Cluster Manager并由Cluster Manager安排Worker运行。

5)Application:用户使用Spark提供的API编写的应用程序,Application通过Spark API将进行RDD的转换和DAG的构建,并通过Driver将Application注册到Cluster Manager。Cluster Manager将会根据Application的资源需求,通过一级分配将Executor、内存、CPU等资源分配给Application。Driver通过二级分配将Executor等资源分配给每一个任务,Application最后通过Driver告诉Executor运行任务。

原文发布于微信公众号 - Spark学习技巧(bigdatatip)

原文发表时间:2018-05-15

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Albert陈凯

3.5 容错机制及依赖

3.5 容错机制及依赖 一般而言,对于分布式系统,数据集的容错性通常有两种方式: 1)数据检查点(在Spark中对应Checkpoint机制)。 2)记录数据...

3857
来自专栏java达人

自学Apache Spark博客(节选)

作者:Kumar Chinnakali 译者:java达人 来源:http://dataottam.com/2016/01/10/self-learn-yo...

2259
来自专栏技术专栏

慕课网Spark SQL日志分析 - 1.Hadoop概述

http://hadoop.apache.org/ 对于Apache项目来说,projectname.apache.org Hadoop:hadoop.ap...

2134
来自专栏王小雷

Hadoop YARN学习之核心概念(2)

Hadoop YARN学习之核心概念(2) 1. Hadoop 2.X YARN引入的新服务 1.1 新的ResourceManager纯碎作为资源调度器,是集...

24710
来自专栏祝威廉

Spark Streaming 数据接收优化

看这篇文章前,请先移步Spark Streaming 数据产生与导入相关的内存分析, 文章重点讲的是从Kafka消费到数据进入BlockManager的这条线路...

911
来自专栏Hadoop实操

如何使用Spark Streaming读取HBase的数据并写入到HDFS

Spark Streaming是在2013年被添加到Apache Spark中的,作为核心Spark API的扩展它允许用户实时地处理来自于Kafka、Flum...

9244
来自专栏大数据

Spark Streaming入门

本文将帮助您使用基于HBase的Apache Spark Streaming。Spark Streaming是Spark API核心的一个扩展,支持连续的数据流...

6579
来自专栏加米谷大数据

加米谷学院:Spark核心技术原理透视一(Spark运行原理)

在大数据领域,只有深挖数据科学领域,走在学术前沿,才能在底层算法和模型方面走在前面,从而占据领先地位。

81014
来自专栏about云

Hadoop2.x 让你真正明白yarn

问题导读 1.hadoop1.x中mapreduce框架与yarn有什么共同点? 2.它们有什么不同点? 3.yarn中有哪些改变? 4.yarn中有哪些术语...

6328
来自专栏编程

大数据干货系列(六)-Spark总结

本文共计1611字,预计阅读时长八分钟 Spark总结 一、本质 Spark是一个分布式的计算框架,是下一代的MapReduce,扩展了MR的数据处理流程 二、...

1975

扫码关注云+社区