万亿级数据规模下的倾斜调优

数据倾斜是海量数据处理中最棘手的问题,本文将分享团队在万亿级数据规模下的倾斜调优经验。通过深入分析实际生产实践中数据倾斜的产生场景以及对数据倾斜背后原理的深度剖析,本文将给出简单、实用、高效的调优方案。同时,对于大数据处理中的两大基本算子GroupBy和Join,本文将提出针对性的解决方案。

一、优先分析数据特点

在介绍数据倾斜之前,这里将先强调大数据处理中一个很关键但又很容易被人忽视的问题——分析数据特点。对于大数据处理任务,许多人总是忽略分析原始数据特点这一关键步骤而直接上手coding,结局往往是代码很快就写完,但是让任务在可接受的时间内跑出正确的结果却需要耗费巨量的时间。从海量数据处理开发运营中获得的一个深刻教训是——分析数据必须先于处理数据。分析数据包括但不限于以下几点:

1)分析数据的整体规模

比如开发一个8T数据量的数据处理任务。那么在处理数据前需要先分析这8T数据的原始文件个数、每个文件的大小。如果原始数据文件个数、文件大小不在合理的阈值内,那么需要优先将这8T数据处理成相对规范的文件个数和文件大小。在这之后需要分析整个任务的资源开销,对于map/reduce任务,需要预估map和reduce的个数,对于Spark任务,需要预估每个job的task的个数。基于此,可以大致确定该任务的资源开销。

2)分析数据的存储格式

数据的处理性能和数据的存储格式密切相关,对于只涉及到少量列的数据处理任务,列式存储是一个很好的选择,对于一次需要处理大量列的数据处理任务,行式存储或许是更好的选择。以上文中提到的8T数据量的数据处理任务为例,可以预先测试在不同的存储格式如parquet、gz、snappy下读取单个part的性能,对于值稀疏但是数据处理中需要用到大量列的数据处理任务,不压缩的数据往往能取得较好的读取速度。

3)分析数据列值的分布

分析数据规模、数据存储格式最主要是为了优化整个任务load原始数据的性能,但实际的数据处理中最关键的一点就是分析数据列值的分布,包括分析每一列有值的行数、每个值的行数分布。分析列值分布可以通过从全量数据中采样完成。

val sampledPairs= pairs.sample(false, 0.1)

val sampledWordCounts = sampledPairs.countByKey()

sampledWordCounts.foreach(println(_))

大数据处理中用的最多的算子莫过于GroupBy和Join,知道每一列值的分布之后才能在使用GroupBy和Join时清楚的知道其可能的运行性能,本文后续将要介绍的数据倾斜优化就依赖这一步的数据列值分布结果。

二、数据倾斜判定条件

对于某个数据处理任务,绝大多数task都在合理的时间内执行完成,但个别task执行极慢,这就是数据倾斜。比如,总共有10000个task,9997个task都在3分钟之内执行完了,但是剩余三个task却要一两个小时才能执行完或者无法执行完。Hadoop和Spark的任务监控页都能看到每个Task的执行时间,当观察到上述现象时就可以确定发生了数据倾斜,此时就需要去优化代码。这种情况在大数据处理中非常常见,倾斜任务的计算耗时往往直接决定了整个任务的时间开销。

产生这种现象的本质是个别task处理的数据量远多于其他task,因为每个task的拥有的资源是相同的,处理大数据量的task的所需的时间自然远多于其他task。

三、map任务数据倾斜原理分析

对于map/reduce任务,数据倾斜一般出现在reduce阶段,后文在将对其进行着重分析,但map过程同样会出现数据倾斜。

map过程产生数据倾斜的原因只有一个——map任务读取不支持splittable的原始文件且原始文件大小不均匀,有个别文件特别大。比如部分文件大小为2G,部分文件只有2K,假设数据处理时间和文件大小成正比,那么处理2G大小文件的task的时间是2K大小文件的100万倍。

这里先解释一下“splittable”。以HDFS分布式存储系统为例,splittable指的是一个文件是否可以被多个map同时读取,每个map读取文件的一部分数据。对于支持splittable的文件,若干文件由N个block组成,那么其可以被N个map任务同时处理,每个map处理一个block的数据,因此不管单个文件大小是多少,这种支持splittable的文件都不会产生数据倾斜。

不支持splittable的文件,单个文件的所有数据只能由一个map进行处理,因此对于一个由N个block组成的文件,若N很大,那么就会产生数据倾斜。

这里需要说明的是,文件不支持splittable并不意味着map需要读取完整个文件才能进行后续处理,在处理过程中map同样以小batch的形式读取文件。

Hadoop生态中各种文件格式对于splittable的支持现状如下图所示:

因此在处理数据时,若发现文件格式不支持splittable且文件大小不一致,那么应该留意是否会发生map阶段的数据倾斜。

四、reduce任务数据倾斜原理分析

shuffle指的是数据从map阶段到reduce阶段的处理过程。在shuffle时,map阶段的输出会根据hash算法映射为reduce的输入,一个map的所有输出会根据hash算法分散到若干个reduce中。更通俗的说,在进行shuffle时,必须将各个task节点上相同的key拉取到某个task来进行处理。在spark中会会触发shuffle操作的算子有distinct、 groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。

reduce阶段产生数据倾斜的原因不言而明,即不均匀的shuffle。由于shuffle key的不均匀,部分task处理的数据远多于其余task,因此产生了数据倾斜。比如大部分key对应1万条数据,但是个别key却对应了1000万条数据,那么大部分task可能就只会分配到1万条数据,然后1秒钟就运行完了;但是个别task可能分配到了1000万数据,要运行几十分钟。因此,整个作业的运行进度是由运行时间最长的那个task决定的。

那么为什么会出现不均匀的shuffle呢?因为现实生产环境中的大量数据本就是不均匀的,比如统计某一天腾讯视频在各个国家的广告曝光量,那么会有一个group by country的计算步骤,由于腾讯视频国内流量远大于国外流量,因此当执行group by country时,必然会有一个reduce task需要处理所有国内流量的数据,国内流量数据规模达到百亿级,task的执行会及其缓慢,且很有可能发生OOM。

大数据开发中的一个rule of thumb ——尽可能避免发生数据的shuffle。这是因为数据shuffle的开销很大。以spark任务为例,spark本身的计算通常都是在内存中完成的,由于数据本地性策略,绝大多数数据都可以在同一个node或者同一个rack中获得,不需要跑到别的node或者rack上去读取数据,因此执行的速度非常快。但是,如果发生了shuffle操作,那么就会因为网络传输、数据序列化/反序列化产生大量的磁盘IO和CPU开销,这个性能上的损失是非常巨大的。要减少shuffle的开销,主要有两个思路:

1)减少shuffle次数,把数据处理在本地完成

2)减少shuffle的数据规模

五、reduce任务数据倾斜产生场景

对于给定的未知数据,如果预先对数据特点进行分析,那么很容易发现所有可能产生数据倾斜的key,这也是本文在一开头就强调的处理大数据任务时要“优先分析数据特点”。那么现实生产环境中哪些场景下key可能会倾斜呢?根据我们的经验,主要有两大类场景:

1)存在业务默认填充值

如用户的imei在获取不到时被填充了默认值;如广告系统在请求不到广告时播放了默认的广告,这些默认的广告的订单号都相同;如某个业务字段当前只有一个可选值。

2)业务本身存在热点

如热播剧的广告曝光量会显著大于一般的剧;视频前贴片这个广告位类型的曝光量会显著大于其他广告位类型;国内的广告曝光量显著大于国外。

3)存在恶意数据

如同一个ID刷了海量广告曝光。

六、reduce任务数据倾斜解决方案

会产生数据倾斜的算子本质上只有两个,即GroupBy和Join。下面将对这两个算子的数据倾斜提供具体解决方案。需要说明的是下面讨论的解决方案是从根本上解决数据倾斜而不是“缓解”数据倾斜(如简单增加reduce个数)。

1)GroupBy算子的倾斜

GroupBy算子的倾斜本质上只有一种解决方案——两阶段聚合。

阶段1:修改聚合key进行局部聚合

将GorupBy(key)转换为GroupBy(key, randomNum),其中randomNum是一个随机数,该随机数的取值范围设置为(0,N),其中N的取值由key的倾斜程序决定。假设某个reduce需要处理1亿的数据,处理100万的数据可以在10秒内完成,那么可以将N设置为100,这样原先有一个reduce处理的数据将有100个reduce处理。

完成局部聚合后,阶段1输出格式为:(key,aggregateValue)

阶段2:按照原始key进行全局聚合

进过阶段1的聚合,进入阶段2的key的数据规模已经降为原来的1/N,此时再执行GorupBy(key)就可以顺利完成了。

2)Join算子的倾斜

Join算子的倾斜可以分为两大类:

a)大表Join大表

b)大表Join小表

a)大表Join小表

大表Join小表优先考虑map join,mapjoin的实现思想是在map阶段将小表放入map内存中,通过lookup的形式完成join。在spark中实现mapjoin的实现是broadcast join。broadcastjoin是通过将变量broadcast后在每个executor中存放一个copy实现的,broadcast变量的实现逻辑如下:

在使用broadcast join过程中有几个点需要强调一下:

1)显式broadcastjoin VS隐式broadcast join

val df1 = spark.range(1000)

val df2 = spark.range(1000)

df1.join(df2, Seq("id")).explain

从执行计划中可以看到join的实现是BroadcastHashJoin

2)broadcast join内存配置、内存膨胀

broadcast一个大小为N个字节的变量,需要至少2N的内存空间。这个由spark中broadcast变量的实现方式所决定。在spark中,driver会将一个大的broadcast变量分割为一个一个小的broadcast piece,这些小的broadcast piece将会通过BitTorrent的形式广播到每个executor中。driver在将整个大的broadcast变量分割完之前会在内存中一直保存原来的变量,因此在这个过程中需要两倍于broadcast变量大小的内存空间。

broadcast变量还需要注意的一点是变量相比于hdfs上的大小会有2~5倍的膨胀。如一个hdfs上原始大小1G多一点的文件在broadcast过程中大小变为了5.5G。

3)broadcast较大的小表

在spark中broadcast一个较大的小表如1~5G,且整个application中存在较多的executor时,完成broadcast小表的整个过程会相对耗时,耗时可能达到几分钟,spark中通过spark.sql.broadcastTimeout来控制broadcast的超时时间,默认是300s,若业务中实际耗时可能超过300s,那么可以通过增加spark.sql.broadcastTimeout来实现。下图中可以看到每个broadcast变量各个阶段的耗时。

b)大表Join大表

假设BigTable A和BigTable B进行Join。若BigTableA中包含大量重复的key K,BigTable B存在大量重复的key。

那么解决方案如下:

将BigTable B分拆为两个表B1和B2,其中B1只包含K,B2包含除K外的其他所有key;

将B1和BigTableA进行reducejoin得到join result 1,将B2和BigTable A进行shufflejoin得到join result 2,将joinresult 1和join result 2进行union得到最终的结果。

假设BigTable A和BigTable B进行Join。若BigTableA中包含大量重复的key K,BigTable B也存在大量重复的key K。

那么解决方案如下:

将BigTable A分拆为两个表A1和A2,其中A1只包含K,A2包含除K外的其他所有key;

将BigTable B分拆为两个表B1和B2,其中B1只包含K,B2包含除K外的其他所有key;将A1和B1进行reduce join得到joinresult 1,将A1和B2 进行reduce join得到join result 2,将A2和B1进行reduce join得到join result 3,将A2和B2进行shuffle join得到join result 4。将join result 1、join result 2、join result 3、join result 4进行union得到最终的结果。

本文总结分享了团队在处理数据倾斜场景中的实践经验,如有疑问欢迎留言沟通。感谢您的阅读。

Ref

https://spark.apache.org/docs/latest/configuration.html

https://spark-summit.org/east-2016

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/

https://github.com/JerryLead/SparkInternals

http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-1/

https://cwiki.apache.org/confluence/display/SPARK/Spark+Internals

https://databricks.com/session/deep-dive-apache-spark-memory-management

https://databricks.com/session/tuning-apache-spark-for-large-scale-workloads

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180227G0ILNW00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 yunjia_community@tencent.com 删除。

扫码关注云+社区

领取腾讯云代金券