数据倾斜解决方法总结

前言

在使用Spark、Hive的过程中经常会碰到数据倾斜的问题,数据倾斜基本都发生在group、join等需要数据shuffle的操作中,这些过程需要按照key值进行数据汇集处理,如果key值过于集中,在汇集过程中大部分数据汇集到一台机,这就导致数据倾斜。数据倾斜不仅无法充分利用分布式带来的好处,而且可能导致内存消耗过大超过负载直接导致任务延迟或者失败。本文就是将所有在工作中遇到的数据倾斜的问题及其解决方案进行汇总记录。

一、 少量Key值重复数量特别多需要GroupBy

需求:统计test_table2 中key出现的次数,要求这些key必须出现在test_table1中

需求很简单,但是很多业务都用到,统计代码如下:

select a.key as key
    ,b.pv as pv
from (        
    select key
    from test_table1
    where ds = '20170418'
)a
join(        
    select key
        ,count(1) as pv
    from test_table2
    where ds = '20170418'
    group by key
)b
on a.key = b.key

test_table1数据量只有1000条而且关于key去重,test_table2数据量有一亿条,这个任务在没有优化之前跑了接近一个小时,这是无法接受的,而且查看日志发现有一个reduce运行时长非常久,其他reduce都能够在1min中之内完成,由此可推断出必然发生了数据倾斜。

1.1 问题的根源及解决方法

不考虑数据本身,从代码层面来分析,有两个地方会发生数据倾斜,但是根本原因只有一个那就是test_table2中某个key值大量重复,于是在统计一下test_table2出现次数最多的Top10的key,发现确实有一个key大量重复,此时有两种解决方案:

如果头部key不是业务需要的key,直接过滤

如果头部key都很多,而且都是业务需要的key,考虑加入随机数

如下代码所示:

select a.key as key
    ,b.pv as pv
from (        
    select key
    from test_table1
    where ds = '20170418'
)a
join(
    select key
        ,sum(pv) as pv
    from (
        select key
            ,round(rand()*1000) as rnd --加入随机,强制将原来一组的拆成多组,增加并发度
            ,count(1) as pv
        from test_table2
        where ds = '20170418'
        group by key, rnd
    )tmp 
    group by key
)b
on a.key = b.key

注意:

无论是否使用mapjoin,小表都需要放入前面,原因是reduce阶段会将第一个表的key全部放入内存,当第二个表到来的时候就开始输出; 在读表之后第一件事就是尽可能的过滤不必要的数据,理清业务含义先过滤脏数据和业务不相关的数据。

二、少量Key值重复数量特别多需要Join

需求:统计test_table2是由key1、key2、value2三个字段组成,但key1和key2不是主键,test_table1是key1和key2组合作为主键的,需求是找出test_table2满足存在test_table1的组合主键的记录,并求字段value2的和以及每个单独键出现的次数。

这种需要实际工作中也是经常遇到,由于HIVE不支持where的in条件嵌套子查询,那么解决这个需求最正常的方法是用join操作如下:

select count(distinct key1) as key1_uv
    ,count(distinct key2) as key2_uv
    ,sum(value2) as value2
from (
    select a.key1 as key1
        ,a.key2 as key2
        ,b.value2 as value2
    from (
        select  key1
            ,key2
        from test_table1
    )a
    join (
        select key1
            ,key2                
            ,sum(value2) as value2
        from test_table2
        group by key1, key2
    )b
    on a.key1 = b.key1 and a.key2 = b.key2
)tmp

2.1 分析问题并解决

这个需求可能导致倾斜的地方有三处:

  1. 对test_table2的group by
  2. join操作
  3. 两个 count(distinct)

如果前面两个倾斜会发生,原因也是和案例一是相同的,就是test_table2的key1和key2的组合key有数量明显大的组合。此时解决方法同1相似。但是这里要提出第三种解决方法:

如果头部key都很多,而且都是业务需要的key,但是join的一方是小表可以使用mapjoin

如下代码所示:

select count(distinct key1) as key1_uv
    ,count(distinct key2) as key2_uv
    ,sum(value2) as value2
from (
    select /*+MAPJOIN(a)*/
        a.key1 as key1
        ,a.key2 as key2
        ,b.value2 as value2
    from (
        select  key1
            ,key2
        from test_table1
    )a
    join (
        --注意:不去重直接join
        select key1
            ,key2                
            ,value2
        from test_table2
    )b
    on a.key1 = b.key1 and a.key2 = b.key2
)tmp

注意:

  1. 使用mapjoin第二个表就先不进行group by,因为group by会导致倾斜;
  2. 注意在新的hive版本上加入:set hive.auto.convert.join=false; set hive.ignore.mapjoin.hint=false; 这两个参数设置,使得mapjoin能够生效。

这样mapjoin肯定可以完全避免数据倾斜,如果join之后数据量变得很少,上面两个count(distinct)操作就会很快

如果数据量还是很大两个count(distinct)在一起有一个key某些值特别多出现倾斜,此时业务不需要非常精准的去重,可以考虑使用基数估计

Tips: 在Tdw中有est_distinct这个函数,直接取代 count(distinct col_name) => est_distinct(col_name) 即可) 即可

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

谢慧志的专栏

1 篇文章1 人订阅

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏java架构师

Hadoop学习19--推测式执行

  所谓推测式执行,就是计算框架判断,如果有一个task执行的过慢,则会启动备份任务,最终使用原任务+备份任务中执行较快task的结果。产生原因一般是程序bug...

2799
来自专栏腾讯大数据的专栏

协同过滤推荐算法在MapReduce与Spark上实现对比

MapReduce为大数据挖掘提供了有力的支持,但是复杂的挖掘算法往往需要多个MapReduce作业才能完成,多个作业之间存在着冗余的磁盘读写开销和多次资源申...

2286
来自专栏编程札记

Goroutine并发调度模型深入之实现一个协程池

5645
来自专栏大数据和云计算技术

元数据概念

刘耀铭同学元数据系列作品的第一篇,大家支持! 其他元数据相关系列文章: 基于元数据驱动的ETL Hive 元数据表结构详解 1、 元数据是描述其他数据的数据(...

36911
来自专栏华章科技

MapReduce 原理与设计思想

出处:http://www.cnblogs.com/archimedes/p/mapreduce-principle.html

652
来自专栏芋道源码1024

【追光者系列】HikariCP 连接池配多大合适(第一弹)?

首先声明一下观点:How big should HikariCP be? Not how big but rather how small!连接池的大小不是设置...

860
来自专栏加米谷大数据

Spark核心谈

在大数据领域,Spark平台因计算模型涵盖MapReduce,Streaming,SQL,Machine Learning,Graph等,为大数据计算提供一栈式...

1171
来自专栏CSDN技术头条

像Apache Storm一样简单的分布式图计算

介绍 计算可能很复杂。对我们来说,这种复杂主要就是软件世界的人类驱动力。甚至有一个学科整个都围绕着问题解决和计算——计算机科学。 当一个人开始学习计算机科学时,...

2176
来自专栏数据派THU

独家 | 一文读懂Hadoop(三):Mapreduce

随着全球经济的不断发展,大数据时代早已悄悄到来,而Hadoop又是大数据环境的基础,想入门大数据行业首先需要了解Hadoop的知识。2017年年初apache发...

2168
来自专栏linux驱动个人学习

服务器体系(SMP, NUMA, MPP)与共享存储器架构(UMA和NUMA)

各CPU共享相同的物理内存,每个 CPU访问内存中的任何地址所需时间是相同的,因此SMP也被称为一致存储器访问结构(UMA:Uniform Memory Acc...

642

扫码关注云+社区