前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >join实践: 万亿级数据量任务优化历程

join实践: 万亿级数据量任务优化历程

作者头像
Flink实战剖析
发布2022-04-18 13:46:19
5040
发布2022-04-18 13:46:19
举报
文章被收录于专栏:Flink实战剖析Flink实战剖析

优化前

代码语言:javascript
复制
SELECT  count(*)
FROM    tbl_0 a
JOIN    tbl_1 b
ON      a.ds = 20220310
AND     b.ds = 20220310
AND     a.key = b.key
;

大概执行2h, 还未得出结果。

第一次优化

暴力增加join 的并行度, 没有什么优化是比加资源来得更直接。

代码语言:javascript
复制
set odps.sql.joiner.instances=1000; //表示join 的并行度加到1000 
SELECT  count(*)
FROM    tbl_0 a
JOIN    tbl_1 b
ON      a.ds = 20220310
AND     b.ds = 20220310
AND     a.key = b.key
;

大概执行2h, 仍未得出结果。

第二次优化

重新分析两张表数据量,a 表数据量750w+, b 表数据量350w+, 在未做任何优化情况下数据是需要经过shuffle, 将相同的key分布到相同的节点上, 首先考虑使用mapjoin 解决,使其不用执行shuffle操作。

代码语言:javascript
复制
SELECT /*+mapjoin(b)*/ count(*)
FROM    tbl_0 a
JOIN    tbl_1 b
ON      a.ds = 20220310
AND     b.ds = 20220310
AND     a.key = b.key
;

大概执行2h, 仍未得出结果。

第三次优化

重新分析表数据分布情况, 查看a、b 两张表的join-key 的数据情况:

代码语言:javascript
复制
SELECT
        key
        ,count(*)
FROM    tbl_0/tbl_1
WHERE   ds =20220312
GROUP BY KEY
ORDER BY count(*) desc;

只取top5数据量的key:

a 表

WorkWell

1586079

GoodQuality

1428452

ProductExperience

1186742

BuyerRecomendSeller

1147469

UserExperience

763998

b表

ProductExperience

832075

UserExperience

309142

GoodQuality

245208

BuyerRecomendSeller

213484

SPS_Material

196508

两张表的key 的类型不多,但是单个key值的个数比较多,例如

GoodQuality 在a表中1428452条记录,在b表中245208条记录,最终就会产生 1428452*245208=3500亿的数据量,这样相同的GoodQuality 分布到同一个节点去处理,很明显发生数据长尾效应。对于这样的情况,普通的mapjoin 或者是sort-merge已经不适合了,需要尽可能的将key分散,分发到不同的节点去处理,因此使用随机前缀+扩容的方式处理。

什么是随机前缀+扩容?对其中一张表数据量扩容n倍,另外一张表对join-key生成随机0~n的随机前缀数据,通过这种方式将join-key充分打散到下游不同的节点处理,以达到优化效果。在这里通过定义udf 实现随机前缀, udtf实现数据扩容:

代码语言:javascript
复制
//生成max以内的随机数
public class RandomData extends UDF {
     public Random r;
     @Override
    public void setup(ExecutionContext ctx) throws UDFException {

      r=new Random();
    }
    public Integer evaluate(Integer max) {
        return  r.nextInt(max);
    }
}
代码语言:javascript
复制
//数据量扩充
public class ExpandData extends UDTF {
    @Override
    public void setup(ExecutionContext ctx) throws UDFException {
    }

    @Override
    public void process(Object[] args) throws UDFException {
      Long expand=(Long)(args[0]);//代表了扩充的倍数
      Object[] args1=new Object[args.length];
      for(int i=0;i<expand;i++){

           for(int j=0;j<args.length;j++){
               args1[j]=i+"_"+args[j];
           }
          super.forward(args1);
      }
    }

    @Override
    public void close() throws UDFException {

    }
}

然后重新执行SQL:

代码语言:javascript
复制
set odps.sql.joiner.instances=1000;
SELECT 
  count(*)
from (
select *, CONCAT_WS('_',RandomData(1000),key) newKey  from  tbl_0
where ds=20220312
) a join (
SELECT  newKey from (
SELECT 
 key
from
tbl_1  where ds=20220312)
LATERAL view  ExpandData(1000,key) tmp as cnt,newKey
) b on a.newKey=b.newKey;

耗时20min左右得出结果,最终得到的结果大于一万亿。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-03-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Flink实战剖析 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 优化前
  • 第一次优化
  • 第二次优化
  • 第三次优化
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档