Spark是一个通用的并行计算框架,由加州伯克利大学(UC Berkeley) 的AMP实验室开发于2009年,并于2010年开源,2013年成长为Apache旗下在大数据领域最活跃的开源项目之一。
虽然Spark是一个通用的并行计算框架,但是Spark本质上也是一个基于map-reduce算法模型实现的分布式计算框架,Spark不仅拥有了Hadoop MapReduce的能力和优点,还解决了Hadoop MapReduce中的诸多性能缺陷。
HadoopMapReduce的问题与演进
早期的Hadoop MapReduce采用的是MRv1版本的MapReduce编程模型。MRv1具体实现可以参考org.apache.hadoop.mapred包。MRv1的Map和Reduce都是通过接口实现的。MRv1主要包括以下三个部分:
1)运行时环境(JobTracker和TaskTracker)
2)编程模型(MapReduce)
3)数据处理引擎(Map任务和Reduce任务)
MRv1将集群管理功能和数据处理能力紧耦合在一起,如下图所示:
这种紧耦合的设计会导致以下问题:
1)可扩展性差:在运行时,JobTracker既负责资源管理,又负责任务调度,当集群繁忙时,JobTracker很容易成为瓶颈,最终导致它的可扩展性问题。
2)可用性差:采用了单节点的Master,没有备用Master及选举操作,这导致一旦Master出现故障,整个集群将不可用。
3)资源利用率低:TaskTracker使用slot等量划分本节点上的资源量。slot代表计算资源(CPU、内存等)。任务需要获取到slot后才能运行,Hadoop调度器负责将各个TaskTracker上的空闲slot分配给Task使用。即使一些Task不能充分利用slot所代表的资源,其他Task也无法使用这些空闲的资源。在MRv1中,slot分为Map slot和Reduce slot两种,分别供MapTask和Reduce Task使用。有时会出现因为作业刚刚启动等原因导致MapTask很多,而Reduce Task任务还没有调度的情况,这时Reduce slot就会被闲置。
4)无法支持多种MapReduce框架:无法通过可插拔方式将自身的MapReduce框架替换为其他实现,如Spark、Storm等。
Apache社区为了解决上述问题,对Hadoop MRv1进行改造,将集群管理和数据处理进行解耦,演进出MRv2。在MRv2中,MRv1所包含的两大功能-集群管理和数据处理被解耦。负责集群管理的JobTracker和TaskTracker被重构为通用的资源管理器(资源调度平台)ResourceManager(RM)、节点管理器NodeManager(NM)和负责各个计算框架的任务调度模型ApplicationMaster(AM)。在MRv2中,资源调度采用两级调度方案,ResourceManager负责整个集群的资源管理,并将NodeManager汇报的空闲资源封装成container提供给ApplicationMaster完成第一级调度。而负责计算框架任务调度的ApplicationMaster则根据实际应用的具体情况进行第二级资源调度。二级调度的设计大大减少了ResourceManager的压力。NodeManager负责对单个节点的资源管理,并将资源信息、Container运行状态、健康状况等信息上报给ResourceManager。ResourceManager为了保证Container的利用率,会监控Container,如果Container未在有限的时间内使用,ResourceManager将命令NodeManager“杀死”Container,以便将资源分配给其他任务。经过将集群资源管理和数据处理解耦后,MRv2的核心不再是MapReduce框架,而是YARN集群管理器。因为在以YARN为核心的MRv2中,MapReduce框架是可插拔的,完全可以替换为其他MapReduce实现,比如Spark、Storm等。MRv2的示意图如下所示:
虽然Hadoop MRv2解决了MRv1中的一些问题,但是由于对HDFS的频繁操作(包括计算结果持久化、数据备份、资源下载及shuffle等),导致磁盘I/O成为系统性能的瓶颈,因此只适用于离线数据处理或批处理,而不能支持对迭代式、流式(实时式)数据的处理。
Spark对Hadoop的优化与改进
Spark的作者看到了MRv1的问题,并对MapReduce做了大量的改进和优化,主要包括以下5个方面:
1)减少磁盘I/O:
2)增加任务并行度:由于将中间结果写到磁盘与从磁盘读取中间结果属于不同的环节,Hadoop将它们简单地通过串行执行衔接起来。而Spark则把不同的环节抽象为Stage,允许多个Stage既可以串行执行,又可以并行执行。
3)避免重新计算:当Stage中某个分区的Task执行失败后,会重新对此Stage调度,但在重新调度的时候会过滤已经执行成功的分区任务,所以不会造成重复计算和资源浪费。
4)可选的Shuffle排序:Hadoop MapReduce在Shuffle之前会将中间结果按key的hash值和key值大小进行两层排序,确保分区内部的有序性。而Spark则可以根据不同场景选择在map端排序还是reduce端排序。
5)灵活的内存管理策略:Spark将内存分为堆上的存储内存、堆外的存储内存、堆上的执行内存、堆外的执行内存4个部分。Spark既提供了执行内存和存储内存之间固定边界的实现,又提供了执行内存和存储内存之间“软”边界的实现。Spark默认使用“软”边界的实现,执行内存或存储内存中的任意一方在资源不足时都可以借用另一方的内存,最大限度地提高资源的利用率,减少对资源的浪费。Spark由于对内存使用的偏好,内存资源的多寡和使用率就显得尤为重要,为此Spark的内存管理器提供的Tungsten实现了一种与操作系统的内存Page非常相似的数据结构,用于直接操作操作系统内存,节省了创建的Java对象在堆中占用的内存,使得Spark对内存的使用效率更加接近硬件。Spark会给每个Task分配一个配套的任务内存管理器,对Task粒度的内存进行管理。Task的内存可以被多个内部的消费者消费,任务内存管理器对每个消费者进行Task内存的分配与管理,因此Spark对内存有着更细粒度的管理。
除了上述的改进外,Spark还具有以下特点:
1)检查点支持:Spark的RDD之间维护了血缘关系(lineage),一旦某个RDD失败了,则可以由父RDD重建。虽然lineage可用于错误后RDD的恢复,但对于很长的lineage来说,恢复过程非常耗时。如果应用启用了检查点,那么在Stage中的Task都执行成功后,SparkContext将把RDD计算的结果保存到检查点,这样当某个RDD执行失败后,再由父RDD重建时就不需要重新计算,而直接从检查点恢复数据。
2)易于使用。Spark现在支持Java、Scala、Python和R等语言编写应用程序,大大降低了使用者的门槛。除此之外,还自带了80多个高等级操作符,允许在Scala、Python、R的shell中进行交互式查询。
3)支持交互式:Spark使用Scala开发,并借助于Scala类库中的Iloop实现交互式shell,提供对REPL(Read-eval-print-loop)的实现。
4)支持SQL查询。在数据查询方面,Spark支持SQL及Hive SQL,这极大地方便了传统SQL开发和数据仓库的使用者。
5)支持流式计算:与MapReduce只能处理离线数据相比,Spark还支持实时的流计算。Spark依赖Spark Streaming对数据进行实时的处理,其流式处理能力还要强于Storm。
6)高可用:Spark自身实现了Standalone部署模式,此模式下的Master可以有多个,解决了单点故障问题。Spark也完全支持使用外部的部署模式,比如YARN、Mesos、EC2等。
7)丰富的数据源支持:Spark除了可以访问操作系统自身的文件系统和HDFS之外,还可以访问Kafka、Socket、Cassandra、HBase、Hive、Alluxio(Tachyon)及任何Hadoop的数据源。
8)丰富的文件格式支持:Spark支持文本文件格式、CSV文件格式、JSON文件格式、ORC文件格式、Parquet文件格式、Libsvm文件格式,有利于Spark与其他数据处理平台的对接。
Spark基础概念
RDD(resillient distributed dataset):弹性分布式数据集。Spark应用程序通过使用Spark的转换API,可以将RDD封装为一系列具有血缘关系的RDD,也就是DAG。只有通过Spark的动作API才会将RDD及其DAG提交到DAGScheduler。RDD的祖先一定是一个跟数据源相关的RDD,负责从数据源迭代读取数据。
DAG(Directed Acycle graph):有向无环图。在图论中,如果一个有向图无法从某个顶点出发经过若干条边回到该点,则这个图是一个有向无环图(DAG图)。Spark使用DAG来反映各RDD之间的依赖或血缘关系。 Partition:数据分区,即一个RDD的数据可以划分为多少个分区。Spark根据Partition的数量来确定Task的数量。
NarrowDependency:窄依赖,即子RDD依赖于父RDD中固定的Partition。Narrow-Dependency分为OneToOneDependency和RangeDependency两种。
ShuffleDependency:Shuffle依赖,也称为宽依赖,即子RDD对父RDD中的所有Partition都可能产生依赖。子RDD对父RDD各个Partition的依赖将取决于分区计算器(Partitioner)的算法。
Job:用户提交的作业。当RDD及其DAG被提交给DAGScheduler调度后,DAGScheduler会将所有RDD中的转换及动作视为一个Job。一个Job由一到多个Task组成。
Stage:Job的执行阶段。DAGScheduler按照ShuffleDependency作为Stage的划分节点对RDD的DAG进行Stage划分(上游的Stage将为ShuffleMapStage)。因此一个Job可能被划分为一到多个Stage。Stage分为ShuffleMapStage和ResultStage两种。
Task:具体执行任务。一个Job在每个Stage内都会按照RDD的Partition数量,创建多个Task。Task分为ShuffleMapTask和ResultTask两种。ShuffleMapStage中的Task为ShuffleMapTask,而ResultStage中的Task为ResultTask。ShuffleMapTask和ResultTask类似于Hadoop中的Map任务和Reduce任务。
Shuffle:Shuffle是所有MapReduce计算框架的核心执行阶段,Shuffle用于打通map任务(在Spark中就是ShuffleMapTask)的输出与reduce任务(在Spark中就是ResultTask)的输入,map任务的中间输出结果按照指定的分区策略(例如,按照key值哈希)分配给处理某一个分区的reduce任务。
Spark基本组成与架构
Apache Spark由SparkCore、Spark SQL、Spark Streaming、GraphX、MLlib等模块组成。模块间的整体关系如下图所示:
其中Spark Core是Apache Spark的核心,是其他扩展模块的基础运行时环境。下面我们简要描述SparkCore的功能和其他扩展模块的功能。
Spark编程模型
Spark应用程序从编写到提交、执行、输出的整个过程如下图所示:
步骤如下:
1)用户使用SparkContext提供的API编写Driver应用程序。有时我们会使用SparkSession、DataFrame、SQLContext、HiveContext以及StreamingContext等提供的API编写Driver应用程序,其实SparkSession、DataFrame、SQLContext、HiveContext以及StreamingContext都对SparkContext进行了封装,并提供了DataFrame、SQL、Hive以及流式计算相关的API。
2)使用SparkContext提交的用户应用程序:
3)集群管理器(Cluster Manager)会根据应用的需求,给应用分配资源,即将具体任务分配到不同Worker节点上的多个Executor来处理任务的运行。Standalone、YARN、Mesos、EC2等都可以作为Spark的集群管理器。
4)Task在运行的过程中需要对一些数据(如中间结果、检查点等)进行持久化,Spark支持选择HDFS、Amazon S3、Alluxio(原名叫Tachyon)等作为存储。
Spark集群架构
从集群部署的角度看,Spark集群由集群管理器(Cluster Manager)、工作节点(Worker)、执行器(Executor)、驱动器(Driver)、应用程序(Application)等部分组成,其整体关系如下图所示:
1)Cluster Manager:Spark的集群管理器,主要负责对整个集群资源的分配与管理。ClusterManager在YARN部署模式下为ResourceManager;在Mesos部署模式下为Mesos Master;在Standalone部署模式下为Master。Cluster Manager分配的资源属于一级分配,它将各个Worker上的内存、CPU等资源分配给Application,但是并不负责对Executor的资源分配。Standalone部署模式下的Master会直接给Application分配内存、CPU及Executor等资源。目前,Standalone、YARN、Mesos、EC2等都可以作为Spark的集群管理器。
2)Worker:Spark的工作节点。在YARN部署模式下实际由NodeManager替代。Worker节点主要负责以下工作:将自己的内存、CPU等资源通过注册机制告知Cluster Manager;创建Executor;将资源和任务进一步分配给Executor;同步资源信息、Executor状态信息给Cluster Manager等。在Standalone部署模式下,Master将Worker上的内存、CPU及Executor等资源分配给Application后,将命令Worker启动CoarseGrainedExecutorBackend进程(此进程会创建Executor实例)。
3)Executor:主要负责任务的执行及与Worker、Driver的信息同步。
4)Driver: Application的驱动程序,Application通过Driver与Cluster Manager、Executor进行通信。Driver可以运行在Application中,也可以由Application提交给Cluster Manager并由Cluster Manager安排Worker运行。
5)Application:用户使用Spark提供的API编写的应用程序,Application通过Spark API将进行RDD的转换和DAG的构建,并通过Driver将Application注册到Cluster Manager。Cluster Manager将会根据Application的资源需求,通过一级分配将Executor、内存、CPU等资源分配给Application。Driver通过二级分配将Executor等资源分配给每一个任务,Application最后通过Driver告诉Executor运行任务。