前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Count-Distinct实践: 万亿级数据量任务优化方式

Count-Distinct实践: 万亿级数据量任务优化方式

作者头像
Flink实战剖析
发布2022-04-18 13:46:59
6480
发布2022-04-18 13:46:59
举报
文章被收录于专栏:Flink实战剖析Flink实战剖析

join实践: 万亿级数据量任务优化历程

单字段去重

先看一个简单的sql ,pv_id 去重计数

代码语言:javascript
复制
SELECT  
   visit_type,
   count(DISTINCT pv_id)  as pv_cnt
from exp_table 
where ds=20220320
group by visit_type;

在默认情况下,相同的visit_type 的pv_id 会被分配到同一个reducer中处理,如果某个visit_type的数据量特别大,那么对应的reducer执行耗时会比较久或者可能会发生OOM,因此常规优化方式是:

代码语言:javascript
复制
select 
visit_type,count(*)
from (
SELECT  
  visit_type,pv_id
from exp_table
where ds=20220320
group by visit_type,pv_id
) group by visit_type;

也就是将count distinct 转换为 group by 操作,第一层根据visit_type,pv_id分组,第二层根据visit_type 直接求和即可,使数据分布更加均匀。但是 这种方式在第二层group by 也可能会产生大量的数据shuffle操作,可以再次优化:

代码语言:javascript
复制
select 
visit_type,sum(cnt)
from (
SELECT  
  visit_type,
  count(distinct pv_id) as cnt
from exp_table
where ds=20220320
group by visit_type,hash(pv_id)%50
) group by visit_type;

第一层使用visit_type+hash(pv_id)%50 方式分组,对相同visit_type下的pv_id分了50组,保证相同pv_id 都能分配到相同的reducer中去,然后执行去重计数(cnt)操作,然后在第二层中根据visit_type 分组,对cnt求和即可。这种方式在第二层shuffle过程中数据就会相对减少很多。

多字段去重

代码语言:javascript
复制
SELECT  
  visit_type,
  count(distinct pv_id),
  count(distinct item_id)
from exp_table
where ds=20220320
group by visit_type;

这次同时需要对pv_id与item_id去重计数,如果还是按照上述的优化方式将visit_type、pv_id、item_id组合很显然已经行不通了,没办法保证相同的session_id或者item_id都会分配在同一个reducer中去。先使用常规意义上的操作:

代码语言:javascript
复制
SELECT  a.visit_type
        ,a.cnt1
        ,b.cnt2
FROM    (
            SELECT  visit_type
                    ,count(*) AS cnt1
            FROM    (
                        SELECT  visit_type
                                ,pv_id
                        FROM    exp_table
                        WHERE   ds = 20220320
                        GROUP BY visit_type
                                 ,pv_id
                    ) 
            GROUP BY visit_type
        ) a
join    (
            SELECT  visit_type
                    ,count(*) AS cnt2
            FROM    (
                        SELECT  visit_type
                                ,item_id
                        FROM    exp_table
                        WHERE   ds = 20220320
                        GROUP BY visit_type
                                 ,item_id
                    ) 
            GROUP BY visit_type
        ) b
ON      a.visit_type = b.visit_type
;

也就是先拆分再join, 很显然这种方式开发难度大,特别是在处理字段更多的情况下。再重新按照单字段优化方式思考,希望按照所有的去重字段组合的情况下,仍然能够保证相同pv_id或者item_id都会分配在同一个reducer中去处理, 也是pv_id与item_id各自不影响其分配方式,可以采取先扩充数据,即将每一条数据扩充到去重字段个数的倍数,并且保证一个去重的字段不为空,并且增加标识字段,表明去重的列,如下图:

扩充后的数据执行常规的去重操作,即然后组合去重字段分组然后最外层进行汇总,由于扩充之后的数据每一条只有一个不为空的列,那么在执行shuffle 的时候,相同的pv_id或者item_id一定会分配在同一个reducer中去处理。数据扩充使用udtf实现:

代码语言:javascript
复制
 @Override
    public void process(Object[] args) throws UDFException {
        // TODO
       for(int i=0;i<args.length;i++){
           Object[] nObjects=new Object[args.length+1];
           for(int j=0;j<args.length;j++){
                if(i==j) {
                    nObjects[j]=args[i];
                }else{
                    nObjects[j]=null;
                }

           }
          nObjects[args.length]="flag"+i;
          this.forward(nObjects);
       }
    }

具体优化sql:

代码语言:javascript
复制
SELECT  visit_type
        ,count(CASE WHEN TYPE='flag0' THEN 1 END) AS pv_cnt
        ,count(CASE WHEN TYPE='flag1' THEN 1 END) AS item_cnt
FROM    (
            SELECT  visit_type
                    ,pv_id1
                    ,item_id1
                    ,type
            FROM    (
                        SELECT  visit_type
                                ,pv_id1
                                ,item_id1
                                ,type
                        FROM    exp_table
                        LATERAL VIEW ExpandHash(pv_id,item_id) tmp AS pv_id1,item_id1,type
                        WHERE   ds = 20220320
                    ) 
            GROUP BY visit_type
                     ,pv_id1
                     ,item_id1
                     ,type
        ) 
GROUP BY visit_type

这种方式导致了数据量翻倍,在shuffle阶段io 也会耗时增加,具体耗时、资源消耗以实际情况为准,然后去做均衡具体选择哪一种方式。

思考

Q: 同时存在count distinct 与 sum 类的聚合该如何优化倾斜问题?

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-03-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Flink实战剖析 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 单字段去重
  • 多字段去重
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档