前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark的Shuffle原理及调优

Spark的Shuffle原理及调优

作者头像
挽风
发布2022-09-23 09:53:18
4320
发布2022-09-23 09:53:18
举报
文章被收录于专栏:小道小道

一、Shuffle原理

  当使⽤reduceByKey、groupByKey、sortByKey、countByKey、join、cogroup等操作的时候,会发⽣shuffle操作。Spark在DAG调度阶段将job划分成多个stage,上游stage做map操作,下游stage做reduce操作,其本质还是MR计算架 构。Shuffle是连接map和reduce之间的桥梁,它将map的输出对应到reduce的输⼊,这期间涉及到序列化和反序列化、跨节点⽹络IO和磁盘读写IO等,所以说shuffle是整个应⽤过程特别昂贵的阶段。

  与MapReduce计算框架⼀样,spark的shuffle实现⼤致如下图所⽰,在DAG阶段以shuffle为界,划分stage,上游stage 做map task,每个map task将计算结果数据分成多份,每⼀份对应到下游stage的每个partition中,并将其临时写到磁盘,该过程就叫做shuffle write;下游stage叫做reduce task,每个reduce task通过⽹络拉取指定分区结果数据,该过程叫做shuffle read,最后完成reduce的业务逻辑。

  举例:上游stage有100个map task,下游有1000个reduce task,那么这100个map task中每个maptask都会得到1000份数据,⽽这1000个reduce task中的每个reduce task都会拉取上游100个map task对应的那份数据,即第⼀个reduce task会拉取所有map task结果数据的第⼀份,以此类推。

在这里插入图片描述
在这里插入图片描述

  在map阶段,除了map的业务逻辑外,还有shuffle write的过程,这个过程涉及序列化、磁盘IO等耗时操作;在reduce阶段,除了reduce的业务逻辑外,还有shuffle read过程,这个过程涉及到⽹络IO、反序列化等耗时操作。所以整个shuffle过程是极其昂贵的。

  因为shuffle是⼀个涉及CPU(序列化反序列化)、⽹络IO(跨节点数据传输)以及磁盘IO(shuffle中间结果落地)的操作,所以应当考虑shuffle相关的调优,提升spark应⽤程序的性能。

二、Shuffle调优

2.1 程序调优;

  ⾸先,尽量减少shuffle次数;

代码语言:javascript
复制
//两次shuffle
rdd.map().repartition(1000).reduceByKey(_+_,3000)
//⼀次shuffle
Rdd.map().repartition(3000).reduceByKey(_+_)

  然后必要时主动shuffle,通常⽤于改变并⾏度,提⾼后续分布式运⾏速度;

代码语言:javascript
复制
rdd.repartition(largerNumPartition).map()

  最后,使⽤treeReduce&treeAggregate替换reduce&aggregate。数据量较⼤时,reduce&aggregate⼀次性聚 合,shuffle量太⼤,⽽treeReduce&treeAggregate是分批聚合,更为保险。

2.2 参数调优;

spark.shuffle.file.buffer : map task到buffer到磁盘   默认值:32K

  参数说明:该参数⽤于设置shuffle write task的BufferedOutputStream的buffer缓冲⼤⼩。将数据写到磁盘⽂件之前,会先写⼊buffer缓冲中,待缓冲写满之后,才会溢写到磁盘;

  调优建议:如果作业可⽤的内存资源较为充⾜的话,可以适当增加这个参数的⼤⼩(⽐如64k),从⽽减少shufflewrite过程中溢写磁盘⽂件的次数,也就可以减少磁盘IO次数,进⽽提升性能。在实践中发现,合理调节该参数,性能会有1到5%的提升。

spark.reducer.maxSizeFlight:reduce task去磁盘拉取数据   默认值:48m

  参数说明:该参数⽤于设置shuffle read task的buffer缓冲⼤⼩,⽽这个buffer缓冲决定了每次能够拉取多少数据。调优建议:如果作业可⽤的内存资源较为充⾜的话,可以增加这个参数的⼤⼩(⽐如96M),从⽽减少拉取数据的次数,也就可以减少⽹络传输的次数,进⽽提升性能。在实践中发现,合理调节该参数,性能会有1到5%的提升。

Spark.shuffle.io.maxRetries   默认值:3

  参数说明:shuffle read task从shuffle write task所在节点拉取属于⾃⼰的数据时,如果因为⽹络异常导致拉取失败,时会⾃动进⾏重试的。该参数就代表了可以重试的最⼤次数,如果在指定次数内拉取属于还是没有成功,就可能会导致作业执⾏失败。

  调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最⼤次数(⽐如6次),可以避免由于JVM的full gc或者⽹络不稳定等因素导致的数据拉取失败。在实践中发现,对于超⼤数据量(数⼗亿到上百亿)的shuffle过程,调节该参数可以⼤幅度提升稳定性。

Spark.shuffle.io.retryWait   默认值:5s

  参数说明:shuffle read task从shuffle write task所在节点拉取属于⾃⼰的数据时,如果拉取失败了每次重试拉取数据的等待时间间隔,默认是5s;

  调优建议:建议加⼤时间间隔时长,⽐如60s,以增加shuffle操作的稳定性。

spark.shuffle.memoryFraction   默认值:0.2

  参数说明:该参数代表了executor内存中,分配给shuffle read task进⾏聚合操作的内存⽐例,默认是20%;

  调优建议:如果内存充⾜,⽽且很少使⽤持久化操作,建议调⾼和这个⽐例,给shuffle read的聚合操作更多内存,以避免由于内存不⾜导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%。

Spark.shuffle.manager   默认值:sort

  参数说明:该参数⽤于设置shuffleManager的类型。Spark1.5以后有三个可选项:hash、sort和tungsten- sort。Tungsten-sort与sort类似,但是使⽤了tungsten计划中的堆外内存管理机制,内存使⽤效率提⾼。

  调优建议:由于sort shuffleManager默认会对数据进⾏排序,因此如果你的业务逻辑中需要该排序机制的话,则使⽤默认的sort ShuffleManager就可以;但是如果你的业务逻辑不需要对数据进⾏排序,那么建议参考后⾯的⼏个参数调优,通过bypass机制或优化的hash ShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这⾥要注意的是,tungsten-sort要慎⽤,因为之前发现了⼀些相应的bug。

Spark.shuffle.sort.bypassMergeThreshold   默认值:200

  参数说明:当shuffleManager为sortshuffleManager时,如果shuffle read task的数量⼩于这个阈值,则shuffle write过程中不会进⾏排序操作,⽽是直接按照未经优化的hashShuffleManager的⽅式去写数据,但是最后会将每个task产⽣的所有临时磁盘⽂件都合并成⼀个⽂件,并会创建单独的索引⽂件。

  调优建议:当你使⽤sortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调⼤⼀些,⼤于shuffleread task的数量,那么此时就会⾃动启⽤bupass机制,map-side就不会进⾏排序,减少了排序的性能开销。但是这种⽅式下,依然会产⽣⼤量的磁盘⽂件,因此shuffle write性能有待提⾼。

Spark.shuffle.consolidateFiles   默认值:false

  参数说明:如果使⽤hashShuffleManager,该参数有效。如果设置为true,那么就会开启consilidate机制,会⼤幅度合并shuflle write的输出⽂件,对于shuffle read task数量特别多的情况下,这种⽅法可以极⼤地减少磁盘IO开销,提升性能。

  调优建议:如果的确不需要sortHashShuffle的排序机制,那么除了使⽤bypass机制,还可以尝试 将spark.shuffle.manager参数⼿动调节为hash,使⽤hashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能⽐开启了bypass机制的sortshuffleManager要⾼出10%到30%。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-08-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、Shuffle原理
  • 二、Shuffle调优
    • 2.1 程序调优;
      • 2.2 参数调优;
      相关产品与服务
      文件存储
      文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档