
Spark SQL Multidimensional Analysis Optimization: The Devil Is in the Details

Author: 数据仓库践行者
Published 2020-04-20 11:51:59

This post shares a real multidimensional-analysis optimization case study.

【Outline】

  1. Business background
  2. How Spark SQL handles count distinct
  3. How Spark SQL handles grouping sets
  4. The optimization process and results
  5. Summary

1. Business Background

Here is the SQL:
select 
        if(req_netease_user is null, 'all', req_netease_user) as netease_user, 
        if(campaign_id is null, 'all', campaign_id) as campaign_id, 
        if(spec_id is null, 'all', spec_id) as spec_id, 
        if(app_bundle is null, 'all', app_bundle) as app_bundle, 
        if(render_name is null,'all', render_name) as render_name, 
        if(platform is null, 'all', platform) as platform, 
        count(distinct request_id) as bid_request_num,
        count(distinct deviceid) as bid_request_uv,
        count(distinct case when bid_response_nbr=10001 then bid_response_id else null end) as offer_num,
        count(distinct case when bid_response_nbr=10001 then deviceid else null end) as offer_uv,
    dt
from 
(
    select
        distinct dt, 
        if(req_netease_user is null, 'null', req_netease_user) as req_netease_user, 
        if(render_name is null, 'null', render_name) as render_name, 
        if(platform is null,'null', platform) as platform, 
        if(campaign_id is null, 'null', campaign_id) as campaign_id,
        if(spec_id is null, 'null', spec_id) as spec_id, 
        if(app_bundle is null, 'null', app_bundle) as app_bundle,
        request_id, 
        bid_response_nbr, 
        bid_response_id, 
        deviceid
    from table_a where dt = '2019-08-11' and request_id is not null
) tmp group by dt, req_netease_user, campaign_id, spec_id, app_bundle, render_name, platform
grouping sets(
  (dt),

  (dt, req_netease_user), 
  (dt, campaign_id),
  (dt, spec_id),
  (dt, app_bundle),
  (dt, render_name),
  (dt, platform),

  (dt, req_netease_user, campaign_id),
  (dt, req_netease_user, spec_id),
  (dt, req_netease_user, app_bundle),
  (dt, req_netease_user, render_name),
  (dt, req_netease_user, platform),

  (dt, req_netease_user, campaign_id, spec_id),
  (dt, req_netease_user, campaign_id, app_bundle),
  (dt, req_netease_user, campaign_id, render_name),
  (dt, req_netease_user, campaign_id, platform),
    
  (dt, req_netease_user, campaign_id, spec_id, app_bundle),
  (dt, req_netease_user, campaign_id, spec_id, render_name),
  (dt, req_netease_user, campaign_id, spec_id, platform),

  (dt, req_netease_user, campaign_id, spec_id, app_bundle, render_name),
  (dt, req_netease_user, campaign_id, spec_id, app_bundle, platform),

  (dt, req_netease_user, campaign_id, spec_id, app_bundle, render_name, platform)
)

The logic is not complex; it is just slow:

The job takes about 5 hours to run.

This is an ad-bidding business table: every bid request (request_id) produces one row, so a single day's data runs into the billions of rows. On top of that, 7 dimensions must be combined into 22 grouping sets, and for each combination we have to compute count(distinct request_id), count(distinct deviceid), count(distinct case when bid_response_nbr=10001 then bid_response_id else null end), and count(distinct case when bid_response_nbr=10001 then deviceid else null end). You have to admit, the requirement is merciless.

2. How Spark SQL Optimizes count distinct

In Hive, the classic optimization for count distinct looks like this:
-- before optimization
select count(distinct id) from table_a 

-- after optimization
select 
  count(id)
from
(
    select 
        id
    from table_a group by id
) tmp

Hive tends to execute a global aggregate like count(distinct id) in a single reducer, which becomes a skewed bottleneck. Other factors aside, the standard fix is to group by the column first, so the deduplication is spread across many reducers, and then simply count the groups.
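A tiny plain-Python sketch of why the two forms agree (hypothetical data; `rows` stands in for table_a):

```python
# Model of the Hive rewrite: count(distinct id) vs. group-by-then-count.
rows = [{"id": 1}, {"id": 2}, {"id": 2}, {"id": 3}, {"id": 3}, {"id": None}]

# Before: one global aggregation must see every row (a single reducer in Hive).
count_distinct = len({r["id"] for r in rows if r["id"] is not None})

# After, stage 1: group by id -- this shuffle spreads over many reducers.
groups = {}
for r in rows:
    if r["id"] is not None:
        groups[r["id"]] = None

# After, stage 2: count the already-deduplicated groups.
count_two_stage = len(groups)

print(count_distinct, count_two_stage)  # 3 3
```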

With Spark SQL, this seemingly isn't something to worry about, because Spark optimizes count distinct on its own:
explain 
select 
    count(distinct id),
    count(distinct name) 
from table_a

The physical plan:
== Physical Plan ==
*(3) HashAggregate(keys=[], functions=[count(if ((gid#147005 = 2)) table_a.`id`#147007 else null), count(if ((gid#147005 = 1)) table_a.`name`#147006 else null)])
+- Exchange SinglePartition
   +- *(2) HashAggregate(keys=[], functions=[partial_count(if ((gid#147005 = 2)) table_a.`id`#147007 else null), partial_count(if ((gid#147005 = 1)) table_a.`name`#147006 else null)])
      +- *(2) HashAggregate(keys=[table_a.`name`#147006, table_a.`id`#147007, gid#147005], functions=[])
         +- Exchange(coordinator id: 387101114) hashpartitioning(table_a.`name`#147006, table_a.`id`#147007, gid#147005, 4096), coordinator[target post-shuffle partition size: 67108864]
            +- *(1) HashAggregate(keys=[table_a.`name`#147006, table_a.`id`#147007, gid#147005], functions=[])
               +- *(1) Expand [List(name#146984, null, 1), List(null, id#146979, 2)], [table_a.`name`#147006, table_a.`id`#147007, gid#147005]
                  +- *(1) Project [id#146979, name#146984]
                     +- *(1) FileScan parquet table_a

The plan shows that count distinct is handled with an Expand operator. What does Expand actually do? It emits each input row once per distinct aggregate, nulling out the columns that belong to the other aggregates and tagging each copy with a gid column that records which aggregate it feeds.

After the Expand, a HashAggregate keyed on (id, name, gid) performs the group by, which deduplicates the copies for each aggregate. From there, plain count(id) and count(name) are enough. The work is divided and conquered, which alleviates data skew to some extent.
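To make the Expand step concrete, here is a minimal Python model of the rewrite for `count(distinct id), count(distinct name)` (hypothetical data; a sketch of the idea, not Spark's actual implementation):

```python
# Each input row becomes one copy per distinct aggregate, tagged with gid.
# Column layout mirrors the plan above: (name, id, gid).
rows = [(1, "a"), (1, "a"), (2, "a")]  # hypothetical (id, name) data

expanded = []
for id_, name in rows:
    expanded.append((name, None, 1))  # gid=1 feeds count(distinct name)
    expanded.append((None, id_, 2))   # gid=2 feeds count(distinct id)

# HashAggregate on (name, id, gid) is a group by, i.e. deduplication.
dedup = set(expanded)

# Final aggregate: distinct is gone; just gid-guarded plain counts.
count_name = sum(1 for name, id_, gid in dedup if gid == 1 and name is not None)
count_id = sum(1 for name, id_, gid in dedup if gid == 2 and id_ is not None)

print(count_id, count_name)  # 2 1
```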

For reference, here is part of the distinct-rewrite code (Spark's RewriteDistinctAggregates optimizer rule):
def rewrite(a: Aggregate): Aggregate = {
    // Collect every aggregate expression in the query
    val aggExpressions = a.aggregateExpressions.flatMap { e =>
      e.collect {
        case ae: AggregateExpression => ae
      }
    }
    // Extract the distinct aggregates, grouped by their (unfoldable) children
    val distinctAggGroups = aggExpressions.filter(_.isDistinct).groupBy { e =>
        val unfoldableChildren = e.aggregateFunction.children.filter(!_.foldable).toSet
        if (unfoldableChildren.nonEmpty) {
          // Only expand the unfoldable children
          unfoldableChildren
        } else {
          e.aggregateFunction.children.take(1).toSet
        }
    }
    // Only rewrite with Expand when there is more than one distinct group
    if (distinctAggGroups.size > 1) {
      // Create the gid attribute that tags each expanded copy
      val gid = AttributeReference("gid", IntegerType, nullable = false)()
      val groupByMap = a.groupingExpressions.collect {
        case ne: NamedExpression => ne -> ne.toAttribute
        case e => e -> AttributeReference(e.sql, e.dataType, e.nullable)()
      }
      val groupByAttrs = groupByMap.map(_._2)
      ....
      }

      // Build the Expand operator
      val expand = Expand(
        regularAggProjection ++ distinctAggProjections,
        groupByAttrs ++ distinctAggChildAttrs ++ Seq(gid) ++ regularAggChildAttrMap.map(_._2),
        a.child)
        .....
  }

3. How Spark SQL Handles grouping sets

grouping sets, rollup, and cube are the constructs for multidimensional analysis:

  • grouping sets: runs a group by for each grouping-expression set listed; group by A,B grouping sets(A,B) is equivalent to (group by A) union all (group by B). Each entry can itself be a set, e.g. group by A,B,C grouping sets((A,B),(A,C)).
  • rollup: creates grouping sets for each hierarchy level of the listed expressions. group by A,B,C with rollup groups by (A,B,C), then (A,B), then (A), and finally the whole table (the empty grouping set).
  • cube: creates grouping sets for every possible combination of the listed expressions. For A,B,C it groups by (A,B,C), then (A,B), (A,C), (A), (B,C), (B), (C), and finally the whole table.
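The grouping sets equivalence above can be checked with a small sketch (plain Python over hypothetical rows, standing in for the SQL):

```python
from collections import Counter

# Hypothetical (A, B) rows; we evaluate: group by A,B grouping sets(A, B).
rows = [("x", 1), ("x", 2), ("y", 1)]

# grouping sets(A, B) == (group by A) union all (group by B),
# with the absent column padded with null (None here).
by_a = Counter((a, None) for a, b in rows)  # the (A) grouping set
by_b = Counter((None, b) for a, b in rows)  # the (B) grouping set

# The None padding makes the key spaces disjoint, so a merge is a union all.
grouping_sets = {**dict(by_a), **dict(by_b)}
print(grouping_sets)
# {('x', None): 2, ('y', None): 1, (None, 1): 2, (None, 2): 1}
```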

Let's see how Spark executes grouping sets:
explain 
select 
  count(1) 
from  table_a 
group by  netease_user,platform 
grouping sets (netease_user,platform )

The physical plan:
== Physical Plan ==
*(2) HashAggregate(keys=[id#147356, name#147357, spark_grouping_id#147353], functions=[count(1)])
+- Exchange(coordinator id: 1061978911) hashpartitioning(id#147356, name#147357, spark_grouping_id#147353, 4096), coordinator[target post-shuffle partition size: 67108864]
   +- *(1) HashAggregate(keys=[id#147356, name#147357, spark_grouping_id#147353], functions=[partial_count(1)])
      +- *(1) Expand [List(id#147354, null, 1), List(null, name#147355, 2)], [id#147356, name#147357, spark_grouping_id#147353]
         +- *(1) Project [id#147341 AS id#147354, name#147346 AS name#147355]
            +- *(1) FileScan parquet table_a

So grouping sets is likewise implemented with Expand: the input is replicated once per grouping set.
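The spark_grouping_id in the plan is a bitmask over the group-by columns: a bit is 1 when that column is nulled out for the grouping set, with the leftmost column as the highest bit. A hedged sketch of how Expand replicates each row once per grouping set:

```python
# Model of Expand for grouping sets: one copy of each row per grouping set,
# with spark_grouping_id as a "this column is nulled" bitmask.
group_cols = ["netease_user", "platform"]

def expand_row(row, grouping_sets):
    copies = []
    for gset in grouping_sets:
        gid = 0
        projected = {}
        for i, col in enumerate(group_cols):
            if col in gset:
                projected[col] = row[col]
            else:
                projected[col] = None                  # column not in this set
                gid |= 1 << (len(group_cols) - 1 - i)  # leftmost col = high bit
        copies.append((projected, gid))
    return copies

row = {"netease_user": "u1", "platform": "ios"}
copies = expand_row(row, [{"netease_user"}, {"platform"}])
for projected, gid in copies:
    print(projected, gid)
# {'netease_user': 'u1', 'platform': None} 1
# {'netease_user': None, 'platform': 'ios'} 2
```

The same convention explains the ids in the 22-set plan of the next section: the (dt) set nulls all 6 non-dt columns, giving spark_grouping_id 63 (0b111111), (dt, req_netease_user) gives 31, (dt, campaign_id) gives 47, and so on.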

4. The Optimization Process

4.1 Locating the Problem

Having seen how count distinct and grouping sets work, we can already guess where the time goes. Still, let's confirm with the execution plan:
== Physical Plan ==
+- *(4) HashAggregate(keys=[dt#93, req_netease_user#94, campaign_id#95, spec_id#96, app_bundle#97, render_name#98, platform#99, spark_grouping_id#85], functions=[count(if ((gid#151 = 4)) tmp.`request_id`#155 else null), count(if ((gid#151 = 3)) tmp.`deviceid`#154 else null), count(if ((gid#151 = 1)) CASE WHEN (tmp.`bid_response_nbr` = CAST(10001 AS BIGINT)) THEN tmp.`bid_response_id` ELSE CAST(NULL AS STRING) END#152 else null), count(if ((gid#151 = 2)) CASE WHEN (tmp.`bid_response_nbr` = CAST(10001 AS BIGINT)) THEN tmp.`deviceid` ELSE CAST(NULL AS STRING) END#153 else null)], output=[netease_user#140, campaign_id#141, spec_id#142, app_bundle#143, render_name#144, platform#145, bid_request_num#146L, bid_request_uv#147L, offer_num#148L, offer_uv#149L, dt#150])
   +- Exchange(coordinator id: 697663456) hashpartitioning(dt#93, req_netease_user#94, campaign_id#95, spec_id#96, app_bundle#97, render_name#98, platform#99, spark_grouping_id#85, 888), coordinator[target post-shuffle partition size: 536870912]
      +- *(3) HashAggregate(keys=[dt#93, req_netease_user#94, campaign_id#95, spec_id#96, app_bundle#97, render_name#98, platform#99, spark_grouping_id#85], functions=[partial_count(if ((gid#151 = 4)) tmp.`request_id`#155 else null), partial_count(if ((gid#151 = 3)) tmp.`deviceid`#154 else null), partial_count(if ((gid#151 = 1)) CASE WHEN (tmp.`bid_response_nbr` = CAST(10001 AS BIGINT)) THEN tmp.`bid_response_id` ELSE CAST(NULL AS STRING) END#152 else null), partial_count(if ((gid#151 = 2)) CASE WHEN (tmp.`bid_response_nbr` = CAST(10001 AS BIGINT)) THEN tmp.`deviceid` ELSE CAST(NULL AS STRING) END#153 else null)], output=[dt#93, req_netease_user#94, campaign_id#95, spec_id#96, app_bundle#97, render_name#98, platform#99, spark_grouping_id#85, count#160L, count#161L, count#162L, count#163L])
         +- *(3) HashAggregate(keys=[dt#93, req_netease_user#94, campaign_id#95, spec_id#96, app_bundle#97, render_name#98, platform#99, spark_grouping_id#85, CASE WHEN (tmp.`bid_response_nbr` = CAST(10001 AS BIGINT)) THEN tmp.`bid_response_id` ELSE CAST(NULL AS STRING) END#152, CASE WHEN (tmp.`bid_response_nbr` = CAST(10001 AS BIGINT)) THEN tmp.`deviceid` ELSE CAST(NULL AS STRING) END#153, tmp.`deviceid`#154, tmp.`request_id`#155, gid#151], functions=[], output=[dt#93, req_netease_user#94, campaign_id#95, spec_id#96, app_bundle#97, render_name#98, platform#99, spark_grouping_id#85, CASE WHEN (tmp.`bid_response_nbr` = CAST(10001 AS BIGINT)) THEN tmp.`bid_response_id` ELSE CAST(NULL AS STRING) END#152, CASE WHEN (tmp.`bid_response_nbr` = CAST(10001 AS BIGINT)) THEN tmp.`deviceid` ELSE CAST(NULL AS STRING) END#153, tmp.`deviceid`#154, tmp.`request_id`#155, gid#151])
            +- Exchange(coordinator id: 92102096) hashpartitioning(dt#93, req_netease_user#94, campaign_id#95, spec_id#96, app_bundle#97, render_name#98, platform#99, spark_grouping_id#85, CASE WHEN (tmp.`bid_response_nbr` = CAST(10001 AS BIGINT)) THEN tmp.`bid_response_id` ELSE CAST(NULL AS STRING) END#152, CASE WHEN (tmp.`bid_response_nbr` = CAST(10001 AS BIGINT)) THEN tmp.`deviceid` ELSE CAST(NULL AS STRING) END#153, tmp.`deviceid`#154, tmp.`request_id`#155, gid#151, 888), coordinator[target post-shuffle partition size: 536870912]
               +- *(2) HashAggregate(keys=[dt#93, req_netease_user#94, campaign_id#95, spec_id#96, app_bundle#97, render_name#98, platform#99, spark_grouping_id#85, CASE WHEN (tmp.`bid_response_nbr` = CAST(10001 AS BIGINT)) THEN tmp.`bid_response_id` ELSE CAST(NULL AS STRING) END#152, CASE WHEN (tmp.`bid_response_nbr` = CAST(10001 AS BIGINT)) THEN tmp.`deviceid` ELSE CAST(NULL AS STRING) END#153, tmp.`deviceid`#154, tmp.`request_id`#155, gid#151], functions=[], output=[dt#93, req_netease_user#94, campaign_id#95, spec_id#96, app_bundle#97, render_name#98, platform#99, spark_grouping_id#85, CASE WHEN (tmp.`bid_response_nbr` = CAST(10001 AS BIGINT)) THEN tmp.`bid_response_id` ELSE CAST(NULL AS STRING) END#152, CASE WHEN (tmp.`bid_response_nbr` = CAST(10001 AS BIGINT)) THEN tmp.`deviceid` ELSE CAST(NULL AS STRING) END#153, tmp.`deviceid`#154, tmp.`request_id`#155, gid#151])
                  +- *(2) Expand [List(dt#93, req_netease_user#94, campaign_id#95, spec_id#96, app_bundle#97, render_name#98, platform#99, spark_grouping_id#85, CASE WHEN (bid_response_nbr#71L = 10001) THEN bid_response_id#66 ELSE null END, null, null, null, 1), List(dt#93, req_netease_user#94, campaign_id#95, spec_id#96, app_bundle#97, render_name#98, platform#99, spark_grouping_id#85, null, CASE WHEN (bid_response_nbr#71L = 10001) THEN deviceid#57 ELSE null END, null, null, 2), List(dt#93, req_netease_user#94, campaign_id#95, spec_id#96, app_bundle#97, render_name#98, platform#99, spark_grouping_id#85, null, null, deviceid#57, null, 3), List(dt#93, req_netease_user#94, campaign_id#95, spec_id#96, app_bundle#97, render_name#98, platform#99, spark_grouping_id#85, null, null, null, request_id#56, 4)], [dt#93, req_netease_user#94, campaign_id#95, spec_id#96, app_bundle#97, render_name#98, platform#99, spark_grouping_id#85, CASE WHEN (tmp.`bid_response_nbr` = CAST(10001 AS BIGINT)) THEN tmp.`bid_response_id` ELSE CAST(NULL AS STRING) END#152, CASE WHEN (tmp.`bid_response_nbr` = CAST(10001 AS BIGINT)) THEN tmp.`deviceid` ELSE CAST(NULL AS STRING) END#153, tmp.`deviceid`#154, tmp.`request_id`#155, gid#151]
                     +- *(2) Expand [List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, null, null, null, null, null, null, 63), List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, null, null, null, null, null, 31), List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, null, campaign_id#39, null, null, null, null, 47), List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, null, null, spec_id#40, null, null, null, 55), List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, null, null, null, app_bundle#41, null, null, 59), List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, null, null, null, null, render_name#37, null, 61), List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, null, null, null, null, null, platform#38, 62), List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, campaign_id#39, null, null, null, null, 15), List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, null, spec_id#40, null, null, null, 23), List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, null, null, app_bundle#41, null, null, 27), List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, null, null, null, render_name#37, null, 29), List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, null, null, null, null, platform#38, 30), List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, campaign_id#39, spec_id#40, null, null, null, 7), List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, campaign_id#39, null, app_bundle#41, null, null, 11), 
List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, campaign_id#39, null, null, render_name#37, null, 13), List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, campaign_id#39, null, null, null, platform#38, 14), List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, campaign_id#39, spec_id#40, app_bundle#41, null, null, 3), List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, campaign_id#39, spec_id#40, null, render_name#37, null, 5), List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, campaign_id#39, spec_id#40, null, null, platform#38, 6), List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, campaign_id#39, spec_id#40, app_bundle#41, render_name#37, null, 1), List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, campaign_id#39, spec_id#40, app_bundle#41, null, platform#38, 2), List(request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, campaign_id#39, spec_id#40, app_bundle#41, render_name#37, platform#38, 0)], [request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#93, req_netease_user#94, campaign_id#95, spec_id#96, app_bundle#97, render_name#98, platform#99, spark_grouping_id#85]
                        +- *(2) HashAggregate(keys=[dt#80, req_netease_user#36, render_name#37, platform#38, campaign_id#39, spec_id#40, app_bundle#41, request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57], functions=[], output=[request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, dt#86, req_netease_user#36, campaign_id#39, spec_id#40, app_bundle#41, render_name#37, platform#38])
                           +- Exchange(coordinator id: 1960388256) hashpartitioning(dt#80, req_netease_user#36, render_name#37, platform#38, campaign_id#39, spec_id#40, app_bundle#41, request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57, 888), coordinator[target post-shuffle partition size: 536870912]
                              +- *(1) HashAggregate(keys=[dt#80, req_netease_user#36, render_name#37, platform#38, campaign_id#39, spec_id#40, app_bundle#41, request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57], functions=[], output=[dt#80, req_netease_user#36, render_name#37, platform#38, campaign_id#39, spec_id#40, app_bundle#41, request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57])
                                 +- *(1) Project [dt#80, if (isnull(req_netease_user#60)) null else req_netease_user#60 AS req_netease_user#36, if (isnull(render_name#61)) null else render_name#61 AS render_name#37, if (isnull(platform#62)) null else platform#62 AS platform#38, if (isnull(campaign_id#68)) null else campaign_id#68 AS campaign_id#39, if (isnull(spec_id#63)) null else spec_id#63 AS spec_id#40, if (isnull(app_bundle#58)) null else app_bundle#58 AS app_bundle#41, request_id#56, bid_response_nbr#71L, bid_response_id#66, deviceid#57]
                                    +- *(1) Filter isnotnull(request_id#56)
                                       +- *(1) FileScan parquet table_a

The plan reveals two Expand operators. The first implements the grouping sets: 22 combinations, so the data grows 22-fold. The second implements the four count distinct metrics, multiplying the data by another 4 on top of the 22. In other words, the final data volume is 88 times the input.

Now let's look at the logs:

There are four jobs in total; Jobs 1 and 2 are the slowest.

Job 0 reads table_a and applies the filter, producing 1,977,861,971 rows.

Job 1, after the two Expand steps, handles 174,051,853,448 rows: the 88-fold blow-up.

And yet, the number of partitions stays the same throughout the Expand, while the data volume explodes.
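The 88× factor is simply the product of the two Expand widths, and it matches the job counters exactly:

```python
# Two stacked Expands multiply: 22 grouping sets x 4 distinct aggregates.
grouping_sets = 22
distinct_aggs = 4
expansion = grouping_sets * distinct_aggs  # 88

rows_after_filter = 1_977_861_971          # Job 0 output
rows_after_expand = rows_after_filter * expansion

print(expansion, rows_after_expand)  # 88 174051853448
```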

4.2 The Optimization Plan

The job is slow for the usual reason, data skew, except this time it is not one skewed partition but an entire skewed stage: every partition carries 88 times its input.

There are two possible approaches:

1. Increase the number of partitions during the Expand

For this dataset, a rough estimate says we would need 444 × 88 = 39,072 partitions to keep the job unblocked. Launching that many tasks would strain cluster resources and starve other jobs; and since the data grows day by day, the number would be hard to keep under control.

2. Shrink the amount of data that Expand has to process

From the business side:

The finest grain is request_id, with bid_response_id next. request_id is the primary key, so the distinct keyword on it is unnecessary to begin with.

The two conditional metrics, count(distinct case when bid_response_nbr=10001 then bid_response_id else null end) and count(distinct case when bid_response_nbr=10001 then deviceid else null end), can also be reworked.

From the SQL structure:

The metrics can be split into two separate computations and joined back together.

The guiding principle: filter out as much data as possible up front, and give Expand as little data as possible:

-- the rewritten SQL
with query_base as
    (
           select  
                dt,
               if(req_netease_user is null, 'null', req_netease_user) as req_netease_user,
               if(render_name is null, 'null', render_name) as render_name,
               if(platform is null,'null', platform) as platform,
               if(campaign_id is null, 'null', campaign_id) as campaign_id,
               if(spec_id is null, 'null', spec_id) as spec_id,
               if(app_bundle is null, 'null', app_bundle) as app_bundle,
               request_id,
               if(bid_response_nbr=10001,1,0) as is_nbr,
               bid_response_id,
               deviceid
           from table_a  where dt = '2019-08-11' and request_id is not null
        )

select
        request_num.netease_user,
        request_num.campaign_id,
        request_num.spec_id,
        request_num.app_bundle,
        request_num.render_name,
        request_num.platform,
        bid_request_num, 
        bid_request_uv,
        offer_num,
        offer_uv,
        request_num.dt
from
(
        select
            dt,
            nvl( req_netease_user,'all' ) as netease_user,
            nvl( campaign_id, 'all') as campaign_id,
            nvl( spec_id , 'all') as spec_id,
            nvl( app_bundle , 'all') as app_bundle,
            nvl( render_name,'all') as render_name,
            nvl( platform, 'all') as platform,
            sum(request_num) as bid_request_num,
            count(distinct deviceid) as bid_request_uv,
            count(distinct case when is_nbr=1 then deviceid else null end) as offer_uv
        from  
        (
          select
              dt,
              req_netease_user,
              campaign_id,
              spec_id,
              app_bundle,
              render_name, 
              platform,
              deviceid,
              count(request_id) as request_num,
              max(is_nbr) as is_nbr
          from query_base group by dt, req_netease_user, campaign_id, spec_id, app_bundle, render_name, platform,deviceid
        )tmp group by dt, req_netease_user, campaign_id, spec_id, app_bundle, render_name, platform
        grouping sets(
            (dt),
            (dt, req_netease_user),
            (dt, campaign_id),
            (dt, spec_id),
            (dt, app_bundle),
            (dt, render_name),
            (dt, platform),
            (dt, req_netease_user, campaign_id),
            (dt, req_netease_user, spec_id),
            (dt, req_netease_user, app_bundle),
            (dt, req_netease_user, render_name),
            (dt, req_netease_user, platform),
            (dt, req_netease_user, campaign_id, spec_id),
            (dt, req_netease_user, campaign_id, app_bundle),
            (dt, req_netease_user, campaign_id, render_name),
            (dt, req_netease_user, campaign_id, platform),
            (dt, req_netease_user, campaign_id, spec_id, app_bundle),
            (dt, req_netease_user, campaign_id, spec_id, render_name),
            (dt, req_netease_user, campaign_id, spec_id, platform),
            (dt, req_netease_user, campaign_id, spec_id, app_bundle, render_name),
            (dt, req_netease_user, campaign_id, spec_id, app_bundle, platform),
            (dt, req_netease_user, campaign_id, spec_id, app_bundle, render_name, platform)
        )
) request_num join
(

        select
            dt,
            nvl( req_netease_user,'all' ) as netease_user,
            nvl( campaign_id, 'all') as campaign_id,
            nvl( spec_id , 'all') as spec_id,
            nvl( app_bundle , 'all') as app_bundle,
            nvl( render_name,'all') as render_name,
            nvl( platform, 'all') as platform,
            count(distinct bid_response_id ) offer_num
        from  
        (
         select
              dt,
              req_netease_user,
              campaign_id,
              spec_id,
              app_bundle,
              render_name, 
              platform,              
              bid_response_id
          from query_base where is_nbr=1 
        )tmp group by dt, req_netease_user, campaign_id, spec_id, app_bundle, render_name, platform
        grouping sets(
            (dt),
            (dt, req_netease_user),
            (dt, campaign_id),
            (dt, spec_id),
            (dt, app_bundle),
            (dt, render_name),
            (dt, platform),
            (dt, req_netease_user, campaign_id),
            (dt, req_netease_user, spec_id),
            (dt, req_netease_user, app_bundle),
            (dt, req_netease_user, render_name),
            (dt, req_netease_user, platform),
            (dt, req_netease_user, campaign_id, spec_id),
            (dt, req_netease_user, campaign_id, app_bundle),
            (dt, req_netease_user, campaign_id, render_name),
            (dt, req_netease_user, campaign_id, platform),
            (dt, req_netease_user, campaign_id, spec_id, app_bundle),
            (dt, req_netease_user, campaign_id, spec_id, render_name),
            (dt, req_netease_user, campaign_id, spec_id, platform),
            (dt, req_netease_user, campaign_id, spec_id, app_bundle, render_name),
            (dt, req_netease_user, campaign_id, spec_id, app_bundle, platform),
            (dt, req_netease_user, campaign_id, spec_id, app_bundle, render_name, platform)
        )
) request_uv on  request_num.netease_user=request_uv.netease_user
and request_num.render_name=request_uv.render_name
and request_num.campaign_id=request_uv.campaign_id
and request_num.spec_id=request_uv.spec_id
and request_num.app_bundle=request_uv.app_bundle
and request_num.platform=request_uv.platform;

4.3 Results

After the rewrite, the job finishes in just 5 minutes. Excellent!

5. Summary

Overall, the Expand approach fits multidimensional analysis over a small number of dimensions: it reads the source data only once, but in exchange the data is inflated n-fold.

Originally published 2019-08-19 on the 数据仓库践行者 WeChat 公众号.
