Spark Shuffle 模块② - Hash Based Shuffle write

Spark 2.0 中已经移除 Hash Based Shuffle,但作为曾经的默认 Shuffle 机制,还是值得进行分析

Spark 最开始只有 Hash Based Shuffle,因为在很多场景中并不需要排序,在这些场景中多余的排序反而会损耗性能。

Hash Based Shuffle Write

该过程实现的核心是在 HashShuffleWriter#write(records: Iterator[Product2[K, V]]): Unit 其主要流程如下:

该函数的输入是一个 Shuffle Map Task 计算得到的结果(对应的迭代器),若在宽依赖中定义了 map 端的聚合则会先进行聚合,随后对于迭代器(若要聚合则为聚合后的迭代器)的每一项先通过计算 key 的 hash 值来确定要写到哪个文件,然后将 key、value 写入文件。

写入的文件名的格式是:shuffle_$shuffleId_$mapId_$reduceId。写入时,若文件已存在会删除会创建新文件。

上图描述了如何处理一个 Shuffle Map Task 计算结果,在实际应用中,往往有很多 Shuffle Map Tasks 及下游 tasks,即如下情况(图摘自:JerryLead/SparkInternals-Shuffle 过程):

存在的问题

这种简单的实现会有几个问题,为说明方便,这里设 M = Shuffle Map Task 数量R = 下游 tasks 数量

  • 产生过多文件:由于每个 Shuffle Map Task 需要为每个下游的 Task 创建一个单独的文件,因此文件的数量就是 M * R。如果 Shuffle Map Tasks 数量是 1000,下游的 tasks 数是 800,那么理论上会产生 80w 个文件(对于 size 为 0的文件会特殊处理)
  • 打开多个文件对于系统来说意味着随机写,尤其是每个文件较小且文件特别多的情况。机械硬盘在随机读写方面的性能很差,如果是固态硬盘,会改善很多
  • 缓冲区占用内存空间大:每个 Shuffle Map Task 需要开 R 个 bucket(为减少写文件次数的缓冲区),N 个 Shuffle Map Task 就会产生 N * R 个 bucket。虽然一个 Shuffle Map Task,对应的 buckets 会被回收,但一个节点上的 bucket 个数最多可以达到 cores * R 个,每个 bucket 默认为 32KB。对于 24 核 1000 个 reducer 来说,占用内存就是 750MB

改进:Shuffle Consolidate Writer

在上面提到的几个问题,Spark 提供了 Shuffle Consolidate Files 机制进行优化。该机制的手段是减少 Shuffle 过程产生的文件,若使用这个功能,则需要置 spark.shuffle.consolidateFilestrue,其实现可用下图来表示(图摘自:JerryLead/SparkInternals-Shuffle 过程

即:对于运行在同一个 core 的 Shuffle Map Tasks,对于将要被同一个 reducer read 的数据,第一个 Shuffle Map Task 会创建一个文件,之后的就会将数据追加到这个文件而不是新建一个文件(相当于同一个 core 上的 Shuffle Map Task 写了文件不同的部分)。因此文件数就从原来的 M * R 个变成了 cores * R 个。当 M / cores 的值越大,减少文件数的效果越显著。需要注意的是,该机制虽然在很多时候能缓解上述的几个问题,但是并不能彻底解决。

参考


本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Albert陈凯

Spark系列课程-00xxSpark RDD持久化

我们这节课讲一下RDD的持久化 ? RDD的持久化 这段代码我们上午已经看过了,有瑕疵大家看出来了吗? 有什么瑕疵啊? 大家是否还记得我在第二节课的时候跟大...

41580
来自专栏大数据-Hadoop、Spark

Spark Streaming + Kafka整合

28550
来自专栏Albert陈凯

4.3 RDD操作

4.3 RDD操作 RDD提供了一个抽象的分布式数据架构,我们不必担心底层数据的分布式特性,而应用逻辑可以表达为一系列转换处理。 通常应用逻辑是以一系列转换(...

28470
来自专栏Albert陈凯

Spark详解06容错机制Cache 和 Checkpoint Cache 和 Checkpoint

Cache 和 Checkpoint 作为区别于 Hadoop 的一个重要 feature,cache 机制保证了需要访问重复数据的应用(如迭代型算法和交互式应...

502120
来自专栏Albert陈凯

3.4 RDD的计算

3.4 RDD的计算 3.4.1 Ta s k简介 原始的RDD经过一系列转换后,会在最后一个RDD上触发一个动作,这个动作会生成一个Job。在Job被划分为...

372100
来自专栏Jed的技术阶梯

Spark性能调优04-数据倾斜调优

数据倾斜的原理很简单:在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。...

67650
来自专栏Albert陈凯

2018-11-07 Spark应用程序开发参数调优深入剖析-Spark商业调优实战

本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark商业应用实战指导,请持续关注本套博客。版权声明:本套Spark商业应用实战归作者(秦凯新)所有...

12740
来自专栏祝威廉

Spark Streaming 不同Batch任务可以并行计算么?

其实Stage,Task都是Spark Core里就有的概念,Job 在Streaming和Spark Core里的概念则是不一致的。Batch则是Stream...

17030
来自专栏木东居士的专栏

用MPI进行分布式内存编程(入门篇)

47730
来自专栏个人分享

Shuffle相关分析

 Shuffle描述是一个过程,表现出的是多对多的依赖关系。Shuffle是连接map阶段和Reduce阶段的纽带,每个Reduce Task都会从Map Ta...

11040

扫码关注云+社区

领取腾讯云代金券