前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark内部原理

Spark内部原理

作者头像
俺也想起舞
发布2019-07-24 14:36:37
7300
发布2019-07-24 14:36:37
举报

Spark中的Shuffle、宽依赖窄依赖、RDD持久化、共享变量

1.Shuffle

1.1 什么是Shuffle

Spark是分布式计算系统,数据块在不同节点执行,但是一些操作,例如join,需要将不同节点上相同的Key对应的Value聚集到一起,Shuffle便应运而生。

Shuffle是连接map和reduce之间的桥梁,它将map的输出对应到reduce输入中,这期间涉及到序列化反序列化、跨节点网络IO以及磁盘读写IO等,所以说Shuffle是整个应用程序运行过程中非常昂贵的一个阶段,理解Spark Shuffle原理有助于优化Spark应用程序。

1.2 Spark Shuffle的基本原理与特性

与MapReduce 中的Shuffle类似

  • 在DAG阶段以shuffle为界,划分Stage,上游为 map task,下游为reduce task
  • 每个map task将计算结果数据分成多份,每一份对应到下游stage的每个partition中,并将其临时写到磁盘,该过程叫做shuffle write
  • 每个reduce task通过网络拉取上游stage中所有map task的指定分区结果数据,该过程叫做shuffle read

在map阶段,除了map的业务逻辑外,还有shuffle write的过程,这个过程涉及到序列化、磁盘IO等耗时操作;在reduce阶段,除了reduce的业务逻辑外,还有前面shuffle read过程,这个过程涉及到网络IO、反序列化等耗时操作,所以整个shuffle过程是极其昂贵的

1.4 Spark Shuffle实现演进
1.4.1 Hash Shuffle v1

和上述流程类似,

假如一个executor上运行 M 个map task,下游reduce 有 N 个分区,则executor 会生成M*N个临时文件,生成文件时需要申请文件描述符,当partition很多时,并行化运行task时可能会耗尽文件描述符、消耗大量内存。因此后来Hash Shuffle进一步变成了如下版本。

1.4.2 Sort Shuffle
  • 在map阶段,会先按照partition id、每个partition内部按照key对中间结果进行排序。所有的partition数据写在一个文件里,并且通过一个索引文件记录每个partition的大小和偏移量。这样并行运行时每个core只要2个文件,一个executor上最多2m个文件。。
  • 在reduce阶段,reduce task拉取数据做combine时不再使用HashMap而是ExternalAppendOnlyMap。如果内存不足会写次磁盘。但是排序会导致性能损失。
1.4.3 Unsafe Shuffle

从spark 1.5.0开始,spark开始了钨丝计划(Tungsten),目的是优化内存和CPU的使用,进一步提升spark的性能。为此,引入Unsafe Shuffle,它的做法是将数据记录用二进制的方式存储,直接在序列化的二进制数据上sort而不是在java 对象上,这样一方面可以减少memory的使用和GC的开销,另一方面避免shuffle过程中频繁的序列化以及反序列化。在排序过程中,它提供cache-efficient sorter,使用一个8 bytes的指针,把排序转化成了一个指针数组的排序,极大的优化了排序性能。

但是使用Unsafe Shuffle有几个限制,shuffle阶段不能有aggregate操作,分区数不能超过一定大小( 2^{24} -1,这是可编码的最大parition id),所以像reduceByKey这类有aggregate操作的算子是不能使用Unsafe Shuffle,它会退化采用Sort Shuffle。

从spark-1.6.0开始,把Sort Shuffle和Unsafe Shuffle全部统一到Sort Shuffle中,如果检测到满足Unsafe Shuffle条件会自动采用Unsafe Shuffle,否则采用Sort Shuffle。从spark-2.0.0开始,spark把Hash Shuffle移除,可以说目前spark-2.0中只有一种Shuffle,即为Sort Shuffle。

2. 宽依赖&&窄依赖

2.1 RDD Lineages

RDD也是一个DAG的任务集合,一个DAG代表了一个RDD的计算过程。每个DAG都会记住创建该数据集需要哪些操作,跟踪记录RDD的继承关系,这个关系在Spark中叫做Lineages。

2.2 宽依赖&&窄依赖
  • 窄依赖:父分区对应一个子分区。对于窄依赖,只需通过重新计算丢失的那一块数据来恢复,容错成本较小。
  • 宽依赖:分区对应多个子分区 。对于宽依赖,会对父分区进行重新计算,造成冗余计算。

B ->G 中的join是窄依赖,因为之前的groupby已经将B中的数据通过shuffle进行了分区 所以join操作已有窄依赖已有宽依赖

如何判断是宽依赖还是窄依赖

每个RDD对象都有一个dependencies,通过获取这个属性,可以判断他是宽依赖还是窄依赖

宽依赖:

  • ShuffleDependency

窄依赖:

  • OneToOneDependency
  • PruneDependency
  • RangeDependency

也可以通过 toDebugString 属性,查看整个RDD Lineages

2.3 RDD容错

当出现数据丢失时,会通过RDD之间的血缘关系(Lineages)进行重新计算,但是如果错误发生在一个复杂的宽依赖的时候,重新计算任然会消耗掉很多资源。

2.4 缓存

如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,该RDD只有在第一次计算的时候会根据血缘关系得到分区的数据,在后续其他地方用到该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用。

如图所示,RDD-1经过一系列的转换后得到RDD-n并保存到hdfs,RDD-1在这一过程中会有个中间结果,如果将其缓存到内存,那么在随后的RDD-1转换到RDD-m这一过程中,就不会计算其之前的RDD-0了。

当然缓存也有缓存到内存或者是硬盘上,默认情况下是缓存到内存

更多的缓存方式请点这里

3. 共享变量

在Spark执行时,每个task之前无法进行数据交换的,但是有时却需要统计一些公共的值,譬如计数之类的,该怎么告呢? 这时候就要用到Spark 中的共享变量了。Spark中一共有两个共享变量:Broadcast Variables、Accumulators

  • Broadcast Variables 广播变量是一个只读变量,存放后,在集群中任何节点都可以访问
  • Accumulators 累加器,功能和名字差不多,可以在并行的情况下高效的进行累加

参考

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.Shuffle
    • 1.1 什么是Shuffle
      • 1.2 Spark Shuffle的基本原理与特性
        • 1.4 Spark Shuffle实现演进
          • 1.4.1 Hash Shuffle v1
            • 1.4.2 Sort Shuffle
              • 1.4.3 Unsafe Shuffle
              • 2. 宽依赖&&窄依赖
                • 2.1 RDD Lineages
                  • 2.2 宽依赖&&窄依赖
                    • 如何判断是宽依赖还是窄依赖
                  • 2.3 RDD容错
                    • 2.4 缓存
                    • 3. 共享变量
                    相关产品与服务
                    文件存储
                    文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档