速度快
1.Spark 使用DAG 调度器、查询优化器和物理执行引擎,能够在批处理和流数据获得很高的性能。2.spark把运算的中间数据(shuffle阶段产生的数据)存放在内存,迭代计算效率更高,mapreduce的中间结果需要落地,保存到磁盘;3.Spark计算框架对内存的利用和运行的并行度比mapreduce高,Spark运行容器为executor,内部ThreadPool中线程运行一个Task,mapreduce在线程内部运行container,container容器分类为MapTask和ReduceTask。Spark程序运行并行度高;
容错性高
1.Spark通过弹性分布式数据集RDD来实现高效容错,RDD是一组分布式的存储在节点内存中的只读性的数据集,这些集合是弹性的,某一部分丢失或者出错,可以通过整个数据集的计算流程的血缘关系来实现重建,mapreduce的容错只能重新计算;2.Spark采用CheckPoint机制,对于特别复杂的Spark应用,会出现某个反复使用的RDD,即使之前持久化过但由于节点的故障导致数据丢失了。CheckPoint机制是我们在spark中用来保障容错性的主要机制,它可以阶段性的把应用数据存储到诸如HDFS等可靠存储系统中,以供恢复时使用。
通用性强-集成度高
1.以Spark为基础建立起来的模块(库)有Spark SQL,Spark Streaming,MLlib(machine learning)和GraphX(graph)。我们可以很容易地在同一个应用中将这些库结合起来使用,以满足我们的实际需求。并且提供了transformation和action这两大类的多功能api。mapreduce只提供了map和reduce两种操作,流计算及其他的模块支持比较缺乏;2.Spark框架和生态更为复杂,有RDD,血缘lineage、执行时的有向无环图DAG,stage划分等,很多时候spark作业都需要根据不同业务场景的需要进行调优以达到性能要求,mapreduce框架及其生态相对较为简单,对性能的要求也相对较弱,运行较为稳定,适合长期后台运行;
兼容性强
spark任务支持多种调度方式包括Yarn、mesos、Standalone等。可通过Spark直接对接大数据生态中Hbase、Hdfs、Kafka等多种数据源。
•Hadoop将中间结果存放在HDFS中,每次MR都需要刷写-调用,而Spark中间结果存放优先存放在内存中,内存不够再存放在磁盘中,不放入HDFS,避免了大量的IO和刷写读取操作;•Hadoop底层使用MapReduce计算架构,只有map和reduce两种操作,表达能力比较欠缺,而且在MR过程中会重复的读写hdfs,造成大量的磁盘io读写操作,所以适合高时延环境下批处理计算的应用;Spark是基于内存的分布式计算架构,提供更加丰富的数据集操作类型,主要分成转化操作和行动操作,包括map、reduce、filter、flatmap、groupbykey、reducebykey、union和join等,数据分析更加快速,所以适合低时延环境下计算的应用;•spark与hadoop最大的区别在于迭代式计算模型。基于mapreduce框架的Hadoop主要分为map和reduce两个阶段,所以在一个job里面能做的处理很有限,对于复杂的计算,需要使用多次MR;spark计算模型是基于内存的迭代式计算模型,根据用户编写的RDD算子和程序,在调度时根据宽窄依赖可以生成多个Stage,根据action算子生成多个Job。所以spark相较于mapreduce,计算模型更加灵活,可以提供更强大的功能。•由于spark基于内存进行计算,在面对大量数据且没有进行调优的情况下,可能会出现比如OOM内存溢出等情况,导致spark程序可能无法运行起来,而mapreduce虽然运行缓慢,但是至少可以慢慢运行完。•Hadoop适合处理静态数据,对于迭代式流式数据的处理能力差;Spark通过在内存中缓存处理的数据,提高了处理流式数据和迭代式数据的性能;
Spark有以下四种部署方式,分别是:Local,Standlone,Yarn,Mesos Local 本地运行模式 (单机)
•该模式被称为Local[N]模式,是用单机的多个线程来模拟Spark分布式计算,直接运行在本地,便于调试,通常用来验证开发出来的应用程序逻辑上有没有问题。•其中N代表可以使用N个线程,每个线程拥有一个core。如果不指定N,则默认是1个线程(该线程有1个core)。•如果是local[*],则根据当前CPU的核数来自动设置线程数
Standlone
独立模式,自带完整的服务,可单独部署到一个集群中,无需依赖任何其他资源管理系统。它是Spark实现的资源调度框架,其主要的节点有Client节点、Master节点和Worker节点
在standalone部署模式下又分为client模式和cluster模式
client模式:driver和client运行于同一JVM中,不在worker上启动,该JVM进程直到spark application计算完成返回结果后才退出
cluster模式:driver由worker启动,client在确认spark application成功提交给cluster后直接退出,并不等待spark application运行结果返回
Yarn
通常,生产环境中,我们是把Spark程序在YARN中执行。而Spark程序在YARN中运行有两种模式,一种是Cluster模式、一种是Client模式。这两种模式的关键区别就在于Spark的driver是运行在什么地方。
client模式:如果是Client模式,Driver就运行在提交spark程序的地方,Spark Driver是需要不断与任务运行的Container交互的,所以运行Driver的client是必须在网络中可用的,直到应用程序结束。在本地环境测试的时候经常使用
cluster模式:本地进程则仅仅只是一个client,它会优先向yarn申请AppMaster资源运行AppMaster,在运行AppMaster的时候通过反射启动Driver(我们的应用代码),在SparkContext初始化成功后,再向yarn注册自己并申请Executor资源,此时Driver与AppMaster运行在同一个container里,是两个不同的线程,当Driver运行完毕,AppMaster会释放资源并注销自己。所以在该模式下,本地进程仅仅是一个client,如果结束了该进程,整个Spark任务也不会退出,因为Driver是在远程运行的
参数名 | 参数说明 |
---|---|
--master | master 的地址,提交任务到哪里执行,例如 spark://host:port, yarn, local |
--deploy-mode | 在本地 (client) 启动 driver 或在 cluster 上启动,默认是 client |
--class | 应用程序的主类,仅针对 java 或 scala 应用 |
--name | 应用程序的名称 |
--jars | 用逗号分隔的本地 jar 包,设置后,这些 jar 将包含在 driver 和 executor 的 classpath 下 |
--packages | 包含在driver 和executor 的 classpath 中的 jar 的 maven 坐标 |
--exclude-packages | 为了避免冲突 而指定不包含的 package |
--repositories | 远程 repository |
--conf PROP=VALUE | 指定 spark 配置属性的值, 例如 -conf spark.executor.extraJavaOptions="-XX:MaxPermSize=256m" |
--properties-file | 加载的配置文件,默认为 conf/spark-defaults.conf |
--driver-memory | Driver内存,默认 1G |
--driver-java-options | 传给 driver 的额外的 Java 选项 |
--driver-library-path | 传给 driver 的额外的库路径 |
--driver-class-path | 传给 driver 的额外的类路径 |
--driver-cores | Driver 的核数,默认是1。在 yarn 或者 standalone 下使用 |
--executor-memory | 每个 executor 的内存,默认是1G |
--total-executor-cores | 所有 executor 总共的核数。仅仅在 mesos 或者 standalone 下使用 |
--num-executors | 启动的 executor 数量。默认为2。在 yarn 下使用 |
--executor-core | 每个 executor 的核数。在yarn或者standalone下使用 |
Spark的作业提交流程根据部署模式不同,其提交流程也不相同。目前企业中最常用的部署模式为Yarn,主要描述Spark在采用Yarn的情况下的作业提交流程。Spark程序在YARN中运行有两种模式,一种是Cluster模式、一种是Client模式。 yarn-client
1.client向ResouceManager申请启动ApplicationMaster,同时在SparkContext初始化中创建DAGScheduler和TaskScheduler2.ResouceManager收到请求后,在一台NodeManager中启动第一个Container运行ApplicationMaster。与YARN-Cluster区别的是在该ApplicationMaster不运行SparkContext,只与SparkContext进行联系进行资源的分派3.Client中的SparkContext初始化完毕后,与Application Master建立通讯,向Resource Manager注册,根据任务信息向Resource Manager申请资源(Container)4.当application master申请到资源后,便与node manager通信,要求它启动container5.Container启动后向driver中的sparkContext注册,并申请task6.应用程序运行完成后,Client的SparkContext向ResourceManager申请注销并关闭自己。
yarn-cluster
1.Spark Yarn Client向YARN中提交应用程序,包括Application Master程序、启动Application Master的命令、需要在Executor中运行的程序等;2.Resource manager收到请求后,在其中一个node manager中为应用程序分配一个container,要求它在container中启动应用程序的Application Master,Application master初始化sparkContext以及创建DAG Scheduler和Task Scheduler。3.Application master根据sparkContext中的配置,向resource manager申请container,同时,Application master向Resource manager注册,这样用户可通过Resource manager查看应用程序的运行状态4.Resource manager 在集群中寻找符合条件的node manager,在node manager启动container,要求container启动executor,5.Executor启动后向Application master注册,并接收Application master分配的task6.应用程序运行完成后,Application Master向Resource Manager申请注销并关闭自己。
RDD是spark提供的核心抽象,全称为弹性分布式数据集。Spark中的所有算子都是基于rdd来执行的,不同的场景会有不同的rdd实现类,但是都可以进行互相转换。rdd执行过程中会形成DAG图,在DAG中又根据宽窄依赖进行stage的划分,形成lineage血缘保证容错性等。
RDD 的算子主要分成2类,action和transformation。transformation算子不会立即触发作业提交的,每一个 transformation 方法返回一个新的 RDD。action会触发真正的作业提交,一旦触发action就形成了一个完整的DAG。原始的RDD通过一系列的transformation操作就形成了DAG有向无环图,任务执行时,可以按照DAG的描述,执行真正的计算。
RDD最重要的特性就是容错性,可以自动从节点失败中恢复过来。即如果某个结点上的RDD partition因为节点故障,导致数据丢失,那么RDD可以通过自己的数据血缘重新计算该partition。这一切对使用者都是透明的。
RDD在逻辑上是一个hdfs文件,在抽象上是一种元素集合,包含了数据。它是被分区的,分为多个分区,每个分区分布在集群中的不同结点上,从而让RDD中的数据可以被并行操作(分布式数据集)
RDD的数据默认存放在内存中,但是当内存资源不足时,spark会自动将RDD数据写入磁盘。
reduceByKey:reduceByKey会在结果发送至reducer之前会对每个mapper在本地进行combiner。这样做的好处在于,在map端进行一次combiner之后,数据量会大幅度减小,从而减小传输,保证reduce端能够更快的进行结果计算。
groupByKey:groupByKey会对每一个RDD中的value值进行聚合形成一个序列(Iterator),此操作发生在reduce端,所以势必会将所有的数据通过网络进行传输,造成不必要的浪费。同时如果数据量十分大,可能还会造成OutOfMemoryError。
所以在进行大量数据的reduce操作时候建议使用reduceByKey。不仅可以提高速度,还可以防止使用groupByKey造成的内存溢出问题。
cache、persist 首先,cache和persist都是用于将一个RDD进行缓存的。RDD 通过 persist 或 cache 方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用。通过查看 RDD 的源码发现 cache 最终也是调用了 persist 无参方法,默认存储只存在内存中(MEMORY_ONLY) cache只有一个默认的缓存级别MEMORY_ONLY ,而persist可以根据情况设置其它的缓存级别。
持久化级别 | 说明 |
---|---|
MORY_ONLY(默认) | 将 RDD 以非序列化的 Java 对象存储在 JVM 中。如果没有足够的内存存储 RDD,则某些分区将不会被缓存,每次需要时都会重新计算。这是默认级别 |
MORY_AND_DISK(开发中可以使用这个) | 将 RDD 以非序列化的 Java 对象存储在 JVM 中。如果数据在内存中放不下,则溢写到磁盘上.需要时则会从磁盘上读取 |
MEMORY_ONLY_SER (Java and Scala) | 将 RDD 以序列化的 Java 对象(每个分区一个字节数组)的方式存储.这通常比非序列化对象(deserialized objects)更具空间效率,特别是在使用快速序列化的情况下,但是这种方式读取数据会消耗更多的 CPU |
MEMORY_AND_DISK_SER (Java and Scala) | 与 MEMORY_ONLY_SER 类似,但如果数据在内存中放不下,则溢写到磁盘上,而不是每次需要重新计算它们 |
DISK_ONLY | 将 RDD 分区存储在磁盘上 |
MEMORY_ONLY_2, MEMORY_AND_DISK_2 等 | 与上面的储存级别相同,只不过将持久化数据存为两份,备份每个分区存储在两个集群节点上 |
OFF_HEAP(实验中) | 与 MEMORY_ONLY_SER 类似,但将数据存储在堆外内存中。(即不是直接存储在 JVM 内存中) |
checkpoint
Checkpoint 的产生就是为了更加可靠的数据持久化,在 Checkpoint 的时候一般把数据放在在 HDFS 上,这就天然的借助了 HDFS 天生的高容错、高可靠来实现数据最大程度上的安全,实现了 RDD 的容错和高可用。
开发中如何保证数据的安全性性及读取效率:可以对频繁使用且重要的数据,先做缓存/持久化,再做 checkpint 操作。
缓存与checkpoint的区别
位置:缓存只能保存在本地的磁盘和内存中, Checkpoint 可以保存数据到 HDFS 这类可靠的存储上。生命周期:缓存的RDD会在程序结束或者手动调用unpersist方法后会被清除。Checkpoint的RDD在程序结束后依然存在,不会被删除。依赖关系:缓存不会丢掉RDD间的依赖关系,CheckPoint会切断依赖关系。
关系: 两者都是用来改变RDD的partition数量的,repartition底层调用的就是coalesce方法:coalesce(numPartitions, shuffle = true)
区别: coalesce()方法的参数shuffle默认设置为false,coalesce 根据传入的参数来判断是否发生shuffle。repartition()方法就是coalesce()方法shuffle为true的情况,repartition一定会发生shuffle。
一般情况下增大rdd的partition数量使用repartition,减少partition数量时使用coalesce。
在默认情况下,当 Spark 在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。但是,有时候需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量。
为了满足这种需求,Spark 提供了两种类型的变量:
累加器 accumulators:因为task的执行是在多个Executor中执行,所以会出现计算总量的时候,每个Executor只会计算部分数据,不能全局计算。累加器支持在所有不同节点之间进行累加计算(比如计数或者求和)。
广播变量 broadcast variables:广播变量用来把变量在所有节点的内存之间进行共享,在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本,起到节省资源和优化的作用。它通常用来高效分发较大的对象。
窄依赖是指父RDD的每个分区只被子RDD的一个分区所使用,子RDD分区通常对应常数个父RDD分区(O(1),与数据规模无关)
宽依赖是指父RDD的每个分区都可能被多个子RDD分区所使用,子RDD分区通常对应所有的父RDD分区(O(n),与数据规模有关)
其中,宽依赖会造成Shuffle。
DAG 划分stage的规则:在运行时也就是触发action算子开始向前回溯后,遇到宽依赖就切分成一个stage。每一个stage包含一个或多个并行的task任务
Master实际上可以配置两个,Spark原生的standalone模式是支持Master主备切换的。当Active Master节点挂掉以后,我们可以将Standby Master切换为Active Master。
Spark Master主备切换可以基于两种机制,一种是基于文件系统的,一种是基于ZooKeeper的。
基于文件系统的主备切换机制,需要在Active Master挂掉之后手动切换到Standby Master上;
而基于Zookeeper的主备切换机制,可以实现自动切换Master。