前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【专题】spark/MR 数据倾斜优化

【专题】spark/MR 数据倾斜优化

原创
作者头像
艾利
修改2022-08-30 21:28:42
1.8K0
修改2022-08-30 21:28:42
举报
文章被收录于专栏:数仓建模

一、什么是数据倾斜?

原理:在进行shuffle的时候,须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。此时如果某个key对应的数据量特别大的话,就会发生数据倾斜。比如大部分key对应10条数据,但是个别key却对应了100万条数据,那么大部分task可能就只会分配到10条数据,然后1秒钟就运行完了;但是个别task可能分配到了100万数据,要运行一两个小时。因此,整个Spark作业的运行进度是由运行时间最长的那个task决定的。

表象:

MapReduce任务:主要表现在ruduce阶段卡在99.99%,一直99.99%不能结束,各种container报错OOM

Spark任务:单个Executor执行时间特别久,整体任务卡在某个stage不能结束,Executor lost,OOM,Shuffle过程出错。

二、业内数据倾斜的判断标准?

从执行时间倾斜度和数据倾斜度来观测:(比如执行时间倾斜度、数据量倾斜度均大于 2)

执行时间倾斜度定义为:所有并行节点执行时长的最大值 (Max) 与中位数 (Median) 的比值;(举例:执行时间倾斜 = 5.7mins(最大) - 12s(中位数) )

数据量倾斜度定义为:所有并行节点所分配的数据量的最大值 (Max) 与中位数 (Median) 的比值;(数据量倾斜

= 38MB(最大) - 204.8KB(中位数) )

三、如何解决倾斜?

常见处理方法汇总:

3.1 输入倾斜

方案实现原理:

在读orc表时,spark任务在创建map task时默认使用BI策略,BI策略是以文件为粒度进行split划分;ETL策略会将文件进行切分,多个stripe组成一个split;HYBRID策略为:当文件的平均大小大于hadoop最大split值(默认256M)时使用ETL策略,否则使用BI策略。

解决方案:

指定使用ETL策略:

spark.hadoop.hive.exec.orc.split.strategy=ETL;(该参数只对orc格式生效)

合并小文件:

spark.sql.mergeSmallFileSize=10485760(10M),有效减少map输入端倾斜

spark.hadoopRDD.targetBytesInPartition=67108864; (平台设置为:1M) 合并文件大小为64M

方案优缺点:

当ETL策略生效时,driver读取file footer等信息,若其footer(用于描述整个文件的基本信息、表结构信息、行数、各个字段的统计信息以及各个Stripe的信息)较大,可能会导致driver端OOM,因此这类表的读取建议使用BI策略。对于一些较小的尤其有数据倾斜的表(这里的数据倾斜指大量stripe存储于少数文件中),建议使用ETL策略。

3.2 shuffle倾斜

3.2.1、key倾斜程度轻微

方案实现原理:

增加shuffle read task的数量,让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。

解决方案:

spark.sql.shuffle.partitions = 4000 (默认500)

方案优缺点:

实现起来比较简单,可以有效缓解和减轻数据倾斜的影响。只是缓解了数据倾斜而已,没有彻底根除问题,其效果有限。

3.2.2、少数key倾斜严重

方案实现原理:

将导致数据倾斜的少数key过滤之后,这些key就不会参与计算了,自然不可能产生数据倾斜。

解决方案:

在Spark SQL中可以使用where子句过滤掉这些key或者在Spark Core中对RDD执行filter算子过滤掉这些key。

代码块

代码语言:javascript
复制
where key is not in('bigkey')

方案优缺点:

实现简单,而且效果也很好,可以完全规避掉数据倾斜。适用场景不多,大多数情况下,导致倾斜的key还是很多的,并不是只有少数几个。

3.2.3、reducebykey等聚合类shuffle算子

方案实现原理:

将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。接着去除随机前缀,再次进行全局聚合,就可以得到最终的结果。具体原理见下图。

解决方案:

将group by 产生的倾斜key 通过附加随机前缀的方式,进行聚合。

代码语言:sql
复制
select split(t.activity_id,'_')[1]
from
   (select concat(cast(ceiling(rand(1)*10000) as int),'_',activity_id) activity_id -- 将activity_id打散10000倍
      from table1
     group by 1
   ) t --1阶段聚合
group by split(t.activity_id,'_')[1];   --2阶段聚合

方案优缺点:

对于聚合类的shuffle操作导致的数据倾斜,效果是非常不错的。通常都可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜,仅仅适用于聚合类的shuffle操作,适用范围相对较窄。

3.2.4、join类导致的key倾斜

3.2.4.1 维表小,将reduce join 变为map join

方案实现原理:

普通的join是会走shuffle过程的,而一旦shuffle,就相当于会将相同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join。但是如果一个RDD是比较小的,则可以采用广播小表+map算子来实现与join同样的效果,也就是map join,此时就不会发生shuffle操作,也就不会发生数据倾斜

解决方案:将小表进行广播

代码语言:javascript
复制
set hive.auto.convert.join = true; -- hive是否自动根据文件量大小,选择将common join转成map join 
set hive.mapjoin.smalltable.filesize =25000000; 
-- 大表小表判断的阈值,如果表的大小小于该值25Mb,则会被判定为小表。则会被加载到内存中运行,将commonjoin转化成mapjoin。一般这个值也就最多几百兆的样子。
代码语言:sql
复制
 select /*+ MAPJOIN(b) */
    a.poi_id
  from table a join b  

方案优缺点:

对join操作导致的数据倾斜,效果非常好,这个方案只适用于一个大表和一个小表join的情况。

一般集群开启map join会自动进行广播,对于表是否被广播,需要读取表元数据信息。分区表在matestore里基本都是没有元数据的,取不到的话就走默认值了(取int最大值),临时表在matestore也没有存储表信息。因此对于分区表或者临时表,需要手动指定map join。

3.3.4.2 部分key倾斜严重,采样倾斜key,并拆分join

方案实现原理:

对于join导致的数据倾斜,如果只是某几个key导致了倾斜,可以将少数几个key分拆成独立RDD,并附加随机前缀打散成n份去进行join,此时这几个key对应的数据就不会集中在少数几个task上,而是分散到多个task进行join了。

解决方案:

将少数倾斜key取出来,并将对应的维表扩容n倍,非倾斜的key正常join

方案优缺点:

对于join导致的数据倾斜,如果只是某几个key导致了倾斜,采用该方式可以用最有效的方式打散倾斜的key进行join。而且只需要针对少数倾斜key对应的数据进行扩容n倍,不需要对全量数据进行扩容。避免了占用过多内存。

如果导致倾斜的key特别多的话,比如成千上万个key都导致数据倾斜,那么这种方式也不适合。

3.2.4.3 大量key倾斜严重,采样随机前缀和扩容RDD

方案实现原理:

将原先相同的key通过附加随机前缀变成不同的key,然后就可以将这些处理后的“不同的key”分散到多个task中去处理,而不是让一个task处理大量的相同key。而这一种方案是针对有大量倾斜key的情况,没法将部分key拆分出来进行单独处理,因此只能对整个RDD维表进行数据扩容,对内存资源要求很高。

解决方案:

将倾斜key对应的b表进行扩容n倍

方案优缺点:

对join类型的数据倾斜基本都可以处理,而且效果也相对比较显著,性能提升效果非常不错。

该方案更多的是缓解数据倾斜,而不是彻底避免数据倾斜。而且需要对整个RDD进行扩容,对内存资源要求很高。维表会膨胀n倍。运行时间会有影响,可能会变多。

3.3.4.4 大量key倾斜严重,动态一分为二

方案实现原理:

对于倾斜的值和非倾斜的值分开处理,最后union all

解决方案:

需要临时表存放倾斜的键,将表倾斜值扩容n倍,对于倾斜的key的维表进行mapjoin(广播)或者关联,非倾斜的key正常join

代码语言:sql
复制
-- 临时表存放买家超过50000的大卖家
INSERT OVERWRITE TABLE temp_b
SELECT  t1.seller_id,t2.seller_level
FROM ( SELECT seller_id,count(buyer_id) as buyer_cnt
       FROM table_a                     --- 有购买行为
       GROUP BY seller_id
      having count(buyer_id)>50000
       ) t1
inner JOIN 
  ( SELECT seller_id,seller_level
     FROM table_b
   ) t2
ON t1.seller_id = t2.seller_id

-- 对于倾斜的值和非倾斜的值分开处理,最后union all
SELECT buyer_id seller_level,sum(order_num ) as order_num
FROM (
  SELECT
	 /*+ BROADCAST(t2) */
	 t1.buyer_id,t1.seller_level,sum(order_num)
  FROM table_a t1
  LEFT JOIN temp_b t2
  ON t1.seller_id = t2.seller_id
  group by 1,2
  UNION ALL  --针对大卖家map join 其他卖家正常join
  SELECT t1.buyer_id,t4.seller_level,sum(order_num) order_num
  FROM table_a t1
  LEFT JOIN 
	(
	SELECT seller_id,seller_level
	FROM table_b t2
	LEFT JOIN temp_b t3
	ON t2.seller_id = t3.seller_id
	WHERE t3.seller_id is null
	) t4
  ON t1.seller_id = t4.seller_id
  ) t
GROUP BY 1,2;

方案优缺点:

比较通用,自由度高,但是对于代码的更改最大,更改代码框架。

3.3 膨胀倾斜

方案实现原理:

在数据处理中有一种特殊的情况,两个多对多关系的表进行join,会发生数据膨胀

解决方案:

在数据处理中应该尽可能的避免笛卡尔积,以及热点key多对多关系。如果业务上确实需要多对多关系,可以从这几点考虑优化

  • 能否去掉一些热点的大key
  • 能否增加一些关联条件,减少最终的结果数据
  • 能否在数据范围上做减少,对于笛卡尔积的关联需要把数据条数控制在1亿以内
  • 如果是M*N(M>>N)的多对多关系,可以考虑把小表N广播出去,对于大表M切分成多个很小的数据分片,进行mapjoin

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
作者已关闭评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、什么是数据倾斜?
  • 二、业内数据倾斜的判断标准?
  • 三、如何解决倾斜?
    • 3.1 输入倾斜
      • 3.2 shuffle倾斜
        • 3.2.1、key倾斜程度轻微
        • 3.2.2、少数key倾斜严重
        • 3.2.3、reducebykey等聚合类shuffle算子
        • 3.2.4、join类导致的key倾斜
      • 3.3 膨胀倾斜
      相关产品与服务
      大数据
      全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档