导读:Spark是由加州大学伯克利分校AMP实验室开源的分布式大规模数据处理通用引擎,具有高吞吐、低延时、通用易扩展、高容错等特点。Spark内部提供了丰富的开发库,集成了数据分析引擎Spark SQL、图计算框架GraphX、机器学习库MLlib、流计算引擎Spark Streaming。
Spark在函数式编程语言Scala中实现,提供了丰富的开发API,支持Scala、Java、Python、R等多种开发语言。同时,Spark提供了多种运行模式,既可以采用独立部署的方式运行,也可以依托Hadoop YARN、Apache Mesos等资源管理器调度任务运行。
目前,Spark已经在金融、交通、医疗、气象等多种领域中广泛使用。
作者:肖冠宇
如需转载请联系大数据(ID:hzdashuju)
01 Spark概述
1. 核心概念介绍
Spark架构示意图如图2-1所示,下面将分别介绍各核心组件。
▲图2-1 Spark架构示意图
2. RDD介绍
RDD从字面上理解有些困难,我们可以认为是一种分布式多分区只读的数组,Spark计算操作都是基于RDD进行的。
RDD具有几个特性:只读、多分区、分布式,可以将HDFS块文件转换成RDD,也可以由一个或多个RDD转换成新的RDD,失效自动重构。基于这些特性,RDD在分布式环境下能够被高效地并行处理。
(1)计算类型
在Spark中RDD提供Transformation和Action两种计算类型。Transformation操作非常丰富,采用延迟执行的方式,在逻辑上定义了RDD的依赖关系和计算逻辑,但并不会真正触发执行动作,只有等到Action操作才会触发真正执行操作。Action操作常用于最终结果的输出。
常用的Transformation操作及其描述:
常用的Action操作及其描述:
从HDFS文件生成Spark RDD,经过map、filter、join等多次Transformation操作,最终调用saveAsTextFile Action操作将结果集输出到HDFS,并以文件形式保存。RDD的流转过程如图2-2所示。
▲图2-2 RDD的流转过程示意图
(2)缓存
在Spark中RDD可以缓存到内存或者磁盘上,提供缓存的主要目的是减少同一数据集被多次使用的网络传输次数,提高Spark的计算性能。Spark提供对RDD的多种缓存级别,可以满足不同场景对RDD的使用需求。RDD的缓存具有容错性,如果有分区丢失,可以通过系统自动重新计算。
在代码中可以使用persist()方法或cache()方法缓存RDD。cache()方法默认将RDD缓存到内存中,cache()方法和persist()方法都可以用unpersist()方法来取消RDD缓存。示例如下:
val fileDataRdd = sc.textFile("hdfs://data/hadoop/test.text")
fileDataRdd.cache() // 缓存RDD到内存
或者
fileDataRdd.persist(StorageLevel.MEMORY_ONLY)
fileDataRdd..unpersist() // 取消缓存
Spark的所有缓存级别定义在org.apache.spark.storage.StorageLevel对象中,如下所示。
object storageLevel extends scala.AnyRef with scala.Serializable {
val NONE : org.apache.spark.storage.StorageLevel
val DISK_ONLY : org.apache.spark.storage.StorageLevel
val DISK_ONLY_2 : org.apache.spark.storage.StorageLevel
val MEMORY_ONLY : org.apache.spark.storage.StorageLevel
val MEMORY_ONLY_2 : org.apache.spark.storage.StorageLevel
val MEMORY_ONLY_SER : org.apache.spark.storage.StorageLevel
val MEMORY_ONLY_SER_2 : org.apache.spark.storage.StorageLevel
val MEMORY_AND_DISK : org.apache.spark.storage.StorageLevel
val MEMORY_AND_DISK_2 : org.apache.spark.storage.StorageLevel
val MEMORY_AND_DISK_SER : org.apache.spark.storage.StorageLevel
val MEMORY_AND_DISK_SER_2 : org.apache.spark.storage.StorageLevel
val OFF_HEAP : org.apache.spark.storage.StorageLevel
Spark各缓存级别及其描述:
(3)依赖关系
窄依赖(Narrow Dependency):父RDD的分区只对应一个子RDD的分区,如图2-3所示,如果子RDD只有部分分区数据损坏或者丢失,只需要从对应的父RDD重新计算恢复。
▲图2-3 窄依赖示意图
宽依赖(Shuffle Dependency):子RDD分区依赖父RDD的所有分区,如图2-4所示。如果子RDD部分分区甚至全部分区数据损坏或丢失,需要从所有父RDD重新计算,相对窄依赖而言付出的代价更高,所以应尽量避免宽依赖的使用。
▲图2-4 宽依赖示意图
Lineage:每个RDD都会记录自己依赖的父RDD信息,一旦出现数据损坏或者丢失将从父RDD迅速重新恢复。
3. 运行模式
Spark运行模式主要有以下几种:
提交作业命令:
./bin/spark-submit --class package.MainClass \ # 作业执行主类,需要完成的包路径
--master spark://host:port, mesos://host:port, yarn, or local\Maste
# 运行方式
---deploy-mode client,cluster\ # 部署模式,如果Master采用YARN模式则可以选择使用clent模式或者cluster模式,默认client模式
--driver-memory 1g \ # Driver运行内存,默认1G
---driver-cores 1 \ # Driver分配的CPU核个数
--executor-memory 4g \ # Executor内存大小
--executor-cores 1 \ # Executor分配的CPU核个数
---num-executors \ # 作业执行需要启动的Executor数
---jars \ # 作业程序依赖的外部jar包,这些jar包会从本地上传到Driver然后分发到各Executor classpath中。
lib/spark-examples*.jar \ # 作业执行JAR包
[other application arguments ] # 程序运行需要传入的参数
作业在yarn-cluster模式下的执行过程如图2-5所示。
▲图2-5 作业在yarn-cluster模式下的执行过程
02 Shuffle详解
Shuffle最早出现于MapReduce框架中,负责连接Map阶段的输出与Reduce阶段的输入。Shuffle阶段涉及磁盘IO、网络传输、内存使用等多种资源的调用,所以Shuffle阶段的执行效率影响整个作业的执行效率,大部分优化也都是针对Shuffle阶段进行的。
Spark是实现了MapReduce原语的一种通用实时计算框架。Spark作业中Map阶段的Shuffle称为Shuffle Write,Reduce阶段的Shuffle称为Shuffle Read。
Shuffle Write阶段会将Map Task中间结果数据写入到本地磁盘,而在Shuffle Read阶段中,Reduce Task从Shuffle Write阶段拉取数据到内存中并行计算。Spark Shuffle阶段的划分方式如图2-6所示。
▲图2-6 Spark Shuffle阶段的划分方式
1. Shuffle Write实现方式
(1)基于Hash的实现(hash-based)
每个Map Task都会生成与Reduce Task数据相同的文件数,对Key取Hash值分别写入对应的文件中,如图2-7所示。
生成的文件数FileNum=MapTaskNum×ReduceTaskNum,如果Map Task和Reduce Task数都比较多就会生成大量的小文件,写文件过程中,每个文件都要占用一部分缓冲区,总占用缓冲区大小TotalBufferSize=CoreNum×ReduceTaskNum×FileBufferSize,大量的小文件就会占用更多的缓冲区,造成不必要的内存开销,同时,大量的随机写操作会大大降低磁盘IO的性能。
▲图2-7 基于Hash的实现方式
由于简单的基于Hash的实现方式扩展性较差,内存资源利用率低,过多的小文件在文件拉取过程中增加了磁盘IO和网络开销,所以需要对基于Hash的实现方式进行进一步优化,为此引入了Consolidate(合并)机制。
如图2-8所示,将同一个Core中执行的Task输出结果写入到相同的文件中,生成的文件数FileNum=CoreNum×ReduceTaskNum,这种优化方式减少了生成的文件数目,提高了磁盘IO的吞吐量,但是文件缓存占用的空间并没有减少,性能没有得到明显有效的提高。
▲图2-8 优化后的基于Hash的实现方式
设置方式:
基于Hash的实现方式的优缺点:
(2)基于Sort的实现方式(sort-based)
为了解决基于Hash的实现方式的诸多问题,Spark Shuffle引入了基于Sort的实现方式,如图2-9所示。该方式中每个Map Task任务生成两个文件,一个是数据文件,一个是索引文件,生成的文件数FileNum=MapTaskNum×2。
数据文件中的数据按照Key分区在不同分区之间排序,同一分区中的数据不排序,索引文件记录了文件中每个分区的偏移量和范围。当Reduce Task读取数据时,先读取索引文件找到对应的分区数据偏移量和范围,然后从数据文件读取指定的数据。
设置方式:
▲图2-9 基于Sort的实现方式
基于Sort的实现方式的优缺点:
2. Shuffle Read实现方式
Shuffle Read阶段中Task通过直接读取本地Shuffle Write阶段产生的中间结果数据或者通过HTTP的方式从远程Shuffle Write阶段拉取中间结果数据进行处理。Shuffle Write阶段基于Hash和基于Sort两种实现方式产生的中间结果数据在Shuffle Read阶段采用同一种实现方式。
关于作者:资深大数据研发工程师,有多年的大数据工作经验,对高性能分布式系统架构、大数据技术、数据分析等有深入的研究。
本文摘编自《企业大数据处理:Spark、Druid、Flume与Kafka应用实践》,经出版方授权发布。