前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Spark重点难点08】Spark3.0中的AQE和DPP小总结

【Spark重点难点08】Spark3.0中的AQE和DPP小总结

作者头像
王知无-import_bigdata
发布2021-12-22 14:01:27
2.3K0
发布2021-12-22 14:01:27
举报

Spark重点难点系列:

前言

Spark3.0版本的发布已经很长时间了,3.0版本增加了很多令人兴奋的新特性。

包括动态分区剪裁(Dynamic Partition Pruning)、自适应查询执行(Adaptive Query Execution)、加速器感知调度(Accelerator-aware Scheduling)、支持 Catalog 的数据源API(Data Source API with Catalog Supports)、SparkR 中的向量化(Vectorization in SparkR)、支持 Hadoop 3/JDK 11/Scala 2.12 等等。

这里面最重要的两个特性分别是:

  • AQE(Adaptive Query Execution,自适应查询执行)
  • DPP(Dynamic Partition Pruning,动态分区剪裁)

我们分别就分别就这两个特性进行一下讲解。

AQE(Adaptive Query Execution,自适应查询执行)

AQE是Spark SQL的一种动态优化机制,是对查询执行计划的优化。

我们可以设置参数spark.sql.adaptive.enabled为true来开启AQE,在Spark 3.0中默认是false。

在运行时,AQE会结合Shuffle Map阶段执行完毕后的统计信息,基于既定的规则动态地调整、修正尚未执行的逻辑计划和物理计划,来完成对原始查询语句的运行时优化。

在介绍AQE之前我们先讲解两个优化策略:

  • RBO(Rule Based Optimization,基于规则的优化)。它往往基于一些规则和策略实现,如谓词下推、列剪枝,这些规则和策略来源于数据库领域已有的应用经验。也就是说,启发式的优化实际上算是一种「经验主义」。
  • CBO(Cost Based Optimization,基于成本的优化)。CBO是一种基于数据统计信息例如数据量、数据分布来选择代价最小的优化策略的方式。

RBO相对于CBO而言要成熟得多,常用的规则都基于经验制定,可以覆盖大部分查询场景,并且方便扩展。其缺点则是不够灵活,对待相似的问题和场景都使用同一类解决方案,忽略了数据本身的信息。

Spark在2.2版本中推出了CBO,主要就是为了解决RBO「经验主义」的弊端。

AQE的三大特性包括:Join策略调整分区自动合并自动倾斜处理

Join策略调整

关于Spark支持的Join策略,我们在之前的文章中做过详细介绍了:

Spark 支持的许多 Join 策略中,Broadcast Hash Join通常是性能最好的,前提是参加 join 的一张表的数据能够装入内存。由于这个原因,当 Spark 估计参加 join 的表数据量小于广播大小的阈值时,其会将 Join 策略调整为 Broadcast Hash Join。但是,很多情况都可能导致这种大小估计出错——例如存在一个非常有选择性的过滤器。

由于AQE可以精确的统计上游数据,因此可以解决该问题。比如下面这个例子,右表的实际大小为15M,而在该场景下,经过filter过滤后,实际参与join的数据大小为8M,小于了默认broadcast阈值10M,应该被广播。

在我们执行过程中转化为BHJ的同时,我们甚至可以将传统shuffle优化为本地shuffle(例如shuffle读在mapper而不是基于reducer)来减小网络开销。

分区自动合并

在我们处理的数据量级非常大时,shuffle通常来说是最影响性能的。因为shuffle是一个非常耗时的算子,它需要通过网络移动数据,分发给下游算子。

在shuffle中,partition的数量十分关键。partition的最佳数量取决于数据,而数据大小在不同的query不同stage都会有很大的差异,所以很难去确定一个具体的数目。

在这部分,有两个非常重要的参数用来控制目标分区的大小:

  • spark.sql.adaptive.advisoryPartitionSizeInBytes,分区合并后的推荐尺寸
  • spark.sql.adaptive.coalescePartitions.minPartitionNum,分区合并后最小分区数

为了解决该问题,我们在最开始设置相对较大的shuffle partition个数,通过执行过程中shuffle文件的数据来合并相邻的小partitions。例如,假设我们执行SELECT max(i) FROM table GROUP BY j,表table只有2个partition并且数据量非常小。我们将初始shuffle partition设为5,因此在分组后会出现5个partitions。若不进行AQE优化,会产生5个tasks来做聚合结果,事实上有3个partitions数据量是非常小的。

这种情况下,AQE生效后只会生成3个reduce task。

自动倾斜处理

Spark Join操作如果出现某个key的数据倾斜问题,那么基本上就是这个任务的性能杀手了。在AQE之前,用户没法自动处理Join中遇到的这个棘手问题,需要借助外部手动收集数据统计信息,并做额外的加盐,分批处理数据等相对繁琐的方法来应对数据倾斜问题。

AQE根据shuffle文件统计数据自动检测倾斜数据,将那些倾斜的分区打散成小的子分区,然后各自进行join。

我们可以看下这个场景,Table A join Table B,其中Table A的partition A0数据远大于其他分区。

AQE会将partition A0切分成2个子分区,并且让他们独自和Table B的partition B0进行join。

如果不做这个优化,SMJ将会产生4个tasks并且其中一个执行时间远大于其他。经优化,这个join将会有5个tasks,但每个task执行耗时差不多相同,因此个整个查询带来了更好的性能。

关于如何定位这些倾斜的分区,主要靠下面三个参数:

  • spark.sql.adaptive.skewJoin.skewedPartitionFactor,判定倾斜的倾斜因子
  • spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes,判定倾斜的最低阈值
  • spark.sql.adaptive.advisoryPartitionSizeInBytes,倾斜数据分区拆分,小数据分区合并优化时,建议的分区大小(以字节为单位)

DPP(Dynamic Partition Pruning,动态分区剪裁)

所谓的动态分区裁剪就是基于运行时(run time)推断出来的信息来进一步进行分区裁剪,从而减少事实表中数据的扫描量、降低 I/O 开销,提升执行性能。

我们在进行事实表和维度表的Join过程中,把事实表中的无效数据进行过滤,例如:

代码语言:javascript
复制
SELECT * FROM dim 
JOIN fact 
ON (dim.col = fact.col) 
WHERE dim.col = 'dummy'

当SQL满足DPP的要求后,会根据关联关系dim.col = fact.col,通过维度表的列传导到事实表的col字段,只扫描事实表中满足条件的部分数据,就可以做到减少数据扫描量,提升I/O效率。

但是使用DPP的前提条件比较苛刻,需要满足以下条件:

  1. 事实表必须是分区表
  2. 只支持等值Join
  3. 维度表过滤之后的数据必须小于广播阈值:spark.sql.autoBroadcastJoinThreshold

以上就是Spark3.0中最重要的两个特性AQEDPP了。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-12-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • AQE(Adaptive Query Execution,自适应查询执行)
    • Join策略调整
      • 分区自动合并
        • 自动倾斜处理
        • DPP(Dynamic Partition Pruning,动态分区剪裁)
        相关产品与服务
        数据库
        云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档