
Spark SQL: Generating the PhysicalPlan (Source Code Walkthrough)

Author: 数据仓库践行者
Published: 2020-11-09 14:24:24

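Settle in and read the source code. This article is for anyone who wants to understand how Spark SQL parses and plans queries under the hood.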

Classes that generate the PhysicalPlan

The entry point for generating the physical plan:

QueryExecution.createSparkPlan(sparkSession, planner, optimizedPlan.clone())

Call chain: QueryExecution.createSparkPlan -> SparkPlanner.plan (inherited from SparkStrategies.plan) -> QueryPlanner.plan

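The classes involved form one inheritance chain (SparkPlanner extends SparkStrategies, which extends QueryPlanner[SparkPlan]). Their roles are as follows: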

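1. SparkPlanner converts the optimized LogicalPlan into a list of SparkPlans. It does this by applying physical planning strategies (Strategy) to the optimized LogicalPlan, which yields the candidate plans as an Iterator[PhysicalPlan].

SparkPlanner implements the strategies: Seq[Strategy] method, which holds the sequence of strategies to apply, much like the batches used when producing the logical plan.

2. SparkStrategies defines the concrete implementations of the individual strategies, analogous to the rules defined in the Analyzer and the Optimizer.

3. QueryPlanner is the base class for generating physical plans and the base class of SparkPlanner. QueryPlanner.scala defines the key pieces: the strategy abstraction (GenericStrategy) with its planLater and apply methods, and the planner's strategies list.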
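
The division of labour between the three classes can be sketched in plain Scala. This is a minimal toy, not Spark's code: `BasePlanner`, `ToyStrategies`, `ToyPlanner`, and the string-based `Logical`/`Physical` types are hypothetical stand-ins for QueryPlanner, SparkStrategies, SparkPlanner, LogicalPlan, and SparkPlan.

```scala
object PlannerHierarchySketch {
  // Hypothetical stand-ins for LogicalPlan / SparkPlan; strings keep the sketch small.
  type Logical = String
  type Physical = String

  // Role of a strategy: map one logical plan to zero or more physical candidates.
  abstract class Strategy {
    def apply(plan: Logical): Seq[Physical]
  }

  // Role of QueryPlanner: owns the strategy list and the method that drives it.
  abstract class BasePlanner {
    def strategies: Seq[Strategy]
    def plan(p: Logical): Iterator[Physical] = strategies.iterator.flatMap(_(p))
  }

  // Role of SparkStrategies: holds the concrete strategy implementations.
  abstract class ToyStrategies extends BasePlanner {
    object FilterStrategy extends Strategy {
      def apply(plan: Logical): Seq[Physical] =
        if (plan.startsWith("Filter")) Seq("FilterExec") else Nil
    }
    object ScanStrategy extends Strategy {
      def apply(plan: Logical): Seq[Physical] =
        if (plan.startsWith("Scan")) Seq("ScanExec") else Nil
    }
  }

  // Role of SparkPlanner: decides which strategies are active and in what order.
  class ToyPlanner extends ToyStrategies {
    override def strategies: Seq[Strategy] = Seq(FilterStrategy, ScanStrategy)
  }

  def main(args: Array[String]): Unit = {
    val planner = new ToyPlanner
    println(planner.plan("Filter").toList)
    println(planner.plan("Scan").toList)
  }
}
```

A strategy that does not recognise a node simply returns an empty list, so the planner can try every strategy on every node without special-casing.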

QueryPlanner also defines the core method that generates the physical plan:

def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
   // Collect physical plan candidates.
  val candidates = strategies.iterator.flatMap(_(plan))
  // The candidates may contain placeholders marked as [[planLater]],
  // so try to replace them by their child plans.
  val plans = candidates.flatMap { candidate =>
    val placeholders = collectPlaceholders(candidate)
    if (placeholders.isEmpty) {
      // Take the candidate as is because it does not contain placeholders.
      Iterator(candidate)
    } else {
      // Plan the logical plan marked as [[planLater]] and replace the placeholders.
      placeholders.iterator.foldLeft(Iterator(candidate)) {
        // candidatesWithPlaceholders starts out as Iterator(candidate)
        // (placeholder, logicalPlan) comes from placeholders: Seq[(PhysicalPlan, LogicalPlan)]
        case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
          // Plan the logical plan for the placeholder.
          val childPlans = this.plan(logicalPlan)
          candidatesWithPlaceholders.flatMap { candidateWithPlaceholders =>
            childPlans.map { childPlan =>
              // Replace the placeholder by the child plan
              candidateWithPlaceholders.transformUp {
                case p if p.eq(placeholder) => childPlan
              }
            }
          }
      }
    }
  }

  val pruned = prunePlans(plans)
  assert(pruned.hasNext, s"No plan for $plan")
  pruned
}
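
To make the placeholder handling in plan concrete, here is a small self-contained sketch of the same idea. It is a toy under stated assumptions, not Spark's implementation: every type here (`Scan`, `Filter`, `ScanExec`, `IndexScanExec`, `FilterExec`, and the local `PlanLater`) is hypothetical, and `IndexScanExec` exists only so that one logical node has two physical alternatives.

```scala
object ToyPlanner {
  // Hypothetical logical nodes (not Spark classes).
  sealed trait Logical
  final case class Scan(table: String) extends Logical
  final case class Filter(cond: String, child: Logical) extends Logical

  // Hypothetical physical nodes.
  sealed trait Physical { def children: Seq[Physical] }
  final case class ScanExec(table: String) extends Physical { def children: Seq[Physical] = Nil }
  final case class IndexScanExec(table: String) extends Physical { def children: Seq[Physical] = Nil }
  final case class FilterExec(cond: String, child: Physical) extends Physical {
    def children: Seq[Physical] = Seq(child)
  }
  // Placeholder for a child that has not been planned yet.
  final case class PlanLater(plan: Logical) extends Physical { def children: Seq[Physical] = Nil }

  type Strategy = Logical => Seq[Physical]

  val filterStrategy: Strategy = {
    case Filter(c, child) => Seq(FilterExec(c, PlanLater(child)))
    case _ => Nil
  }
  // Two alternatives for one logical node, so the result has several candidates.
  val scanStrategy: Strategy = {
    case Scan(t) => Seq(ScanExec(t), IndexScanExec(t))
    case _ => Nil
  }
  val strategies: Seq[Strategy] = Seq(filterStrategy, scanStrategy)

  def collectPlaceholders(p: Physical): Seq[PlanLater] = p match {
    case pl: PlanLater => Seq(pl)
    case other => other.children.flatMap(collectPlaceholders)
  }

  // Swap one specific placeholder (compared by reference) for a planned child.
  def replace(p: Physical, target: PlanLater, by: Physical): Physical = p match {
    case pl: PlanLater if pl eq target => by
    case FilterExec(c, child) => FilterExec(c, replace(child, target, by))
    case other => other
  }

  def plan(logical: Logical): Iterator[Physical] = {
    val candidates = strategies.iterator.flatMap(s => s(logical))
    candidates.flatMap { candidate =>
      collectPlaceholders(candidate).foldLeft(Iterator(candidate)) { (acc, placeholder) =>
        // Materialized so the child plans can be reused for every partial candidate.
        val childPlans = plan(placeholder.plan).toSeq
        acc.flatMap(c => childPlans.map(child => replace(c, placeholder, child)))
      }
    }
  }

  def main(args: Array[String]): Unit =
    plan(Filter("a > 2", Scan("t"))).foreach(println)
}
```

Each placeholder multiplies the number of candidates by the number of plans found for its child, which is why the real method ends with a pruning hook.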

Generation process

SELECT A,B FROM TESTDATA2 WHERE A>2

For this SQL statement, the optimized logical plan (Optimized Logical Plan) is:

Filter (A#3 > 2)
+- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#4]
   +- ExternalRDD [obj#2]

QueryExecution.createSparkPlan

lazy val sparkPlan: SparkPlan = executePhase(QueryPlanningTracker.PLANNING) {
  QueryExecution.createSparkPlan(sparkSession, planner, optimizedPlan.clone())
}

def createSparkPlan(
    sparkSession: SparkSession,
    planner: SparkPlanner,
    plan: LogicalPlan): SparkPlan = {
  // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
  //       but we will implement to choose the best plan.
  planner.plan(ReturnAnswer(plan)).next()
}

ReturnAnswer is a special LogicalPlan node placed at the top of the optimized logical plan.

After wrapping the plan as ReturnAnswer(plan), the tree looks like this:

ReturnAnswer
+- Filter (A#3 > 2)
   +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#4]
      +- ExternalRDD [obj#2]

QueryPlanner.plan

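Physical plans are generated top-down, one layer of the tree at a time.

The whole LogicalPlan is processed by recursing into child nodes through val childPlans = this.plan(logicalPlan); each call produces the candidates for the node it is given, with PlanLater standing in for children that have not been planned yet.

Once a child has been planned, the PlanLater in each candidate is replaced with the physical plan already generated for that child.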
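
The order of events for the example query can be traced with a toy that renders physical plans as strings. This is only a sketch: the node classes are hypothetical, and the physical operator names (FilterExec, SerializeFromObjectExec, ExternalRDDScanExec, and a bare PlanLater for ReturnAnswer) are an assumption about what Spark's built-in strategies would choose, not something shown in the article.

```scala
object TopDownTrace {
  // Hypothetical mirror of the logical tree from the example.
  sealed trait Logical
  case object ExternalRDD extends Logical
  final case class SerializeFromObject(child: Logical) extends Logical
  final case class Filter(condition: String, child: Logical) extends Logical
  final case class ReturnAnswer(child: Logical) extends Logical

  // One candidate per node, rendered as text; "PlanLater" marks the unplanned child.
  // The operator names are assumed, see the paragraph above.
  def candidate(plan: Logical): (String, Option[Logical]) = plan match {
    case ReturnAnswer(c) => ("PlanLater", Some(c))
    case Filter(cond, c) => (s"FilterExec($cond, PlanLater)", Some(c))
    case SerializeFromObject(c) => ("SerializeFromObjectExec(PlanLater)", Some(c))
    case ExternalRDD => ("ExternalRDDScanExec", None)
  }

  // Plan the node first (top-down), then recurse and fill in the placeholder.
  def plan(logical: Logical, log: String => Unit): String = {
    val (cand, child) = candidate(logical)
    log(s"candidate: $cand")
    child match {
      case Some(c) =>
        val resolved = cand.replace("PlanLater", plan(c, log))
        log(s"resolved: $resolved")
        resolved
      case None => cand
    }
  }

  def main(args: Array[String]): Unit = {
    val optimized = Filter("a > 2", SerializeFromObject(ExternalRDD))
    val physical = plan(ReturnAnswer(optimized), println)
    println(s"final: $physical")
  }
}
```

Running main prints the candidates from the root down to the leaf, followed by the resolved plans from the leaf back up to the root.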

prunePlans

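prunePlans is meant to prune unsuitable plans and so prevent combinatorial explosion, but it is not implemented yet.

For now it returns the generated physical plans unchanged.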

override protected def prunePlans(plans: Iterator[SparkPlan]): Iterator[SparkPlan] = {
  // TODO: We will need to prune bad plans when we improve plan space exploration
  //       to prevent combinatorial explosion.
  plans
}
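
Taken together, the identity prunePlans and the next() call in createSparkPlan mean the first candidate wins. The sketch below illustrates that with plain Scala iterators; `FirstPlanWins`, `choose`, and the plan names are hypothetical, and the point about later candidates not being built relies only on Scala's Iterator being lazy.

```scala
object FirstPlanWins {
  // Same shape as the prunePlans shown above: nothing is pruned yet.
  def prunePlans[A](plans: Iterator[A]): Iterator[A] = plans

  // Same shape as createSparkPlan's use of next(): take the first surviving candidate.
  def choose[A](plans: Iterator[A]): A = {
    val pruned = prunePlans(plans)
    assert(pruned.hasNext, "No plan")
    pruned.next()
  }

  def main(args: Array[String]): Unit = {
    // Records which candidates were actually constructed.
    val built = scala.collection.mutable.ArrayBuffer.empty[String]
    val candidates = Iterator("planA", "planB").map { p => built += p; p }
    println(choose(candidates))
    println(built.toList)
  }
}
```

Because the candidates are an Iterator, alternatives after the first are never constructed unless something asks for them, so the missing pruning step costs nothing as long as only next() is called.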

Originally published on 2020-11-01 on the WeChat official account 数据仓库践行者.
