Spark内部原理

Spark中的Shuffle、宽依赖窄依赖、RDD持久化、共享变量

1.Shuffle

1.1 什么是Shuffle

Spark是分布式计算系统,数据块在不同节点执行,但是一些操作,例如join,需要将不同节点上相同的Key对应的Value聚集到一起,Shuffle便应运而生。

Shuffle是连接map和reduce之间的桥梁,它将map的输出对应到reduce输入中,这期间涉及到序列化反序列化、跨节点网络IO以及磁盘读写IO等,所以说Shuffle是整个应用程序运行过程中非常昂贵的一个阶段,理解Spark Shuffle原理有助于优化Spark应用程序。

1.2 Spark Shuffle的基本原理与特性

与MapReduce 中的Shuffle类似

  • 在DAG阶段以shuffle为界,划分Stage,上游为 map task,下游为reduce task
  • 每个map task将计算结果数据分成多份,每一份对应到下游stage的每个partition中,并将其临时写到磁盘,该过程叫做shuffle write
  • 每个reduce task通过网络拉取上游stage中所有map task的指定分区结果数据,该过程叫做shuffle read

在map阶段,除了map的业务逻辑外,还有shuffle write的过程,这个过程涉及到序列化、磁盘IO等耗时操作;在reduce阶段,除了reduce的业务逻辑外,还有前面shuffle read过程,这个过程涉及到网络IO、反序列化等耗时操作,所以整个shuffle过程是极其昂贵的

1.4 Spark Shuffle实现演进

1.4.1 Hash Shuffle v1

和上述流程类似,

假如一个executor上运行 M 个map task,下游reduce 有 N 个分区,则executor 会生成M*N个临时文件,生成文件时需要申请文件描述符,当partition很多时,并行化运行task时可能会耗尽文件描述符、消耗大量内存。因此后来Hash Shuffle进一步变成了如下版本。

1.4.2 Sort Shuffle

  • 在map阶段,会先按照partition id、每个partition内部按照key对中间结果进行排序。所有的partition数据写在一个文件里,并且通过一个索引文件记录每个partition的大小和偏移量。这样并行运行时每个core只要2个文件,一个executor上最多2m个文件。。
  • 在reduce阶段,reduce task拉取数据做combine时不再使用HashMap而是ExternalAppendOnlyMap。如果内存不足会写次磁盘。但是排序会导致性能损失。

1.4.3 Unsafe Shuffle

从spark 1.5.0开始,spark开始了钨丝计划(Tungsten),目的是优化内存和CPU的使用,进一步提升spark的性能。为此,引入Unsafe Shuffle,它的做法是将数据记录用二进制的方式存储,直接在序列化的二进制数据上sort而不是在java 对象上,这样一方面可以减少memory的使用和GC的开销,另一方面避免shuffle过程中频繁的序列化以及反序列化。在排序过程中,它提供cache-efficient sorter,使用一个8 bytes的指针,把排序转化成了一个指针数组的排序,极大的优化了排序性能。

但是使用Unsafe Shuffle有几个限制,shuffle阶段不能有aggregate操作,分区数不能超过一定大小( 2^{24} -1,这是可编码的最大parition id),所以像reduceByKey这类有aggregate操作的算子是不能使用Unsafe Shuffle,它会退化采用Sort Shuffle。

从spark-1.6.0开始,把Sort Shuffle和Unsafe Shuffle全部统一到Sort Shuffle中,如果检测到满足Unsafe Shuffle条件会自动采用Unsafe Shuffle,否则采用Sort Shuffle。从spark-2.0.0开始,spark把Hash Shuffle移除,可以说目前spark-2.0中只有一种Shuffle,即为Sort Shuffle。

2. 宽依赖&&窄依赖

2.1 RDD Lineages

RDD也是一个DAG的任务集合,一个DAG代表了一个RDD的计算过程。每个DAG都会记住创建该数据集需要哪些操作,跟踪记录RDD的继承关系,这个关系在Spark中叫做Lineages。

2.2 宽依赖&&窄依赖

  • 窄依赖:父分区对应一个子分区。对于窄依赖,只需通过重新计算丢失的那一块数据来恢复,容错成本较小。
  • 宽依赖:分区对应多个子分区 。对于宽依赖,会对父分区进行重新计算,造成冗余计算。

B ->G 中的join是窄依赖,因为之前的groupby已经将B中的数据通过shuffle进行了分区 所以join操作已有窄依赖已有宽依赖

如何判断是宽依赖还是窄依赖

每个RDD对象都有一个dependencies,通过获取这个属性,可以判断他是宽依赖还是窄依赖

宽依赖:

  • ShuffleDependency

窄依赖:

  • OneToOneDependency
  • PruneDependency
  • RangeDependency

也可以通过 toDebugString 属性,查看整个RDD Lineages

2.3 RDD容错

当出现数据丢失时,会通过RDD之间的血缘关系(Lineages)进行重新计算,但是如果错误发生在一个复杂的宽依赖的时候,重新计算任然会消耗掉很多资源。

2.4 缓存

如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,该RDD只有在第一次计算的时候会根据血缘关系得到分区的数据,在后续其他地方用到该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用。

如图所示,RDD-1经过一系列的转换后得到RDD-n并保存到hdfs,RDD-1在这一过程中会有个中间结果,如果将其缓存到内存,那么在随后的RDD-1转换到RDD-m这一过程中,就不会计算其之前的RDD-0了。

当然缓存也有缓存到内存或者是硬盘上,默认情况下是缓存到内存

更多的缓存方式请点这里

3. 共享变量

在Spark执行时,每个task之前无法进行数据交换的,但是有时却需要统计一些公共的值,譬如计数之类的,该怎么告呢? 这时候就要用到Spark 中的共享变量了。Spark中一共有两个共享变量:Broadcast Variables、Accumulators

  • Broadcast Variables 广播变量是一个只读变量,存放后,在集群中任何节点都可以访问
  • Accumulators 累加器,功能和名字差不多,可以在并行的情况下高效的进行累加

参考

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Spark入门

    Transformation:进行数据的转换,即将一个RDD转换成另一个RDD,这类转换并不触发提交作业,完成作业中间过程处理。

    俺也想起舞
  • HBase设计结构和原理

    既然是Write-Ahead-Log,为何先写内存再写WAL? 先写内存的原因:HBase提供了一个MVCC机制,来保障些数据阶段的数据可见性。先写Mem...

    俺也想起舞
  • Hive基础操作

    俺也想起舞
  • 3.0Spark计算模型

    Spark大数据处理:技术、应用与性能优化 第3章 Spark计算模型 创新都是站在巨人的肩膀上产生的,在大数据领域也不例外。微软的Dryad使用DAG执行模...

    Albert陈凯
  • Spark系列课程-0020Spark RDD图例讲解

    我们从这节课开始,讲Spark的内核,英文叫做Spark Core,在讲Spark Core之前我们先讲一个重要的概念,RDD, ? image.png 我们S...

    Albert陈凯
  • 开源中文关系抽取框架,来自浙大知识引擎实验室

    机器学习AI算法工程
  • win10 UWP 发邮件

    UWP 下如何发邮件?可以使用mailto:xx?subject=*方式发送? 本文:如何在 UWP 使用默认邮件发邮件。

    林德熙
  • python virtualenv开发环

    py3study
  • MINIEYE发布成果:进入前装市场,与比亚迪等多款合作车型年内将上市 | 活动

    镁客网
  • SCIP学习笔记

    刘笑江

扫码关注云+社区

领取腾讯云代金券