最近在拜读许老师的《大数据处理框架Apache Spark设计与实现》,之前看豆瓣评分很高,阅读了一下果然通俗易懂,在这里记录一下相关的笔记,补充了一些个人理解,如有不对还请指正。参考链接:https://github.com/JerryLead/SparkInternals
Spark在集群上部署有多个版本:Standalone、Mesos、YARN、Kubernetes。其中Standalone是Spark官方的,其他都是第三方框架。YARN是目前主流之一,可以同时管理Spark、Hadoop Mapreduce任务。
Spark和MapReduce一样是Master-Worker结构。由于在介绍Spark原理的时候会涉及到很多名词,一不小心就容易搞混淆,因此先梳理一下几个名词:
以上介绍可以看出来Spark这么设计相比于Hadoop MapReduce的优点和缺点:
下面举一个groupby的例子,来了解spark运行的流程。
```scala
package org.apache.spark.examples
import java.util.Random
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
/**
* Usage: GroupByTest [numMappers] [numKVPairs] [valSize] [numReducers]
*/
object GroupByTest {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("GroupBy Test")
var numMappers = 3 //该应用包含3个map task
var numKVPairs = 4 //每个task随机生成4个<K,V>record
var valSize = 1000 //每个Value大小1000byte
var numReducers = 2 //由于随机产生的key会有重复,groupby聚合过程使用2个reduce task
val sc = new SparkContext(sparkConf) //也可以通过SparkSession初始化
// 生成一个k-v形式的array
val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
val ranGen = new Random
var arr1 = new Array[(Int, Array[Byte])](numKVPairs)
for (i <- 0 until numKVPairs) {
val byteArr = new Array[Byte](valSize)
ranGen.nextBytes(byteArr)
arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
}
arr1
}.cache()
// Enforce that everything has been calculated and in cache
pairs1.count() //=numMappers * numKVPairs = 12,到这里cache()才会生效
println(pairs1.groupByKey(numReducers).count()) //=4
sc.stop()
}
}
```
代码大致意思是:先用RDD的形式生成一堆key-value形式的数组,key是随机给0~Int最大值,value是一个随机的byte。然后调用groupby和count,把相同的key聚合,计算个数。
这里需要注意,真正在写应用的时候一般不用自己指定map task的个数,通常自动计算为:
MapTask个数=\frac{输入数据大小}{每个分片大小(HDFS默认是128MB)}
实际执行流程比自己的要复杂,需要先建立逻辑处理流程(Logical Plan),然后根据逻辑处理流程生成 物理逻辑流程(Physical Plan),然后生成具体 task 执行。
逻辑处理流程用通俗的话来说就是:各种各样的RDD不停的变换。逻辑处理流程表示的是数据上的依赖关系,不是 task 的执行图。仔细观察上面代码可以发现,action()一共有两次:
由于第二次count()时候数据依赖于前面,因此以变量result
为例。使用 result.toDebugString
输出日志,可以看到整个逻辑处理流程如下:
(2) ShuffledRDD[2] at groupByKey at GroupByTest.scala:55 []
+-(3) MapPartitionsRDD[1] at flatMap at GroupByTest.scala:41 []
| CachedPartitions: 3; MemorySize: 12.4 кв;
ExternalBlockStoreSize: 0.0 в; DiskSize: 0.0 B
| ParallelCollectionRDD[0] at parallelize at GroupByTest. scala:41 []
不难看出生成了3个RDD,后面的中括号是生成的顺序,即:ParallelCollectionRDD
→ MapPartitionsRDD
→ ShuffledRDD
。
另外,(2) ShuffledRDD[2] at groupByKey at GroupByTest.scala:55 []
前面的小括号里面的(2)、(3)是分区(partition)个数的意思。
这个日志“从里往外“看:可以看出来由于生成了3个数组,因此一直到MapPartitionsRDD
都是3个分区。另外,由于我们使用了cache(),因此其中的3个分区计算时候会被缓存为CachedPartitions
。而到groupby的时候,由于我们指定了var numReducers = 2
,因此变成了2个分区。并且转成了ShuffledRDD
。
上一节说的逻辑处理流程(Logical Plan)基本上可以理解是RDD之间的变化的关系,但是并不能执行计算任务,因此需要再转换成物理执行计划(Physical Plan)对任务执行。其中包括执行阶段(Stage)和执行任务(Task)。简单来说可以分成三个步骤:
var numReducers = 2
,变成了2个分区,因此在这里是2个task来计算任务。stage 0→stage 1,这个过程称为shuffle机制,会将数据重新分配。注:为什么要拆分执行阶段(Stage)?
如果想知道自己Spark Application的运行流程,可以根据Spark提供的执行界面查看。
Job日志可以查看Stage的运行的情况
上图点开后,可以看到多个stage,点击stage的超链接(或者从Job那边点超链接也可以),可以查看该stage的运行情况。
打开之后可以查看DAG,查看RDD的生成顺序,同时也可以查看每个task的运行时间,方便排查问题。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。