由前面博客我们知道了SparkSql整个解析流程如下:
optimizer 以及之后的模块都只会在触发了action操作后才会执行。优化器是用来将Resolved LogicalPlan转化为optimized LogicalPlan的。
optimizer 就是根据大佬们多年的SQL优化经验来对语法树进行优化,比如谓词下推、列值裁剪、常量累加等。优化的模式和Analyzer非常相近,Optimizer 同样继承了RuleExecutor
,并定义了很多优化的Rule:
def batches: Seq[Batch] = {
// Technically some of the rules in Finish Analysis are not optimizer rules and belong more
// in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
// However, because we also use the analyzer to canonicalized queries (for view definition),
// we do not eliminate subqueries or compute current time in the analyzer.
Batch("Finish Analysis", Once,
EliminateSubqueryAliases,
EliminateView,
ReplaceExpressions,
ComputeCurrentTime,
GetCurrentDatabase(sessionCatalog),
RewriteDistinctAggregates,
ReplaceDeduplicateWithAggregate) ::
//////////////////////////////////////////////////////////////////////////////////////////
// Optimizer rules start here
//////////////////////////////////////////////////////////////////////////////////////////
// - Do the first call of CombineUnions before starting the major Optimizer rules,
// since it can reduce the number of iteration and the other rules could add/move
// extra operators between two adjacent Union operators.
// - Call CombineUnions again in Batch("Operator Optimizations"),
// since the other rules might make two separate Unions operators adjacent.
Batch("Union", Once,
CombineUnions) ::
Batch("Pullup Correlated Expressions", Once,
PullupCorrelatedPredicates) ::
Batch("Subquery", Once,
OptimizeSubqueries) ::
Batch("Replace Operators", fixedPoint,
ReplaceIntersectWithSemiJoin,
ReplaceExceptWithAntiJoin,
ReplaceDistinctWithAggregate) :: // aggregate替换distinct
Batch("Aggregate", fixedPoint,
RemoveLiteralFromGroupExpressions,
RemoveRepetitionFromGroupExpressions) ::
Batch("Operator Optimizations", fixedPoint, Seq(
// Operator push down
PushProjectionThroughUnion, //谓词下推
ReorderJoin(conf),
EliminateOuterJoin(conf),
PushPredicateThroughJoin,
PushDownPredicate,
LimitPushDown(conf),
ColumnPruning, //列剪裁
InferFiltersFromConstraints(conf),
// Operator combine
CollapseRepartition,
CollapseProject,
CollapseWindow,
CombineFilters, //合并filter
CombineLimits, //合并limit
CombineUnions,
// Constant folding and strength reduction
NullPropagation(conf), //null处理
FoldablePropagation,
OptimizeIn(conf), // 关键字in的优化,替代为InSet
ConstantFolding, //针对常量的优化,在这里会直接计算可以获得的常量
ReorderAssociativeOperator,
LikeSimplification, //表达式简化
BooleanSimplification,
SimplifyConditionals,
RemoveDispensableExpressions,
SimplifyBinaryComparison,
PruneFilters(conf),
EliminateSorts,
SimplifyCasts,
SimplifyCaseConversionExpressions,
RewriteCorrelatedScalarSubquery,
EliminateSerialization,
RemoveRedundantAliases,
RemoveRedundantProject,
SimplifyCreateStructOps,
SimplifyCreateArrayOps,
SimplifyCreateMapOps) ++
extendedOperatorOptimizationRules: _*) ::
Batch("Check Cartesian Products", Once,
CheckCartesianProducts(conf)) ::
Batch("Join Reorder", Once,
CostBasedJoinReorder(conf)) ::
Batch("Decimal Optimizations", fixedPoint, //精度优化
DecimalAggregates(conf)) ::
Batch("Object Expressions Optimization", fixedPoint,
EliminateMapObjects,
CombineTypedFilters) ::
Batch("LocalRelation", fixedPoint,
ConvertToLocalRelation,
PropagateEmptyRelation) ::
Batch("OptimizeCodegen", Once,
OptimizeCodegen(conf)) ::
Batch("RewriteSubquery", Once,
RewritePredicateSubquery,
CollapseProject) :: Nil
}
batch的执行和analyzer一样是通过RuleExecutor的execute方法依次遍历,这里不再解析。这里有部分优化的例子
我的博客即将搬运同步至腾讯云+社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=2h6pa7g6ylkw4