前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >sparksql源码系列 | 一文搞懂Partitioning源码体系(spark3.2)

sparksql源码系列 | 一文搞懂Partitioning源码体系(spark3.2)

作者头像
数据仓库践行者
发布2022-06-09 21:36:10
8771
发布2022-06-09 21:36:10
举报
文章被收录于专栏:数据仓库践行者

这篇文章主要介绍sparksql中Partitioning的源码体系,和上篇 sparksql源码系列 | 一文搞懂Distribution源码体系(spark3.2)一样, Partitioning也是我们理解Physical Plan、executed Plan、shuffle、SparkSQL的AQE机制等的一个比较基础的知识点。

Partitioning定义了一个物理算子输出数据的分区方式,具体包括子Partitioning之间、目标Partitioning和Distribution之间的关系。

它用在什么地方呢?

每个physical operatior实现了outputPartitioning接口,以获得一个Partitioning的实例,用于表示 operator输出数据满足的分布情况

类的依赖关系图

  • UnknownPartitioning:不进行分区
  • SinglePartition:单分区
  • RoundRobinPartitioning:在1-numPartitions范围内轮询式分区
  • BroadcastPartitioning:广播分区
  • HashPartitioning:基于哈希的分区方式
  • RangePartitioning:基于范围的分区方式
  • PartitioningCollection:分区方式的集合,描述物理算子的输出
  • DataSourcePartitioning:V2 DataSource的分区方式

Partitioning接口定义如下:

代码语言:javascript
复制
trait Partitioning {
 //该sparkPlan输出RDD的分区数目
  val numPartitions: Int
 //当前的partitioning操作能否得到所需的数据分布,当不满足时返回false,对数据进行重新组织
 /** 需满足两个条件:
  * 1、分区数numPartitions要相等 
  * 2、satisfies0方法返回true,satisfies0方法中写了和Distribution的关系
  **/
  final def satisfies(required: Distribution): Boolean = {
    required.requiredNumPartitions.forall(_ == numPartitions) && satisfies0(required)
  }

  /**
   * 1、如果requiredChildDistribution为UnspecifiedDistribution,则说明对子节点的分布没有要求,返回true
   * 2、如果requiredChildDistribution为AllTuples,则只要numPartitions == 1,返回true
   * 3、其他情况,返回false
   **/
  protected def satisfies0(required: Distribution): Boolean = required match {
    case UnspecifiedDistribution => true
    case AllTuples => numPartitions == 1
    case _ => false
  }
}

numPartitions:指定该sparkplan输出的rdd分区数目。

satisfies&satisfies0:当前的partitioning操作能否得到所需的数据分布(required)。当不满足时,一般需要进行repartition操作,对数据进行重组织。做法就是添加exchange节点

Partitioning与Distribution关系理解

1、sparkplan定义了requiredChildDistribution接口,以获得一个Distribution的实例,用于表示 operator对其input数据(child节点的输出数据)分布情况的要求

2、sparkplan定义了outputPartitioning接口,以获得一个Partitioning的实例,用于表示 operator输出数据满足的分布情况

3、Distribution定义了createPartitioning接口,用来定义该distribution对应哪种Partitioning。

代码语言:javascript
复制
sealed trait Distribution {
 //分区数
  def requiredNumPartitions: Option[Int]

 //为Distribution创建默认分区,该分区可以满足此分布,同时匹配给定数量的分区。
  def createPartitioning(numPartitions: Int): Partitioning
}
代码语言:javascript
复制

4、Partitioning定义了satisfies接口,用来判断当前的partitioning操作能否得到所需的数据分布,当不满足时返回false。

总结一下

SparkPlan对输入数据的分布(Distribution)情况有着一定的要求,比如HashAggregateExec类型,要求输入数据key值按照hash方式分区,如果输入数据的分布无法满足(child.outputPartitioning.satisfies(requiredChildDistributions) )当前节点的处理逻辑时,就需要添加一些shuffle操作来达到要求,体现在物理算子树上就是加Exchange节点。

决定要不要添加Exchange节点,主要是靠子节点的outputPartitioning(Partitioning)是否satisfies当前节点requiredChildDistributions(Distribution)来决定。

以上

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-05-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 数据仓库践行者 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 类的依赖关系图
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档