Spark之数据倾斜

1、数据倾斜概述

在大数据计算领域,数据倾斜是最为令人恼火的问题之一,一旦出现,不仅会极大地影响整个作业的执行效率,甚至还可能导致作业执行失败。

数据倾斜出现原因

数据倾斜一般是在执行shuffle操作的过程中出现,它会根据key来调度数据,相同key对应的values数据会被拉取到同一个reducetask中进行聚合操作并输出。举个例子:假如当前有多条key-value数据,一共100万条,某个key对应了98万条数据,这98万条数据将被分配到某一个reduce task上去执行;另外还有两个reduce task,可能的情况是各分配到了1万条数据,可能包含了数十个key对应的数据。

在这种情况下,就会出现严重的数据倾斜问题,不难想象,Spark在这种情况下的运行效率会有多么糟糕。其中两个reduce task各分配到了1万条数据,可能它们在5分钟内就运行完了,但另外一个reduce task有98万条数据,可能就需要运行98/1*5=490分钟=8.2小时。其它reduce task很快就运行完了,但是由于有一个拖后腿的家伙,导致整个Spark作业需要8个多小时才能执行完毕,这让人是难以接受的。

数据倾斜出现现象

在Spark作业中出现数据倾斜,一般有两种表现:

A、大部分的task都执行的特别快,可能在几秒钟就执行完一个(通过client模式提交的Spark作业,如standlone client或yarn client,本地机器都能看到相应的log),而剩下的某几个task执行地特别慢,可能一两个小时后还没有执行完成。出现上述情况,就表明发生了数据倾斜,这种情况还不是最糟糕的,至少在等待较长时间后,Spark作业还是能够执行完成。

B、另一种情况是,在Spark作业执行的时候,其他task都运行完了,也没有出现什么问题,但是有个别task突然间报出OOM(JVMOut Of Memory)的异常,内存溢出了,导致task failed、task lost、resubmit task等,反复执行几次Spark作业都遇到同样的问题,最后Spark作业就执行失败退出了。

某个task报OOM异常,基本也可以确定是数据倾斜导致的问题,该task分配的数据量太大,占用了过多内存,然后task在执行过程中还需要创建大量的Java对象,最后导致内存爆掉。出现这种问题,是无法忍受的,如果不去正面解决数据倾斜的问题,Spark作业压根就不能正常跑过。

数据倾斜出现位置

在Spark作业中出现数据倾斜,分析定位出现问题的位置时,一般要先根据输出的log来判断在哪个stage出现了数据倾斜,然后再结合Spark作业stage划分原理,看Spark程序在哪些地方使用了会产生shuffle操作的算子,如groupByKey、countByKey、reduceByKey以及join等,最终就能够准确定位到出现数据倾斜的代码位置。

上面简单介绍了Spark作业中数据倾斜出现的原因、现象以及出现位置的确定方法,下面将正式介绍针对数据倾斜问题的几个常见解决方案。前两个方案最直接、最简单也最有效,后面几个方案稍微会复杂一些,看起来也更炫酷、更具有技术含量。

2、方案一:聚合源数据

Spark作业的数据来源通常是hive表(hive表中的数据也都是保存在HDFS中),而hive表中的数据又是怎么来的呢?众所周知,hive适合做离线大数据分析处理(如ETL,Extract、Transform、Load,即数据的抽取、转换、加载,还有hive sql),一般将hive数据处理任务放到晚上、凌晨来跑,从而形成一个完整的hive数据仓库,简单来说,hive数据仓库就是一堆特定的数据表。

Spark作业的源数据表,通常情况下是hive表,它们都是通过hive ETL生成的。为了避免Spark作业出现数据倾斜,可以直接在生成hive表的hive ETL操作中先对数据进行聚合,比如按key来分组,将key对应的所有values全部用一种特殊的格式拼接到一个字符串中,如”key=sessionid,value:actionid=1|userid=1|search_keyword=phone|categoryid=0001; actionid=2|userid=1|search_keyword=computer|categoryid=0002”。在hive ETL操作中直接对key进行了聚合,那么就意味着,每个key就只对应一条数据。然后,在Spark作业中不再需要去进行groupByKey+map这种操作,直接对每个key对应的values字符串进行map操作,得到key=sessionid,values这种形式的数据集合,再进行后续的处理操作。

在hiveETL操作中对key进行聚合,并不一定就是将相同key对应的value数据进行拼接,也可能是直接进行相关计算。如reduceByKey这类计算函数运用在hive ETL中,就能够计算出每个key对应的所有value的“总和”。在Spark作业中,由于将其groupByKey操作转移到hive ETL操作中,就不再需要进行shuffle操作了,也就从根本上避免了数据倾斜的发生。

聚合源数据还有另一种做法,可能没有办法对每个key聚合出来一条数据,那么也可以做一个妥协,对每个key对应的数据(通常会有多个维度或者叫粒度),比如1万条数据,其中包含了多个城市、多个地区的数据,可以放粗粒度,直接按照城市或者地区粒度,做聚合,将不同城市或地区的数据给聚合起来,比如cityid areaid date,select……from……group by cityid。尽量聚合,减少每个key对应的数据量,也许聚合到比较粗的粒度之后,原先有100万数据量的key,现在只有几万数据量,减轻数据倾斜现象的发生几率。

3、方案二:过滤导致数据倾斜的key

如果能够接受某些数据在Spark作业中直接摒弃掉不使用,比如有100万条数据,只有2个key的数据量达到10万,其他所有key对应的数据量都只有几十、几百,这个时候,如果业务和需求可以理解和接受的话,可以在从hive表查询源数据的时候,直接在sql语句中用where条件,过滤掉这几个key对应的数据。那么这几个原先包含有大量数据,可能会导致数据倾斜的key,被过滤掉之后,在Spark作业中自然就不会再发生数据倾斜了。

4、方案三:提高shuffle操作中reduce并行度

问题描述

如果前两种方案不适合采用,就可以考虑这个方案。比如有100万条数据,reduce task只有100个,可能某几个key对应的200万条数据都跑到某一个reduce task中进行处理,而其他reduce task都只有几万条数据,这样就可能出现数据倾斜。将reducetask的数量增多,可以让每个reduce task分配到更少的数据量,这样的话就可以缓解甚至解决掉数据倾斜的问题。

提高shuffle reduce端并行度的操作方法

在调用会导致shuffle操作的算子时,如groupByKey、countByKey、reduceByKey等,给它们传入一个参数,也就是一个数字,代表那个shuffle操作的reduce端的并行度。那么在进行shuffle操作的时候,就会对应创建指定数量的reduce task。在这种情况下,就可以让每个reduce task分配到更少的数据,在一定程度上缓解数据倾斜的问题。

举个例子,原本某个task分配的数据量非常多,运行时直接就报OOM异常,程序没法正常运行。根据log,找到发生数据倾斜的shuffle操作对应的算子,给它传入一个并行度数字,这样的话,原先那个task分配到的数据量,肯定会减少,这样可能就避免了OOM的情况,程序也就可以正常跑起来。

提高shuffle reduce并行度的缺陷

这种方案有些治标不治本的意味,因为它没有从根本上改变数据倾斜的问题,不能达到第一种和第二种方案的效果(直接避免了数据倾斜的发生),只是说,尽可能地去缓解和减轻shuffle reduce task的数据压力,以及数据倾斜的问题。

在实际生产环境中,如果提高shuffle reduce并行度以后,大大减轻了数据倾斜问题,甚至可以让数据倾斜的现象忽略不计,那么就达到理想情况了,不用尝试其他数据倾斜解决方案了。如果在提高shuffle reduce并行度以后,之前某个task运行特别慢,要10个小时才能完成,现在稍微快了一点,变成了8个小时,或者是原先运行到某个task直接OOM,现在不会OOM了,但是那个task运行特别慢,要8个小时甚至10个小时才能跑完,那么,就应该果断放弃这种解决方案,尝试其他方案。

5、方案四:使用随机前缀进行双重聚合

适用场景

对RDD执行reduceByKey等聚合类shuffle算子操作或者在Spark SQL中使用group by语句进行分组聚合时,比较适合使用这种方案,join算子操作通常不能这样来做,需要使用其他方案。

实现思路

该方案的核心思路就是进行两阶段聚合,第一次是局部聚合,先给每个key都附加上一个随机数前缀,如10以内的随机数,此时原先一样的key就变成不一样了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对附加上随机数前缀后的数据,执行reduceByKey等聚合操作,进行局部聚合,结果就变成(1_hello, 2) (2_hello, 2)。然后将各个key的前缀去掉,结果就变成(hello,2)(hello,2),最后再次进行全局聚合操作,就可以得到最终结果,比如(hello, 4)。

实现原理

将原本相同的key通过附加随机数前缀的方式,变成多个不同的key,就可以让原本被一个reducetask处理的数据分散到多个reducetask上去做局部聚合,进而解决单个reducetask处理数据量过多的问题。接着去除掉随机数前缀,再次进行全局聚合,就可以得到最终的结果。如果一个RDD中有一个key导致数据倾斜,同时还有其他的key,那么一般先对数据集进行抽样,然后找出倾斜的key,再使用filter对原始的RDD进行分离,成为两个RDD,一个是由倾斜的key对应的数据组成的RDD1,一个是由其他的key对应的数据组成的RDD2,对于RDD1可以使用附加随机数前缀进行多分区多task计算,对于另一个RDD2正常聚合计算,最后将结果再合并起来。使用随机前缀进行双重聚合的操作原理,如下图所示。

6、方案五:将reduce join转换为mapjoin

普通的join算子,肯定是要走shuffle操作的,而一旦进行shuffle操作,就会将相同key对应的数据拉取到一个reduce task中进行join,此时就是reduce join。但是如果一个RDD的数据量比较小,则可以采用广播小RDD全量数据+map算子来实现与join同样的效果,也就是map join,此时就不会发生shuffle操作,也就不会发生数据倾斜的问题。那么,如何将reduce join转换为mapjoin呢?

实现思路

不使用普通的join算子进行连接操作,而使用broadcast变量与map类算子相结合来实现join操作,进而完全规避掉shuffle类操作,彻底避免数据倾斜问题的发生。将数据量较小的RDD直接通过collect算子拉取到Driver端的内存中,然后对其创建一个Broadcast变量;接着对另外一个RDD执行map类算子,在算子函数中,从Broadcast变量中获取数据量较小RDD的全部数据,与当前RDD的每一条数据按照连接key进行比对,如果连接key相同的话,就将两个RDD的数据按照需要的方式连接起来即可。

适用场景

如果两个RDD要进行join,其中一个RDD的数据量比较小,比如一个RDD是100万数据,另一个RDD是1万数据(或者一个RDD是1亿数据,另一个RDD是100万数据),这种情况下非常适合使用这种方案。其中一个RDD必须是数据量比较小的,broadcast出去那个小RDD的数据以后,就会在每个executor的blockmanager中都保存一份,同时,要确保内存足够存放那个小RDD的数据。这种方式下,根本不会发生shuffle操作,肯定也就不会发生数据倾斜问题,从根本上杜绝了join操作导致数据倾斜的问题。温馨提示,对于join中有数据倾斜发生的情况,尽量第一时间考虑使用这种解决方案,效果通常都比较好。

不适用场景

如果两个RDD的数据量都比较大,那么这个时候,将其中一个RDD做成broadcast,就很笨拙了,很可能导致内存不足,最终导致内存溢出,程序挂掉。对于join这种操作,不光是考虑数据倾斜的问题,即使没有发生数据倾斜,也可以优先考虑使用这种高级的reduce join转map join技术,不要使用普通的join算子去通过shuffle操作进行数据的join,完全可以通过简单的map算子操作,使用map join的方式,牺牲一点内存资源,换取更好的性能。

7、方案六:采样倾斜key并分拆进行双重join

实现思路

简单一点的方案是(适用于只有一个导致数据倾斜的key):通过sample采样得到可能发生数据倾斜的某个key,将这个key对应的数据单独拉出来,放到一个RDD中去。就用这个原本会导致数据倾斜的key对应的RDD跟另一个RDD单独进行join操作,这时候这个key对应的数据就会分散到多个task中去进行join操作。而不至于,这个key跟之前其他的key混合在一个RDD中时,肯定是会导致同一个key对应的所有数据都被拉取到一个reducetask中去进行操作,从而导致数据倾斜。

复杂一点的方案是(适用于一个或几个导致数据倾斜的key):对包含少数几个数据量过大的key的那个RDD,通过sample算子采样出一份样本来,然后统计出样本中每个key的数量,计算出来数据量最大的是哪几个key。然后将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个key都附加上n以内的随机数作为前缀,而不会导致数据倾斜的大部分key对应的数据形成另外一个RDD。接着将需要join的另一个RDD,也过滤出来同样那几个倾斜key对应的数据并形成一个单独的RDD,将每条数据通过附加n以内的随机数前缀扩展成n条数据,不会导致倾斜的大部分key对应的数据也形成另外一个RDD。再将附加了随机数前缀的独立RDD与另一个扩展n倍的独立RDD进行join,此时就可以将原先相同的key打散成n份,分散到多个reduce task中去进行join,join完后再去除随机数前缀得到部分join结果。而另外两个普通的RDD就照常join即可,最后将两次join的结果使用union算子合并起来,得到最终的join结果。采样倾斜key并分拆进行双重join的操作原理,如下图所示。

适用场景

对于join操作,肯定是希望优先使用reduce join转map join方案,但如果两个RDD的数据量都比较大,那么就不能采用该方案了。针对RDD中的数据,可以先把它转换成一个中间表,或者是直接用countByKey算子亦或是使用sample采样,得到这个RDD中各个key对应的数据量。如果发现整个RDD就一个或者少数几个key对应的数据量特别多,此时就适合采用这种方案。那么,什么情况下不适用这种方案呢?如果一个RDD中,导致数据倾斜的key特别多,那么此时,最好不要采用此种方案,可以考虑采用其它解决方案。

8、方案七:使用随机数前缀以及扩容表进行join

当采用随机数前缀以及扩容表进行join来解决数据倾斜问题的时候,意味着,前面几种数据倾斜解决方案都无法使用。需要特别强调的一点是,这个方案没法彻底解决数据倾斜的问题,更多的,它只能缓解数据倾斜问题的产生。

实现思路

选择一个RDD使用flatMap算子,进行扩容,将每条数据映射为多条数据,每条映射出来的数据,都附加一个n以内的随机数前缀,通常来说会选择n=10;将另外一个RDD做普通的map映射操作,每条数据都附加上一个n以内的随机数前缀;最后将两个处理后的RDD进行join操作,并去除附加的随机数前缀,即得到最终的结果。

局限性

由于两个RDD数据量可能都很大,因此没办法去将某一个RDD扩得特别大,一般都扩大为10倍;如果是扩容10倍的话,那么数据倾斜问题的确只能说是缓解和减轻,不能说彻底解决。该方案不比采样倾斜key并分拆进行双重join方案,它从另外一个RDD中过滤出倾斜key对应的数据,可能只有一条或几条,此时,可以任意进行扩容,比如扩容1000倍(即使5000倍也没问题)。将从第一个RDD中拆分出来的那个倾斜key对应的RDD,附加上1000以内的随机数前缀。这种情况下,还可以配合上使用提高shuffle reduce并行度的方案,效果还是非常不错的,肯定也就没有数据倾斜的问题了。

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20181215G180YI00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

同媒体快讯

扫码关注腾讯云开发者

领取腾讯云代金券