前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Shuffle调优指南

Spark Shuffle调优指南

原创
作者头像
Chrisbobo
发布2021-02-20 14:44:42
1.3K0
发布2021-02-20 14:44:42
举报
文章被收录于专栏:Spark专栏Spark专栏

概述

从Spark shuffle原理可知,Spark shuffle在计算与IO方面,都可能有较大开销,故,Spark shuffle调优就是优化这2个方面。 这里仅关注调参的调优方式,不关注应用代码层面的调优。

计算方面

计算方面可分为为Cpu、内存、算法这3个方面,其中Cpu一般是不用纳入shuffle优化范畴的,内存方面主要关注内存比例与堆外内存,算法指的是采用什么类型的ShuffleManager。

1、内存

spark.shuffle.io.preferDirectBufs 默认为true,即启用堆外内存,可以减少shuffle与block传输期间的gc,若堆外内存非常紧张,则可以考虑关闭这个选项 spark.yarn.executor.memoryoverhead 默认申请的堆外内存是Executor内存的10%,处理大数据的时候,这里会出现问题,导致spark作业反复崩溃,无法运行,故手动设置这个参数,到至少1G(1024M),甚至2G、4G spark.shuffle.memoryFraction 此参数属于spark静态内存模型中的参数,统一内存模型中不再生效 spark.memory.fraction 默认是0.6,即Spark存储内存与执行内存总共占用Executor内存的60%,一般不建议修改 spark.memory.storageFraction 默认是0.5,即Spark存储内存与执行内存各占spark.memory.fraction的50%,一般不建议修改

2、Shuffle算法

spark.shuffle.manager 默认是sort,即SortShuffleManager,hash则在spark2.0之后被废弃并移除 spark.shuffle.sort.bypassMergeThreshold 默认是200,即若shuffle read task的数量小于等于200,则在sortshufflemanager模式下,会启用bypass SortShuffleManager。 若为spark2.0以下版本,这里的调优建议是bypass SortShuffleManager的性能有时并不如开启“合并运行机制”的HashShuffleManager,故当shuffle read task的数量小于等于200时,可尝试“合并运行机制”的HashShuffleManager,与bypass SortShuffleManager进行性能对比。在实践中发现“合并运行机制”的HashShuffleManager的性能比bypass SortShuffleManager要高出10%~30% spark.shuffle.consolidateFiles 默认为false,即不使用“合并运行机制”的HashShuffleManager,该参数在spark2.0之后被废弃并移除

IO方面

1、缓存

spark.shuffle.file.buffer 默认值:32k 参数说明:用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。默认使用这么小的缓存,是希望在硬件较小的情况下也可以部署。 调优建议:若作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。 spark.reducer.maxSizeInFlight 默认值:48m 参数说明:用于设置shuffle read task的buffer缓冲大小,这个buffer缓冲决定了每次能够拉取多少数据。 调优建议:若作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

2、压缩

spark.shuffle.compress 默认值:true 参数说明:判断是否对mapper端聚合输出进行压缩,当设置为true,表示在每个shuffle过程中都会对mapper端的输出进行压缩,减少shuffle过程中下一个stage向上一个stage抓取数据的网络开销,减轻shuffle的压力。 调优建议:压缩会消耗大量的CPU资源,故打开压缩选项会增加Map任务的执行时间,因此当CPU负载的影响远大于磁盘和网络带宽的影响时,可设置为false。 spark.io.compression.codec 默认值:spark2.2之前默认压缩方式snappy,spark2.2之后默认是lz4 参数说明:压缩内部数据,如RDD分区,广播变量和shuffle输出的数据,该参数的值有三个选项,分别是snappy,lz4和lzf。 调优建议:无。 spark.shuffle.spill.compress 默认值:true 参数说明:在shuffle过程中,是否压缩spill的数据 调优建议:压缩会消耗大量的CPU资源,故打开压缩选项会增加Map任务的执行时间,因此当CPU负载的影响远大于磁盘和网络带宽的影响时,可设置为false。

3、网络

spark.shuffle.io.maxRetries 默认值:3 参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。 调优建议:通常建议调节到8~10次,对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败,调节该参数可以大幅度提升稳定性。 spark.shuffle.io.retryWait 默认值:5s 参数说明:每次重试拉取数据的等待间隔 调优建议:通常建议加大时长,理由同上。 spark.shuffle.service.enabled 默认值:false 参数说明:设置客户端读取Executor上的shuffle文件的方式,默认值是false,表示使用BlockTransferService读取;设置为true,表示BlockManager实例生成时,需要读取spark.shuffle.service.port配置的端口,同时对应的BlockManager的shuffleclient不再是默认的BlockTransferService实例,而是ExternalShuffleClient实例。启用外部shuffle服务,这个服务会安全地保存shuffle过程中,executor写的磁盘文件,因此executor即使挂掉也不要紧,必须配合spark.dynamicAllocation.enabled属性设置为true,才能生效,而且外部shuffle服务必须进行安装和启动,才能启用这个属性。 调优建议:Spark系统在运行含shuffle过程的应用时,Executor进程除了运行task,还要负责写shuffle数据,给其他Executor提供shuffle数据。当Executor进程任务过重,导致GC而不能为其他Executor提供shuffle数据时,会影响任务运行。 External shuffle Service是长期存在于NodeManager进程中的一个辅助服务。通过该服务来抓取shuffle数据,减少了Executor的压力,在Executor GC的时候也不会影响其他Executor的任务运行。 在NodeManager中启动External shuffle Service。在“yarn-site.xml”中添加如下配置项: <property>       <name>yarn.nodemanager.aux-services</name>                     <value>spark_shuffle</value>   </property> <property>     <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>     <value>org.apache.spark.network.yarn.YarnShuffleService</value> </property> <property>     <name>spark.shuffle.service.port</name>     <value>7337</value> </property>

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 概述
  • 计算方面
    • 1、内存
      • 2、Shuffle算法
      • IO方面
        • 1、缓存
          • 2、压缩
            • 3、网络
            相关产品与服务
            文件存储
            文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档