图文解析spark2.0核心技术

作者介绍:庄涵,目前就职于腾讯,热衷于前沿技术的研究与实践,对大数据、机器学习领域有着浓厚兴趣。

导语

spark2.0于2016-07-27正式发布,伴随着更简单、更快速、更智慧的新特性,spark 已经逐步替代 hadoop 在大数据中的地位,成为大数据处理的主流标准。本文主要以代码和绘图的方式结合,对运行架构、RDD 的实现、spark 作业原理、Sort-Based Shuffle 的存储原理、 Standalone 模式 HA 机制进行解析。

1、运行架构

Spark支持多种运行模式。单机部署下,既可以用本地(Local)模式运行,也可以使用伪分布式模式来运行;当以分布式集群部署的时候,可以根据实际情况选择Spark自带的独立(Standalone)运行模式、YARN运行模式或者Mesos模式。虽然模式多,但是Spark的运行架构基本由三部分组成,包括SparkContext(驱动程序)、ClusterManager(集群资源管理器)和Executor(任务执行进程)。

1、SparkContext提交作业,向ClusterManager申请资源;

2、ClusterManager会根据当前集群的资源使用情况,进行有条件的FIFO策略:先分配的应用程序尽可能多地获取资源,后分配的应用程序则在剩余资源中筛选,没有合适资源的应用程序只能等待其他应用程序释放资源;

3、ClusterManager默认情况下会将应用程序分布在尽可能多的Worker上,这种分配算法有利于充分利用集群资源,适合内存使用多的场景,以便更好地做到数据处理的本地性;另一种则是分布在尽可能少的Worker上,这种适合CPU密集型且内存使用较少的场景;

4、Excutor创建后与SparkContext保持通讯,SparkContext分配任务集给Excutor,Excutor按照一定的调度策略执行任务集。

2、RDD

弹性分布式数据集(Resilient Distributed Datasets,RDD)作为Spark的编程模型,相比MapReduce模型有着更好的扩展和延伸: 

  • 提供了抽象层次更高的API 
  • 高效的数据共享 
  • 高效的容错性 

2.1、RDD 的操作类型 

RDD大致可以包括四种操作类型:

  • 创建操作(Creation):从内存集合和外部存储系统创建RDD,或者是通过转换操作生成RDD
  • 转换操作(Transformation):转换操作是惰性操作,只是定义一个RDD并记录依赖关系,没有立即执行 
  • 控制操作(Control):进行RDD的持久化,通过设定不同级别对RDD进行缓存
  • 行动操作(Action):触发任务提交、Spark运行的操作,操作的结果是获取到结果集或者保存至外部存储系统 

2.2、RDD 的实现

2.2.1、RDD 的分区 

RDD的分区是一个逻辑概念,转换操作前后的分区在物理上可能是同一块内存或者存储。在RDD操作中用户可以设定和获取分区数目,默认分区数目为该程序所分配到的cpu核数,如果是从HDFS文件创建,默认为文件的分片数。

2.2.2、RDD 的“血统”和依赖关系 

“血统”和依赖关系:RDD 的容错机制是通过记录更新来实现的,且记录的是粗粒度的转换操作。我们将记录的信息称为血统(Lineage)关系,而到了源码级别,Apache Spark 记录的则是 RDD 之间的依赖(Dependency)关系。如上所示,每次转换操作产生一个新的RDD(子RDD),子RDD会记录其父RDD的信息以及相关的依赖关系。 

2.2.3、依赖关系

依赖关系划分为两种:窄依赖(Narrow Dependency)宽依赖(源码中为Shuffle Dependency)。

窄依赖指的是父 RDD 中的一个分区最多只会被子 RDD 中的一个分区使用,意味着父RDD的一个分区内的数据是不能被分割的,子RDD的任务可以跟父RDD在同一个Executor一起执行,不需要经过 Shuffle 阶段去重组数据。

窄依赖包括两种:一对一依赖(OneToOneDependency)范围依赖(RangeDependency) 

一对一依赖: 

范围依赖(仅union方法): 

宽依赖指的是父 RDD 中的分区可能会被多个子 RDD 分区使用。因为父 RDD 中一个分区内的数据会被分割,发送给子 RDD 的所有分区,因此宽依赖也意味着父 RDD 与子 RDD 之间存在着 Shuffle 过程。

宽依赖只有一种:Shuffle依赖(ShuffleDependency) 

3、作业执行原理

作业(Job):RDD每一个行动操作都会生成一个或者多个调度阶段 调度阶段(Stage):每个Job都会根据依赖关系,以Shuffle过程作为划分,分为Shuffle Map Stage和Result Stage。每个Stage包含多个任务集(TaskSet),TaskSet的数量与分区数相同。 

任务(Task):分发到Executor上的工作任务,是Spark的最小执行单元  DAGScheduler:DAGScheduler是面向调度阶段的任务调度器,负责划分调度阶段并提交给TaskScheduler 

TaskScheduler:TaskScheduler是面向任务的调度器,它负责将任务分发到Woker节点,由Executor进行执行 

3.1、提交作业及作业调度策略(适用于调度阶段) 

每一次行动操作都会触发SparkContext的runJob方法进行作业的提交。

这些作业之间可以没有任何依赖关系,对于多个作业之间的调度,共有两种:一种是默认的FIFO模式,另一种则是FAIR模式,该模式的调度可以通过设定minShare(最小任务数)和weight(任务的权重)来决定Job执行的优先级。 

FIFO调度策略:优先比较作业优先级(作业编号越小优先级越高),再比较调度阶段优先级(调度阶段编号越小优先级越高) 

FAIR调度策略:先获取两个调度的饥饿程度,是否处于饥饿状态由当前正在运行的任务是否小于最小任务决定,获取后进行如下比较:

  • 优先满足处于饥饿状态的调度 
  • 同处于饥饿状态,优先满足资源比小的调度 
  • 同处于非饥饿状态,优先满足权重比小的调度  
  • 以上情况均相同的情况下,根据调度名称进行排序 

3.2、划分调度阶段(DAG构建) 

DAG构建图:

DAG的构建:主要是通过对最后一个RDD进行递归,使用广度优先遍历每个RDD跟父RDD的依赖关系(前面提到子RDD会记录依赖关系),碰到ShuffleDependency的则进行切割。切割后形成TaskSet传递给TaskScheduler进行执行。 

DAG的作用:让窄依赖的RDD操作合并为同一个TaskSet,将多个任务进行合并,有利于任务执行效率的提高。 

TaskSet结构图:假设数据有两个Partition时,TaskSet是一组关联的,但相互之间没有Shuffle依赖关系的Task集合,TaskSet的ShuffleMapStage数量跟Partition个数相关,主要包含task的集合,stage中的rdd信息等等。Task会被序列化和压缩 

4、存储原理(Sort-Based Shuffle分析)

4.1、Shuffle过程解析(wordcount实例)

1. 数据处理:文件在hdfs中以多个切片形式存储,读取时每一个切片会被分配给一个Excutor进行处理;

2. map端操作:map端对文件数据进行处理,格式化为(key,value)键值对,每个map都可能包含a,b,c,d等多个字母,如果在map端使用了combiner,则数据会被压缩,value值会被合并;(注意:这个过程的使用需要保证对最终结果没有影响,有利于减少shuffle过程的数据传输);

3.reduce端操作:reduce过程中,假设a和b,c和d在同一个reduce端,需要将map端被分配在同一个reduce端的数据进行洗牌合并,这个过程被称之为shuffle。

4.2、map端的写操作

1.map端处理数据的时候,先判断这个过程是否使用了combiner,如果使用了combiner则采用PartitionedAppendOnlyMap数据结构作为内存缓冲区进行数据存储,对于相同key的数据每次都会进行更新合并;如果没有使用combiner,则采用PartitionedPairBuffer数据结构,把每次处理的数据追加到队列末尾;

2.写入数据的过程中如果出现内存不够用的情况则会发生溢写,溢写;使用combiner的则会将数据按照分区id和数据key进行排序,做到分区有序,区中按key排序,其实就是将partitionId和数据的key作为key进行排序;没有使用combiner的则只是分区有序

3.按照排序后的数据溢写文件,文件分为data文件index文件,index文件作为索引文件索引data文件的数据,有利于reduce端的读取;(注意:每次溢写都会形成一个index和data文件,在map完全处理完后会将多个inde和data文件Merge为一个index和data文件)

4.3、reduce端的读操作

有了map端的处理,reduce端只需要根据index文件就可以很好地获取到数据并进行相关的处理操作。这里主要讲reduce端读操作时对数据读取的策略

如果在本地有,那么可以直接从BlockManager中获取数据;如果需要从其他的节点上获取,由于Shuffle过程的数据量可能会很大,为了减少请求数据的时间并且充分利用带宽,因此这里的网络读有以下的策略: 

1.每次最多启动5个线程去最多5个节点上读取数据;

2.每次请求的数据大小不会超过spark.reducer.maxMbInFlight(默认值为48MB)/5

5、Spark的HA机制(Standalone模式)

5.1、Executor异常

当Executor发生异常退出的情况,Master会尝试获取可用的Worker节点并启动Executor,这个Worker很可能是失败之前运行Executor的Worker节点。这个过程系统会尝试10次,限定失败次数是为了避免因为应用程序存在bug而反复提交,占用集群宝贵的资源。

5.2、Worker异常

Worker会定时发送心跳给Master,Master也会定时检测注册的Worker是否超时,如果Worker异常,Master会告知Driver,并且同时将这些Executor从其应用程序列表中删除。

5.3、Master异常

1、ZooKeeper:将集群元数据持久化到ZooKeeper,由ZooKeeper通过选举机制选举出新的Master,新的Master从ZooKeeper中获取集群信息并恢复集群状态;

2、FileSystem:集群元数据持久化到本地文件系统中,当Master出现异常只需要重启Master即可;

3、Custom:通过对StandaloneRecoveryModeFactory抽象类进行实现并配置到系统中,由用户自定义恢复方式。

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

庄涵的专栏

1 篇文章3 人订阅

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏「3306 Pai」社区

有爱有恨的MDL锁

MySQL 5.5 中就引入了metadata lock(元数据锁)。用于对管理 database objects(数据库对象)的并发访问,保证数据的一致性。

672
来自专栏扎心了老铁

python分布式环境下的限流器

项目中用到了限流,受限于一些实现方式上的东西,手撕了一个简单的服务端限流器。 服务端限流和客户端限流的区别,简单来说就是: 1)服务端限流 对接口请求进行限流,...

4228
来自专栏Golang语言社区

go写并发注意事项

写go语言并发需要注意的几点: 1. golang 1.2版本 goroutine的runtime.GOMAXPROCS 如果没有被定义,默认是1,从1.5之后...

3368
来自专栏编程

大数据干货系列(六)-Spark总结

本文共计1611字,预计阅读时长八分钟 Spark总结 一、本质 Spark是一个分布式的计算框架,是下一代的MapReduce,扩展了MR的数据处理流程 二、...

1845
来自专栏Leetcode名企之路

求求你别问我一致性hash了

首先,只有存储型的组件,我们才会使用一致性hash;计算型的服务增删节点对整个任务一般没影响,所以负载均衡直接用random就可以。那么像redis、memca...

773
来自专栏个人分享

Spark运行流程概述

993
来自专栏美团技术团队

【技术博客】Spark性能优化指南——高级篇

前言 继基础篇讲解了每个Spark开发人员都必须熟知的开发调优与资源调优之后,本文作为《Spark性能优化指南》的高级篇,将深入分析数据倾斜调优与shuffle...

3795
来自专栏https://www.cnblogs.com/L

【Spark篇】---SparkStream初始与应用

SparkStreaming是流式处理框架,是Spark API的扩展,支持可扩展、高吞吐量、容错的实时数据流处理,实时数据的来源可以是:Kafka, Flum...

972
来自专栏「3306 Pai」社区

有爱有恨的MDL锁

MySQL 5.5 中就引入了metadata lock(元数据锁)。用于对管理 database objects(数据库对象)的并发访问,保证数据的一致性。

1170
来自专栏Golang语言社区

go写并发注意事项

写go语言并发需要注意的几点: 1. golang 1.2版本 goroutine的runtime.GOMAXPROCS 如果没有被定义,默认是1,从1.5之后...

2995

扫码关注云+社区