前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >高效大数据开发之数据倾斜的实践

高效大数据开发之数据倾斜的实践

原创
作者头像
用户10268982
发布2022-12-16 10:41:53
6100
发布2022-12-16 10:41:53
举报
文章被收录于专栏:数据仓库优化

一、前言

数据倾斜是在数仓开发里最常见的,且最为头疼的问题。我们也或多或少的知道数据倾斜是因为数据分配不均匀,导致部分节点要花很长时间处理大量的数据,我们也知道不管是mr还是spark,大多是在shuffle阶段出现倾斜,当然我们也知道group by和join均可能出现数据倾斜现象,而网上大多数的解决方案都建议从2方面着手处理:1.从业务方面,能否直接过滤掉导致倾斜的数据;2、从技术层面上,a.调整运行参数,b.sql优化之改用mapjoin、关联或聚合key增加随机数、将join改成union all等

二、数据倾斜之我见

首先数据倾斜大多数场景是从业务层面无法过滤掉的,其次调整运行参数只是治标不能治本,只能起缓解作用,最后大部分情况下关联的表比较少符合mapjoin场景,因此大多数情况下还得从sql本身着手优化。

而在我们日常工作中最常遇到的就是group by倾斜和大表join倾斜,因此本文就对这两种情况进行具体优化过程进行分析,而在正式开始案例之前,有必要讲一下优化原则:尽量让shuffle阶段的数据保持量少且均匀。

三、group by数据倾斜的案例

1.优化方法

group by倾斜相对简单,只需进行二次group by即可

2.优化背景

下面这段sql在某一天开始之后就一直执行失败,就猜测会不会是device_id出现了数据倾斜

代码语言:javascript
复制
--统计每个device_id的当天首条日志和末条日志的账号等维度
select
    device_id
    ,max(if(first_row=last_row,vuserid,'')) vuserid
    ,max(if(first_row=1,vuserid,'')) first_vuserid
  from
  ( select
        device_id
        ,vuserid
        ,row_number() over (partition by device_id order by first_act_date,first_ctime) first_row
        ,count(1) over (partition by device_id) last_row
      from
        dws_xxx_dau_df
      where
        imp_date = 20221204
   )a2
  group by
    device_id

3.分析过程

通过直接查device_id在表的数据情况,分析发现row_number()这里跑不过,原因就是分组的key(device_id)过大导致,最大的单个device_id的pv去到了253万,排第二的在51万,超过10000的不到100条,因此要将这100多个device_id的记录数都减少到10000条以下:

代码语言:javascript
复制
select
    device_id
    ,count(1) pv
  from
    dws_xxx_dau_df
  where
    imp_date = 20221204
  group by
    device_id
  order by
    pv desc limit 1000

4.解决思路

只需进行二次group by,举一个简单例子:如下图,第一次group by时aaa从4条变成2条,bbb从2条到1条,而对于ccc、ddd、eee增加一个随机数并不影响,接着再进行第二次group by,经过这样处理就能避免倾斜了。回到案例,这里需要增加一个随机数字段(0~200)先做一次group by,这样第一次group by限制最高key对应的记录数在10000条左右,第二次group by每个key对应的记录数就只会在200条以内,这样就能完美避开倾斜问题了,但是需要注意的是,这里随机数给多少需要根据实际数据情况进行分析:

代码语言:javascript
复制
select
    device_id
    ,max(if(first_row=last_row,vuserid,'')) vuserid
    ,max(if(first_row=1,vuserid,'')) first_vuserid
  from
  ( select
        device_id
        ,vuserid
        ,first_vuserid
        ,row_number() over (partition by device_id order by first_act_date,first_ctime) first_row
        ,count(1) over (partition by device_id) last_row  --再根据device_id group by第二次
      from
      ( select
            device_id
            ,rand_num
            ,max(if(first_row=last_row,vuserid,'')) vuserid
            ,max(if(first_row=1,vuserid,'')) first_vuserid
          from
          ( select
                device_id
                ,vuserid
                ,rand_num
                ,row_number() over (partition by device_id,rand_num order by first_act_date,first_ctime) first_row
                ,count(1) over (partition by device_id,rand_num) last_row  --增加rand_num一起先group by一次
              from
              ( select
                    device_id
                    ,vuserid
                    ,first_act_date
                    ,first_ctime
                    ,cast(rand()*200 as bigint) rand_num  --增加一个200以内随机数
                  from
                    dws_xxx_dau_df
                  where
                    imp_date = 20221204
               )a1
           )a2
          group by
            device_id
            ,rand_num
       )a3
   )a4
  group by
    device_id

5.优化收益

该任务从优化前的60~80分钟到优化后的10~20分钟,时效提升了4~6倍:

四、大表join大表数据倾斜的案例

1、优化方法

大表优化最好的办法就是能知道关联的key是啥,然后对出现倾斜的key进行打散,从而避免倾斜。

2、优化背景

有一个实验指标表,包含了播放、页面访问、元素曝光点击等事件的表(30亿)需要关联一个全量维度表(500万),运行时间过长(在2~3小时),另外由于该表用在实验上是非常很重要,为避免影响SLA,因此需要进行优化。

代码语言:javascript
复制
select
    a.qimei36 qimei36
    ,a.biz_id biz_id
    ,a.vid vid
    ,a.cid cid
    ,b.el_type_name el_type_name
    ,b.el_type_name_2 el_type_name_2
    ,b.el_type_name_3 el_type_name_3
  from
  ( select  --30亿记录
        qimei36
        ,biz_id
        ,vid
        ,cid
        ,sum(play_cnt) play_cnt
        ,sum(page_cnt) page_cnt
      from
        dws_xxx_view_play_page_ele_imp_clck_aggr  --播放、页面、元素曝光、元素点击聚合虚拟表
      group by
        qimei36
        ,biz_id
        ,vid
        ,cid
   )a
  left join
  ( select --500万记录
        pg_cid cid
        ,el_type_name
        ,el_type_name_2
        ,el_type_name_3
      from
        dim_cid_content_df
   )b
   on
    a.cid=b.cid 

3.分析过程

猜测很大可能出现数据倾斜了,该案例里面有group by又有join,因此需要进一步分析来确定倾斜出现的位置:

(1)去掉join 操作,仅仅执行group by操作落表,执行时间在20分钟内,也就说join操作用了80%的运行时间,因此可能出现倾斜的是在join操作上,需进一步分析

(2)分别分析left join的左表(下面用A表标志),右表(下面用B表标志)的数据情况,先看两个表总数据量级(A表30亿,B表500万),属于大表join大表操作,再分别用关联key进行group by 取top1000条数据进行分析:A表关联key出现很大的悬殊(最大的cid有接近3亿,前3条cid占比总记录数接近20%),B表关联key唯一,如下图:

(3)从执行日志分析,也能看出是join操作出现了数据倾斜,如下图所示,join操作用了1.8小时,占了80%+的运行时长:

4.解决思路

由于B表是历史全量cid,而A表为当天日志,因此需要先分析A表的cid占据B表全量表的数量,经过直接关联发现只关联上维表10万cid,因此我解决思路如下:

(1)尽量减少join操作的记录量,因此可以先将10万关联得上的cid存储到临时表,并且增加一个统计每个cid的pv和row_num指标,用于判断是否倾斜的cid,这里设定单个cid超过100万行记录的cid为倾斜的cid。

代码语言:javascript
复制
create table tmp_table_C as
select  --最后只剩10万左右用户
    pg_cid
    ,pv
    ,el_type_name
    ,el_type_name_2
    ,el_type_name_3
    ,ceil(pv/1000000) row_num  --控制最大数量在100万以下,又可以保证进行扩散的key不那么多
  from
  ( select
        pg_cid
        ,sum(pv) pv
        ,min(pv) min_pv
        ,max(el_type_name) el_type_name
        ,max(el_type_name_2) el_type_name_2
        ,max(el_type_name_3) el_type_name_3
      from
      ( select
            1 pv
            ,pg_cid
            ,null el_type_name
            ,null el_type_name_2
            ,null el_type_name_3
          from
            dws_xxx_view_play_page_ele_imp_clck_aggr  --播放、页面、元素曝光、元素点击聚合虚拟表
        union all
        select
            0 pv
            ,pg_cid
            ,el_type_name
            ,el_type_name_2
            ,el_type_name_3
          from
            dim_cid_content_df
       )a
      group by
        pg_cid
   )a
  where
    pv>0  -- pv>0 说明在日志表出现
    and min_pv=0 -- min_pv=0 说明在维表中出现

(2)再将倾斜的cid进行随机打散关联,这里打散的的程度根据当前cid的数据倾斜程度进行动态分配,从而解决数据倾斜问题:

代码语言:javascript
复制
--先将需要扩散的cid,以及标志扩散程度的row_num提取出来
--这里row_num的值是:如果 pv<100w行,row_num=1,如果cid='asdfg'的pv=200万行,row_num=2,也就说在拼接成关联字段时,该cid会变成两个cid进行关联:'asdfg#0'和'asdfg#1'
sql = """
        select
            pg_cid
            ,row_num
          from
            %(table)s_%(imp_date)s
          where
            row_num>1
    """%{"table": self.table_list[0]
              , "imp_date": self.datetime}
    self.log(sql)
    res = self.tdw.execute(sql)
    if(len(res)>=1):
        in_sql=""
        key_sql=""
        for index in range(len(res)):
            res_a = res[index].split("\t")[0]
            res_b = res[index].split("\t")[1]
            if len(res_a)>0 :
                in_sql = in_sql + res_a + ":" + res_b + ","
                key_sql += "'%s',"%(res_a)
    insql = "'" + in_sql + "'"  --例子: 'mzc00200xh9313v:13,mzc002003q7sks5:2,mzc0038zhzf6h5t:2,mzc00200aakg0rw:4,'
    keysql = key_sql[:-1]  --例子:'mzc00200xh9313v','mzc002003q7sks5','mzc0038zhzf6h5t','mzc00200aakg0rw'
    return insql,keysql







--插入表
insert overwrite table %(table)s partition(imp_date=%(imp_date)s)
select
    a.qimei36 qimei36
    ,a.biz_id biz_id
    ,a.vid vid
    ,a.cid cid
    ,b.el_type_name el_type_name
    ,b.el_type_name_2 el_type_name_2
    ,b.el_type_name_3 el_type_name_3
  from
  ( select  --30亿记录
        qimei36
        ,biz_id
        ,vid
        ,cid
        ,case
          when coalesce(pg_cid, '') = '' then floor(rand()*10000)  --如果是null,给随机数
          when pg_cid in (%(keysql)s) then concat(pg_cid,'#',floor(rand()*cast(split(split(%(insql)s,concat(pg_cid,':'))[1],',')[0] as bigint)))  --如果是极大值,将key给随机数进行打散处理
          else pg_cid
         end join_key
        ,sum(play_cnt) play_cnt
        ,sum(page_cnt) page_cnt
      from
        view_play_page_ele_imp_clck_aggr  --播放、页面、元素曝光、元素点击聚合虚拟表
      group by
        qimei36
        ,biz_id
        ,vid
        ,cid
   )a
  left join
  ( select
        concat(pg_cid,'#',id_num) cid
        ,pv
        ,el_type_name
        ,el_type_name_2
        ,el_type_name_3
      from
      ( select
            pg_cid
            ,pv
            ,el_type_name
            ,el_type_name_2
            ,el_type_name_3
            ,row_num
            ,1 id
          from
            tmp_table_C
          where
            row_num > 1
       )a
      join
      (select * from dim_id_num_0_to_9999_f where id_num<1000) b  --进行将维表的cid扩散成id_num倍(id_num=2就扩展成2倍,id_num=100就扩展成100倍)
       on
        a.id=b.id
      where
        b.id_num < a.row_num
    union all
    select
        pg_cid cid
        ,pv
        ,el_type_name
        ,el_type_name_2
        ,el_type_name_3
      from
        tmp_table_C
      where
        row_num = 1  -- pv<100万行的数据不用扩散,只保留1行就行
   )b
   on
    a.cid=b.cid

其中有几个点需要说明一下:

a、insql和key_sql的作用是将在A表出现倾斜的cid进行随机打散,通过传入常数参数的方式进行处理,下图为真正执行的情况

b、A表的cid后面随机加上0~row_num-1的数字,B表对应的cid扩散到row_num行,其中前面也是cid后面拼接上0~row_num-1的数字,因此关联的时候,不管A表随机变成啥,在B表都能找到关联,从而解决倾斜的同时也不会导致关联数据量变少,如下图,对aaa发散4倍,bbb发散2倍,ccc、ddd和eee不作发散关联,此时左边关联最大key记录数为4,右边关联最大key记录数为2,从而解决了数据倾斜问题:

c、dim_id_num_0_to_9999_f 是对维表cid发散处理的维表,本质上是做了笛卡尔积,所以只需拿需要发散的100多条cid进行row_num倍发散,其他不做处理,避免对大量数据进行无意义的笛卡尔积操作

5.优化收益

该任务从优化前的2~3小时到优化后的40~50分钟,时效提升了2~4倍:

五、总结

本文只是简单列举了两种比较常见的解决数据倾斜的sql优化案例,但是实际开发过程中还会有遇到其他情况,因此需要特殊情况进行特殊处理,而且有些时候需要多种解决方案同时作用才能比较好的解决数据倾斜问题,但是万变不离其宗,因此我认为自我思考并且进行分析的过程才是最重要的,本文列举的两个案例都有详细分析过程,而且解决的方向都是让一次shuffle的记录数尽量不要过大,尽量保持更均匀的原则,因此只要能让你的数据保持均匀,数据倾斜就会得到解决了。

如果想要更详细了解数据倾斜的原理可以自行搜索引擎里,希望本文对于想要解决自身数据倾斜业务的同学有一个实际参考作用。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、前言
  • 二、数据倾斜之我见
  • 三、group by数据倾斜的案例
    • 1.优化方法
      • 2.优化背景
        • 3.分析过程
          • 4.解决思路
            • 5.优化收益
            • 四、大表join大表数据倾斜的案例
              • 1、优化方法
                • 2、优化背景
                  • 3.分析过程
                    • 4.解决思路
                      • 5.优化收益
                      • 五、总结
                      相关产品与服务
                      Elasticsearch Service
                      腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
                      领券
                      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档