Settle in for some source-code reading: this post is for anyone who wants to understand how Spark SQL plans queries under the hood!
The classes that generate the PhysicalPlan
The entry point for physical plan generation:
QueryExecution.createSparkPlan(sparkSession, planner, optimizedPlan.clone())
The call chain is QueryExecution.createSparkPlan -> SparkPlanner.plan -> QueryPlanner.plan (SparkPlanner inherits plan from QueryPlanner via SparkStrategies).
The inheritance relationships among the classes involved are as follows:
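A simplified view of the hierarchy in the Spark source (constructor parameters and mixins trimmed here; they differ slightly between Spark versions, e.g. the Spark 3.x SparkPlanner takes a SparkSession and ExperimentalMethods):

class SparkPlanner(val session: SparkSession, val experimentalMethods: ExperimentalMethods)
  extends SparkStrategies                                           // supplies strategies: Seq[Strategy]

abstract class SparkStrategies extends QueryPlanner[SparkPlan]      // concrete Strategy implementations

abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] // plan(), collectPlaceholders(), prunePlans()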
1. SparkPlanner turns the optimized LogicalPlan into SparkPlans. It does so by applying physical planning strategies (Strategy) to the optimized LogicalPlan, producing a list of candidate plans, i.e. an Iterator[PhysicalPlan].
SparkPlanner implements strategies: Seq[Strategy], which holds the set of strategies to apply; it plays a role similar to the batches used when generating the logical plan.
2. SparkStrategies defines the concrete Strategy implementations, analogous to the rules defined in the Analyzer and Optimizer.
3. QueryPlanner is the base class for physical plan generation and an (indirect) parent of SparkPlanner. Together with GenericStrategy it defines the key building blocks such as Strategy, planLater and apply; a simplified sketch of these declarations follows this list.
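A simplified view of the key declarations, condensed from QueryPlanner.scala and SparkStrategies.scala (the exact code differs slightly between Spark versions; the plan method itself is shown in full right after this sketch):

abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging {
  // Returns a placeholder that will be filled in later by planning the given logical plan.
  protected def planLater(plan: LogicalPlan): PhysicalPlan
  // Applies this strategy to a logical plan, returning zero or more candidate physical plans.
  def apply(plan: LogicalPlan): Seq[PhysicalPlan]
}

abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
  // The list of strategies the planner will try.
  def strategies: Seq[GenericStrategy[PhysicalPlan]]
  // Collects the (placeholder, logical sub-plan) pairs produced by planLater.
  protected def collectPlaceholders(plan: PhysicalPlan): Seq[(PhysicalPlan, LogicalPlan)]
  // Hook for pruning bad candidates; the Spark SQL implementation currently returns them unchanged.
  protected def prunePlans(plans: Iterator[PhysicalPlan]): Iterator[PhysicalPlan]
  // def plan(plan: LogicalPlan): Iterator[PhysicalPlan] -- shown below
}

// On the Spark SQL side, Strategy is an alias for SparkStrategy, whose planLater wraps the
// child logical plan in a PlanLater leaf node:
abstract class SparkStrategy extends GenericStrategy[SparkPlan] {
  override protected def planLater(plan: LogicalPlan): SparkPlan = PlanLater(plan)
}

case class PlanLater(plan: LogicalPlan) extends LeafExecNode {
  override def output: Seq[Attribute] = plan.output
  // Never executed: every PlanLater is replaced with a real SparkPlan before execution.
  protected override def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException()
}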
QueryPlanner also defines the core method that drives physical plan generation:
def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
  // Collect physical plan candidates.
  val candidates = strategies.iterator.flatMap(_(plan))

  // The candidates may contain placeholders marked as [[planLater]],
  // so try to replace them by their child plans.
  val plans = candidates.flatMap { candidate =>
    val placeholders = collectPlaceholders(candidate)

    if (placeholders.isEmpty) {
      // Take the candidate as is because it does not contain placeholders.
      Iterator(candidate)
    } else {
      // Plan the logical plan marked as [[planLater]] and replace the placeholders.
      placeholders.iterator.foldLeft(Iterator(candidate)) {
        // The accumulator candidatesWithPlaceholders starts as Iterator(candidate);
        // placeholders is a Seq[(PhysicalPlan, LogicalPlan)], so each element
        // destructures into (placeholder, logicalPlan).
        case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
          // Plan the logical plan for the placeholder.
          val childPlans = this.plan(logicalPlan)

          candidatesWithPlaceholders.flatMap { candidateWithPlaceholders =>
            childPlans.map { childPlan =>
              // Replace the placeholder by the child plan
              candidateWithPlaceholders.transformUp {
                case p if p.eq(placeholder) => childPlan
              }
            }
          }
      }
    }
  }

  val pruned = prunePlans(plans)
  assert(pruned.hasNext, s"No plan for $plan")
  pruned
}
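The nested foldLeft/flatMap can be hard to follow. Below is a toy, self-contained sketch of the same pattern, with Strings standing in for plan trees, marker substrings standing in for PlanLater placeholders, and string replacement standing in for transformUp (all names here are ours, purely for illustration):

object FoldLeftDemo {
  def main(args: Array[String]): Unit = {
    // One candidate plan containing two placeholders.
    val candidate = "Join(<left>, <right>)"
    // Each placeholder can be planned in several ways, like childPlans above.
    val placeholders = Seq(
      "<left>"  -> Seq("ScanA", "IndexScanA"),
      "<right>" -> Seq("ScanB"))

    // Fold over the placeholders: every step replaces one placeholder in every
    // candidate produced so far, multiplying the number of candidates.
    val plans = placeholders.foldLeft(Iterator(candidate)) {
      case (candidatesSoFar, (placeholder, childPlans)) =>
        candidatesSoFar.flatMap { c =>
          childPlans.iterator.map(child => c.replace(placeholder, child))
        }
    }

    plans.foreach(println)
    // Join(ScanA, ScanB)
    // Join(IndexScanA, ScanB)
  }
}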
The generation process
SELECT A,B FROM TESTDATA2 WHERE A>2
The optimized logical plan (Optimized Logical Plan) produced for this SQL is:
Filter (A#3 > 2)
+- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#4]
   +- ExternalRDD [obj#2]
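As a point of reference, here is a minimal, self-contained way to reproduce plans like the ones above (PlanDemo and the local TestData2 case class are our own stand-ins; in Spark's test suite TESTDATA2 comes from SQLTestData, and the exact plan strings vary by Spark version):

import org.apache.spark.sql.SparkSession

object PlanDemo {
  // Stand-in for org.apache.spark.sql.test.SQLTestData$TestData2
  case class TestData2(a: Int, b: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("plan-demo").getOrCreate()
    import spark.implicits._

    // Going through an RDD (rather than a local Seq) is what yields the
    // ExternalRDD / SerializeFromObject nodes seen in the logical plan above.
    spark.sparkContext
      .parallelize(Seq(TestData2(1, 1), TestData2(2, 1), TestData2(3, 2)), 2)
      .toDF()
      .createOrReplaceTempView("TESTDATA2")

    val df = spark.sql("SELECT A,B FROM TESTDATA2 WHERE A>2")
    println(df.queryExecution.optimizedPlan)  // optimized logical plan
    println(df.queryExecution.sparkPlan)      // physical plan produced by SparkPlanner
    spark.stop()
  }
}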
QueryExecution.createSparkPlan
lazy val sparkPlan: SparkPlan = executePhase(QueryPlanningTracker.PLANNING) {
  QueryExecution.createSparkPlan(sparkSession, planner, optimizedPlan.clone())
}
def createSparkPlan(
    sparkSession: SparkSession,
    planner: SparkPlanner,
    plan: LogicalPlan): SparkPlan = {
  // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
  //       but we will implement to choose the best plan.
  planner.plan(ReturnAnswer(plan)).next()
}
ReturnAnswer is a special LogicalPlan node placed on top of the optimized logical plan.
After wrapping the plan in ReturnAnswer(plan), the tree becomes:
ReturnAnswer
+- Filter (A#3 > 2)
   +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#3, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#4]
      +- ExternalRDD [obj#2]
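What consumes ReturnAnswer? In SparkStrategies, the SpecialLimits strategy pattern-matches on it so that a top-level LIMIT can be planned specially (e.g. with CollectLimitExec or TakeOrderedAndProjectExec); for any other root, including the Filter in our example, it simply emits a PlanLater placeholder for the child. A simplified excerpt (details vary by Spark version):

object SpecialLimits extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // The planner is always invoked on ReturnAnswer(rootPlan).
    case ReturnAnswer(rootPlan) => rootPlan match {
      // ... special handling of top-level Limit / Sort + Limit omitted ...
      case other => planLater(other) :: Nil
    }
    // ... handling of non-root Limit nodes omitted ...
    case _ => Nil
  }
}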
QueryPlanner.plan
Working top-down, a physical plan is generated for each level of the tree.
The recursion over the whole LogicalPlan is driven by val childPlans = this.plan(logicalPlan), which plans the child subtrees; this is how the candidates are generated.
The PlanLater placeholders in the candidates are then replaced with the physical plans generated for their children, as illustrated in the trace below:
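For our example query the recursion unfolds roughly as follows (an illustrative trace, not verbatim Spark output; FilterExec, SerializeFromObjectExec and ExternalRDDScanExec are the operators produced by the BasicOperators strategy):

plan(ReturnAnswer(Filter(...)))
  SpecialLimits        -> candidate: PlanLater(Filter(...))
  recurse plan(Filter(...))
    BasicOperators     -> candidate: FilterExec(A#3 > 2, PlanLater(SerializeFromObject(...)))
    recurse plan(SerializeFromObject(...))
      BasicOperators   -> candidate: SerializeFromObjectExec(..., PlanLater(ExternalRDD(...)))
      recurse plan(ExternalRDD(...))
        BasicOperators -> ExternalRDDScanExec [obj#2]

Each PlanLater is then replaced bottom-up via transformUp, so the first candidate handed back to createSparkPlan is FilterExec(A#3 > 2, SerializeFromObjectExec(..., ExternalRDDScanExec(obj#2))).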
prunePlans
prunePlans is intended to prune unsuitable plans and thus avoid combinatorial explosion, but it is not implemented yet.
It simply returns the generated physical plans unchanged:
override protected def prunePlans(plans: Iterator[SparkPlan]): Iterator[SparkPlan] = {
  // TODO: We will need to prune bad plans when we improve plan space exploration
  //       to prevent combinatorial explosion.
  plans
}