前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark sql规则执行器RuleExecutor(源码解析)

Spark sql规则执行器RuleExecutor(源码解析)

作者头像
数据仓库践行者
发布2020-10-29 09:54:52
1.4K0
发布2020-10-29 09:54:52
举报
文章被收录于专栏:数据仓库践行者

静下心来读源码,给想要了解spark sql底层解析原理的小伙伴们!

【本文大纲】1、前言2、Strategy3、Batch(包含一个或多个Rule及一个策略)4、batches: Seq[Batch](Batch队列)5、execute(核心方法)

前言

Spark sql通过Analyzer中 定义的rule把Parsed Logical Plan解析成 Analyzed Logical Plan;通过Optimizer定义的rule把 Analyzed Logical Plan 优化成 Optimized Logical Plan 。

下图是RuleExecutor类 的继承关系,Analyzer、Optimizer都继承了RuleExecutor。

Analyzer、Optimizer定义了一系列 rule,而RuleExecutor 定义了一个 rules 执行框架,即怎么把一批批规则应用在一个 plan 上得到一个新的 plan。

规则是怎么执行的 ?

RuleExecutor包含以下主要对象

代码语言:javascript
复制
abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {

abstract class Strategy {
  def maxIterations: Int
  def errorOnExceed: Boolean = false
  def maxIterationsSetting: String = null
}

case object Once extends Strategy { val maxIterations = 1 }

case class FixedPoint(
  override val maxIterations: Int,
  override val errorOnExceed: Boolean = false,
  override val maxIterationsSetting: String = null) extends Strategy

protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)

protected def batches: Seq[Batch]

Strategy

Strategy 定义了Rule处理的迭代策略,有些Rule只用执行一次,有些需要多次直到达到某种效果。

代码语言:javascript
复制
abstract class Strategy {

  //表示 Batch 最大执行次数,有可能是一次或多次。如果执行了 maxIterations 次之前达到收敛点,就停止,不再继续执行 Batch
  def maxIterations: Int

  //决定如果执行次超过了最大迭代次数是否抛出异常
  def errorOnExceed: Boolean = false

  //是大执行次数可以在配置文件里配制,这里是获取配制文件中key。
  def maxIterationsSetting: String = null
}

Strategy有两个实现类 :Once、FixedPoint

Once

once定义了只运行一次的规则,即maxIterations = 1的 Strategy

代码语言:javascript
复制
case object Once extends Strategy { val maxIterations = 1 }

FixedPoint

fixedPoint 定义多于1次的迭代策略,maxIterations 通过配置文件获取。

代码语言:javascript
复制
case class FixedPoint(
  override val maxIterations: Int,
  override val errorOnExceed: Boolean = false,
  override val maxIterationsSetting: String = null) extends Strategy

Analyzer 和 Optimizer 中分 别定义自己的fixedPoint,最大迭代次数,分别从

spark.sql.analyzer.maxIterations or spark.sql.optimizer.maxIterations 这两个配置参数里获取,默认100

Batch(包含一个或多个Rule及一个策略)

Batch 用来表示一组同类的规则。

每个Batch的Rule使用相同的策略(执行一次 or 达到fixedPoint),便于管理

代码语言:javascript
复制
/** A batch of rules. */
protected case class Batch(
name: String, //这组同类规则的名称
strategy: Strategy,//策略
rules: Rule[TreeType]* //具体的规则 
)
 

Analyzer 和 Optimizer 中分 别定义自己的batch,比如Analyzer中 定义的【Hints】,策略用的是FixedPoint,【Hints】中包含了两个与处理【hint】相关的rule:

ResolveHints.ResolveJoinStrategyHints

ResolveHints.ResolveCoalesceHints

batches: Seq[Batch](Batch队列)

RuleExecutor 包含了一个 protected def batches: Seq[Batch] 方法,用来获取一系列 Batch(Batch队列),这些 Batch 都会在 execute 中执行。所有继承 RuleExecutor(Analyzer 和 Optimizer)都必须实现该方法,提供自己的 Seq[Batch]。

Analyzer 和 Optimizer 中 提供各自己的 batches:

Optimizer 中的batches略显复杂,Optimizer定义了 三种batches:defaultBatches、excludedRules 、 nonExcludableRules

最终要被执行的batches为:defaultBatches - (excludedRules - nonExcludableRules)

execute(核心方法)

execute方法遍历batches中的每个Batch,再用Batch中的每个Rule处理plan。

while (continue) 的终止条件:

达到最大迭代次数maxIterations 或者 当前plan和last plan相比没有变化

执行流程

源码详解

代码语言:javascript
复制
//传入参数plan是当前的执行计划
def execute(plan: TreeType): TreeType = {
  var curPlan = plan
  val queryExecutionMetrics = RuleExecutor.queryExecutionMeter
  val planChangeLogger = new PlanChangeLogger()
  val tracker: Option[QueryPlanningTracker] = QueryPlanningTracker.get
  val beforeMetrics = RuleExecutor.getCurrentMetrics()

  // Run the structural integrity checker against the initial input
  if (!isPlanIntegral(plan)) {
    val message = "The structural integrity of the input plan is broken in " +
      s"${this.getClass.getName.stripSuffix("$")}."
    throw new TreeNodeException(plan, message, null)
  }

  //遍历batches,取出batch
  batches.foreach { batch =>
    //针对每个batch处理
    val batchStartPlan = curPlan
    var iteration = 1 
    var lastPlan = curPlan
    var continue = true
    while (continue) {
      //foldLeft语法,形如A.foldLeft(B){(z,i)=>{dosomething() return C}}
      //curPlan是batch迭代的初始值
      //case(plan,rule) 中plan是batch中某个rule执行后返回的plan,继续投入batch中下个rule;rule是指batch中的某个rule对象
      //整个foldLeft最后返回当前batch执行完之后生成的plan
      curPlan = batch.rules.foldLeft(curPlan) {
        case (plan, rule) =>
          val startTime = System.nanoTime()
          val result = rule(plan)  //调用rule的apply方法,执行规则       
          val runTime = System.nanoTime() - startTime
          val effective = !result.fastEquals(plan)

          if (effective) {
            queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
            queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
            planChangeLogger.logRule(rule.ruleName, plan, result)
          }
          queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
          queryExecutionMetrics.incNumExecution(rule.ruleName)

          // Record timing information using QueryPlanningTracker
          tracker.foreach(_.recordRuleInvocation(rule.ruleName, runTime, effective))

          // Run the structural integrity checker against the plan after each rule.
          if (!isPlanIntegral(result)) {
            val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " +
              "the structural integrity of the plan is broken."
            throw new TreeNodeException(result, message, null)
          }

          result
      }
      iteration += 1 //迭代次数+1
      if (iteration > batch.strategy.maxIterations) {
         //处理迭代次数大于策略中最大迭代次的情况
        
        if (iteration != 2) {
         //这个处理策略最大迭代次数不是一次,并且迭代次数超过了maxIterations的情况
          val endingMsg = if (batch.strategy.maxIterationsSetting == null) {
            "."
          } else {
            s", please set '${batch.strategy.maxIterationsSetting}' to a larger value."
          }
          val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}" +
            s"$endingMsg"
          if (Utils.isTesting || batch.strategy.errorOnExceed) {
           //如果errorOnExceed为true则抛出异常 
            throw new TreeNodeException(curPlan, message, null)
          } else {
            //只给警告日志
            logWarning(message)
          }
        }
        // Check idempotence for Once batches.
        if (batch.strategy == Once &&
          Utils.isTesting && !blacklistedOnceBatches.contains(batch.name)) {
          checkBatchIdempotence(batch, curPlan)
        }
        //对于达到策略设置数量,continue置false,迭代将会结束
        continue = false
      }
    //如果迭代次数没达到maxIterations,但是当前plan和上次plan完全一样的话,也会把 continue 置为 false,停止迭代
      if (curPlan.fastEquals(lastPlan)) {
        logTrace(
          s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
        continue = false
      }
      lastPlan = curPlan
    }

    planChangeLogger.logBatch(batch.name, batchStartPlan, curPlan)
  }
  planChangeLogger.logMetrics(RuleExecutor.getCurrentMetrics() - beforeMetrics)

  curPlan
}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-10-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 数据仓库践行者 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
批量计算
批量计算(BatchCompute,Batch)是为有大数据计算业务的企业、科研单位等提供高性价比且易用的计算服务。批量计算 Batch 可以根据用户提供的批处理规模,智能地管理作业和调动其所需的最佳资源。有了 Batch 的帮助,您可以将精力集中在如何分析和处理数据结果上。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档