[Spark SQL] 源码解析之Analyzer

前言

由前面博客我们知道了SparkSql整个解析流程如下:

  • sqlText 经过 SqlParser 解析成 Unresolved LogicalPlan;
  • analyzer 模块结合catalog进行绑定,生成 resolved LogicalPlan;
  • optimizer 模块对 resolved LogicalPlan 进行优化,生成 optimized LogicalPlan;
  • SparkPlan 将 LogicalPlan 转换成PhysicalPlan;
  • prepareForExecution()将 PhysicalPlan 转换成可执行物理计划;
  • 使用 execute()执行可执行物理计划;

详解analyzer模块

Analyzer模块将Unresolved LogicalPlan结合元数据catalog进行绑定,最终转化为Resolved LogicalPlan。跟着代码看流程:

// 代码1
spark.sql("select * from table").show(false)
---
// 代码2
def sql(sqlText: String): DataFrame = {
    Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
  }
---
// 代码3
def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
    val qe = sparkSession.sessionState.executePlan(logicalPlan)
    qe.assertAnalyzed()
    new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
  }

代码2中的后半段sessionState.sqlParser.parsePlan(sqlText)在上篇博客已经解析,即将sqlText通过第三方解析器antlr解析成语法树。

接着进入代码3,通过Unresolved LogicalPlan创建QueryExecution对象, 这是一个非常关键的类,analyzer 、optimizer 、SparkPlan、executedPlan等都是在该类中触发的。继续跟着代码3走:

// 代码4
def assertAnalyzed(): Unit = {
    // Analyzer is invoked outside the try block to avoid calling it again from within the
    // catch block below.
    analyzed
   ...
// 代码5
lazy val analyzed: LogicalPlan = {
    SparkSession.setActiveSession(sparkSession)
    sparkSession.sessionState.analyzer.execute(logical)
  }

最终调用analyzer的execute方法,该方法在Analyzer的父类RuleExecutor中,另外还继承了CheckAnalysis 类,用于对 plan 做一些解析,如果解析失败则抛出用户层面的错误:

class Analyzer(
    catalog: SessionCatalog,
    conf: SQLConf,
    maxIterations: Int)
  extends RuleExecutor[LogicalPlan] with CheckAnalysis {

可以看到构造器中有SessionCatalog类型的catalog,此类管理着临时表、view、函数及外部依赖元数据(如hive metastore),是analyzer进行绑定的桥梁。

继承了RuleExecutor的类(Analyzer、Optimizer)需要实现def batches: Seq[Batch]方法,在execute方法中再对此batches进行遍历执行,batches 由多个Batch构成,每个Batch由多个Rule构成,看看Batch的定义protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*),Strategy是每个Batch的执行策略即该batch被最大执行次数maxIterations ,Once和FixedPoint即执行一次和多次(默认是100次),停止执行batch的条件有两个,一是在执行maxIterations 次之前规则前后plan没有变化,二是执行次数达到maxIterations 。batch里面的所有规则都继承了Rule,在execute方法里就是遍历这些batchs,将所有的规则应用到LogicalPlan上。

接下来我们看看execute中具体是怎么做的:

def execute(plan: TreeType): TreeType = {
    var curPlan = plan
    //遍历batches
    batches.foreach { batch =>
      val batchStartPlan = curPlan
      var iteration = 1 //每个batch单独计数
      var lastPlan = curPlan //保存遍历batch之前的plan,以便和遍历后的plan进行比较,若无变化则停止执行当前batch
      var continue = true

      // Run until fix point (or the max number of iterations as specified in the strategy.
      while (continue) {
        curPlan = batch.rules.foldLeft(curPlan) { // 遍历一个batch所有的Rule,并应用到LogicalPlan上
          case (plan, rule) =>
            val startTime = System.nanoTime()
            val result = rule(plan)  // 规则应用到LogicalPlan
            val runTime = System.nanoTime() - startTime
            RuleExecutor.timeMap.addAndGet(rule.ruleName, runTime)

            if (!result.fastEquals(plan)) {
              logTrace(
                s"""
                  |=== Applying Rule ${rule.ruleName} ===
                  |${sideBySide(plan.treeString, result.treeString).mkString("\n")}
                """.stripMargin)
            }

            result
        }
        iteration += 1 //对当前batch执行次数进行计数
        if (iteration > batch.strategy.maxIterations) { // 若大于了执行策略定义的次数,则停止执行此batch
          // Only log if this is a rule that is supposed to run more than once.
          if (iteration != 2) {
            val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}"
            if (Utils.isTesting) {
              throw new TreeNodeException(curPlan, message, null)
            } else {
              logWarning(message)
            }
          }
          continue = false
        }

        if (curPlan.fastEquals(lastPlan)) { // 若执行batch前后,plan没有变化,则停止执行此batch
          logTrace(
            s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
          continue = false
        }
        lastPlan = curPlan
      }

      if (!batchStartPlan.fastEquals(curPlan)) {
        logDebug(
          s"""
          |=== Result of Batch ${batch.name} ===
          |${sideBySide(batchStartPlan.treeString, curPlan.treeString).mkString("\n")}
        """.stripMargin)
      } else {
        logTrace(s"Batch ${batch.name} has no effect.")
      }
    }

    curPlan
  }

主要执行步骤都在代码中进行了注释。 batch和里面的rules都是连续执行的,每执行完一个batch都判断此batch执行的次数是否达到maxIterations 和执行此batch前后是否有变化,达到maxIterations 或者执行batch前后无变化都不再执行此batch。

Analyzer的batches 如下:

lazy val batches: Seq[Batch] = Seq(
    Batch("Hints", fixedPoint,
      new ResolveHints.ResolveBroadcastHints(conf),
      ResolveHints.RemoveAllHints),
    Batch("Simple Sanity Check", Once,
      LookupFunctions),
    Batch("Substitution", fixedPoint,
      CTESubstitution,
      WindowsSubstitution,
      EliminateUnions,
      new SubstituteUnresolvedOrdinals(conf)),
    Batch("Resolution", fixedPoint,
      ResolveTableValuedFunctions ::
      ResolveRelations ::
      ResolveReferences ::
      ResolveCreateNamedStruct ::
      ResolveDeserializer ::
      ResolveNewInstance ::
      ResolveUpCast ::
      ResolveGroupingAnalytics ::
      ResolvePivot ::
      ResolveOrdinalInOrderByAndGroupBy ::
      ResolveAggAliasInGroupBy ::
      ResolveMissingReferences ::
      ExtractGenerator ::
      ResolveGenerate ::
      ResolveFunctions ::
      ResolveAliases ::
      ResolveSubquery ::
      ResolveWindowOrder ::
      ResolveWindowFrame ::
      ResolveNaturalAndUsingJoin ::
      ExtractWindowExpressions ::
      GlobalAggregates ::
      ResolveAggregateFunctions ::
      TimeWindowing ::
      ResolveInlineTables(conf) ::
      ResolveTimeZone(conf) ::
      TypeCoercion.typeCoercionRules ++
      extendedResolutionRules : _*),
    Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
    Batch("View", Once,
      AliasViewChild(conf)),
    Batch("Nondeterministic", Once,
      PullOutNondeterministic),
    Batch("UDF", Once,
      HandleNullInputsForUDF),
    Batch("FixNullability", Once,
      FixNullability),
    Batch("Subquery", Once,
      UpdateOuterReferences),
    Batch("Cleanup", fixedPoint,
      CleanupAliases)
  )

继续回到代码3(如下代码),这里通过analyzer模块和catalog绑定完后,由sparkSession、queryExecution和Row编码器构造了Dataset就返回了,并没有继续执行后面的其他模块,其他模块都是lazy的,只有出发了action操作的时候才会去执行。

def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
    val qe = sparkSession.sessionState.executePlan(logicalPlan)
    qe.assertAnalyzed()
    new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
  }

接下来举例子看看Analyzer模块中的规则Rule是怎么通过catalog进行绑定的。

ResolveRelations

此规则是通过catalog替换掉UnresolvedRelation:

UnresolvedRelation(tableIdentifier: TableIdentifier)

case class TableIdentifier(table: String, database: Option[String])

即可以从中获取到database和table的名字,接下来从入口方法apply看是怎么一步一步替换掉的:

def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
      case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
        EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
          case v: View =>
            u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
          case other => i.copy(table = other)
        }
      case u: UnresolvedRelation => resolveRelation(u)
    }

首先执行的是plan的resolveOperators 方法,这是一个柯里化函数,跟进看看:

def resolveOperators(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
    if (!analyzed) {
      val afterRuleOnChildren = mapChildren(_.resolveOperators(rule))
      if (this fastEquals afterRuleOnChildren) {
        CurrentOrigin.withOrigin(origin) {
          rule.applyOrElse(this, identity[LogicalPlan])
        }
      } else {
        CurrentOrigin.withOrigin(origin) {
          rule.applyOrElse(afterRuleOnChildren, identity[LogicalPlan])
        }
      }
    } else {
      this
    }
  }

首先判断此plan是否已经被处理过,接着调用mapChildren,并且传入的是resolveOperators方法,其实就是一个递归调用,它会优先处理它的子节点,然后再处理自己,如果处理后的LogicalPlan和当前的相等就说明他没有子节点了,则处理它自己,反之处理返回的plan。

回到前面看看这个Rule是怎么应用起来的:

case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
        EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
          case v: View =>
            u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
          case other => i.copy(table = other)
        }
      case u: UnresolvedRelation => resolveRelation(u)

先看第二种情况若为UnresolvedRelation,则调用resolveRelation方法进行解析:

def resolveRelation(plan: LogicalPlan): LogicalPlan = plan match {  
                                    //不是这种情况 select * from parquet.`/path/to/query`
      case u: UnresolvedRelation if !isRunningDirectlyOnFiles(u.tableIdentifier) => 
        val defaultDatabase = AnalysisContext.get.defaultDatabase // 获取默认database
        val relation = lookupTableFromCatalog(u, defaultDatabase)
        resolveRelation(relation)
      // The view's child should be a logical plan parsed from the `desc.viewText`, the variable
      // `viewText` should be defined, or else we throw an error on the generation of the View
      // operator.
      case view @ View(desc, _, child) if !child.resolved =>
        // Resolve all the UnresolvedRelations and Views in the child.
        val newChild = AnalysisContext.withAnalysisContext(desc.viewDefaultDatabase) {
          if (AnalysisContext.get.nestedViewDepth > conf.maxNestedViewDepth) {
            view.failAnalysis(s"The depth of view ${view.desc.identifier} exceeds the maximum " +
              s"view resolution depth (${conf.maxNestedViewDepth}). Analysis is aborted to " +
              "avoid errors. Increase the value of spark.sql.view.maxNestedViewDepth to work " +
              "aroud this.")
          }
          execute(child)
        }
        view.copy(child = newChild)
      case p @ SubqueryAlias(_, view: View) =>
        val newChild = resolveRelation(view)
        p.copy(child = newChild)
      case _ => plan
    }

这里第一次进来肯定是先进入第一个case,然后会调用lookupTableFromCatalog方法从catalog中找关系,此方法最终调用了SessionCatalog的lookupRelation方法:

def lookupRelation(name: TableIdentifier): LogicalPlan = {
    synchronized {
      val db = formatDatabaseName(name.database.getOrElse(currentDb))
      val table = formatTableName(name.table)
      if (db == globalTempViewManager.database) {
        globalTempViewManager.get(table).map { viewDef =>
          SubqueryAlias(table, viewDef)
        }.getOrElse(throw new NoSuchTableException(db, table))
      } else if (name.database.isDefined || !tempTables.contains(table)) {
        val metadata = externalCatalog.getTable(db, table)
        if (metadata.tableType == CatalogTableType.VIEW) {
          val viewText = metadata.viewText.getOrElse(sys.error("Invalid view without text."))
          // The relation is a view, so we wrap the relation by:
          // 1. Add a [[View]] operator over the relation to keep track of the view desc;
          // 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name of the view.
          val child = View(
            desc = metadata,
            output = metadata.schema.toAttributes,
            child = parser.parsePlan(viewText))
          SubqueryAlias(table, child)
        } else {
          val tableRelation = CatalogRelation(
            metadata,
            // we assume all the columns are nullable.
            metadata.dataSchema.asNullable.toAttributes,
            metadata.partitionSchema.asNullable.toAttributes)
          SubqueryAlias(table, tableRelation)
        }
      } else {
        SubqueryAlias(table, tempTables(table))
      }
    }
  }
  • 若db等于globalTempViewManager.database,globalTempViewManager维护了一个全局viewName和其元数据LogicalPlan 的映射: val viewDefinitions = new mutable.HashMap[String, LogicalPlan]则直接从globalTempViewManager获取并返回。
  • 若database已定义,且临时表中未有此table: 从externalCatalog(如hive)中获取table对应的元数据信息metadata:CatalogTable,此对象包含了table对应的类型(table(内部还是外部表),view)、存储格式、字段shema信息等:
    • 若返回的table是View类型则构造View对象(包括将viewText通过parser模块解析成语法树),并传入构造一个SubqueryAlias返回
    • 说明此table名对应的就是一个如hive的table表,通过metadata、数据和分区列的schema构造了CatalogRelation,并以此tableRelation构造SubqueryAlias返回。这里就可以看出从一个未绑定的UnresolvedRelation 到通过catalog替换的过程。
  • 说明是个session级别的临时表,从tempTables获取到包含元数据信息的LogicalPlan 并构造SubqueryAlias返回。

再次回到resolveRelation方法:

def resolveRelation(plan: LogicalPlan): LogicalPlan = plan match {
      case u: UnresolvedRelation if !isRunningDirectlyOnFiles(u.tableIdentifier) =>
        val defaultDatabase = AnalysisContext.get.defaultDatabase
        val relation = lookupTableFromCatalog(u, defaultDatabase)
        resolveRelation(relation)
      // The view's child should be a logical plan parsed from the `desc.viewText`, the variable
      // `viewText` should be defined, or else we throw an error on the generation of the View
      // operator.
      case view @ View(desc, _, child) if !child.resolved =>
        // Resolve all the UnresolvedRelations and Views in the child.
        val newChild = AnalysisContext.withAnalysisContext(desc.viewDefaultDatabase) {
          if (AnalysisContext.get.nestedViewDepth > conf.maxNestedViewDepth) {
            view.failAnalysis(s"The depth of view ${view.desc.identifier} exceeds the maximum " +
              s"view resolution depth (${conf.maxNestedViewDepth}). Analysis is aborted to " +
              "avoid errors. Increase the value of spark.sql.view.maxNestedViewDepth to work " +
              "aroud this.")
          }
          execute(child)
        }
        view.copy(child = newChild)
      case p @ SubqueryAlias(_, view: View) =>
        val newChild = resolveRelation(view)
        p.copy(child = newChild)
      case _ => plan
    }

经过lookupTableFromCatalog方法后,又调用了resolveRelation方法本身:

  • case UnresolvedRelation上面讲过了
  • case View,通过上面的解析可知这可能是外部catalog(如hive)的View,其child是viewText被parser模块解析完的Unresolved LogicalPlan,调用execute方法进行analyze。简单的说若是View,则会获取viewText重走parser和analyzer模块。
  • case SubqueryAlias(_, view: View):为view调用resolveRelation方法
  • case _ :若是其他情况,直接返回plan

总之经过resolveRelation方法之后,返回的plan是已经和实际元数据绑定好的plan,可能是从globalTempViewManager直接获取的,可能是从tempTables直接获取,也可能是从externalCatalog获取的元数据。

再回到最初的apply方法:

def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
      case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
        EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
          case v: View =>
            u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
          case other => i.copy(table = other)
        }
      case u: UnresolvedRelation => resolveRelation(u)
    }

这里第二种情况已经分析完,再看看第一种情况,若plan是InsertIntoTable类型并且其对应的table还未绑定,则调用lookupTableFromCatalog方法与catalog进行analyze之后应用到Rule EliminateSubqueryAliases:

object EliminateSubqueryAliases extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case SubqueryAlias(_, child) => child
  }
}

遍历子节点有两种方式,transformDown(默认,前序遍历)、transformUp 后续遍历。 UnresolvedRelation解析后可能会是SubqueryAlias,真正有用的是其child(CatalogRelation),一旦解析完就将其删除掉保留child。 到这里Rule ResolveRelations就解析完了,其他就不再一一列举了。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏守候书阁

编写自己的代码库(javascript常用实例的实现与封装--续)

这个系列的上一篇文章(编写自己的代码库(javascript常用实例的实现与封装))总结了34个常见的操作。但是在开发中,常见的实例又何止这么多个,经过这些日子...

15330
来自专栏pangguoming

完全掌握Android Data Binding

编辑推荐:稀土掘金,这是一个针对技术开发者的一个应用,你可以在掘金上获取最新最优质的技术干货,不仅仅是Android知识、前端、后端以至于产品和设计都有涉猎,想...

63570
来自专栏三好码农的三亩自留地

Java动态代理-实战

说动态代理之前,要先搞明白什么是代理,代理的字面意思已经很容易理解了,我们这里撇开其他的解释,我们只谈设计模式中的代理模式

31220
来自专栏微信公众号:Java团长

Java类加载器详解(下)

这个类中定义了一个加密和解密的算法,很简单的,就是将字节和oxff异或一下即可,而且这个算法是加密和解密的都可以用,很是神奇呀!

15930
来自专栏草根专栏

.NET Core/.NET之Stream简介

之前写了一篇C#装饰模式的文章提到了.NET Core的Stream, 所以这里尽量把Stream介绍全点. (都是书上的内容) .NET Core/.NET的...

41640
来自专栏iOS开发

iOS开发之 Method Swizzling 深入浅出

如果产品经理突然说:"在所有页面添加统计功能,也就是用户进入这个页面就统计一次"。我们会想到下面的一些方法:

48870
来自专栏菩提树下的杨过

rpc框架之 thrift 学习 2 - 基本概念

thrift的基本构架: ? 上图源自:http://jnb.ociweb.com/jnb/jnbJun2009.html 底层Underlying I/O以上...

26970
来自专栏大内老A

如何解决EnterLib异常处理框架最大的局限——基于异常"类型"的异常处理策略

个人觉得EnterLib的EHAB(Exception Handling Application Block)是一个不错的异常处理框架,借助于EHAB,我们可以...

21650
来自专栏Alice

demo3同通讯录展示的方式分组排序

按A-Z顺序分组展示 有些项目中会需要这样的需求。形成类似于上述的界面。类似于通讯录里边的排序。实现的效果:所有的数据展示的时候,能够分组展示。顺序按照A-Z的...

23490
来自专栏mini188

java中的锁

java中有哪些锁 这个问题在我看了一遍<java并发编程>后尽然无法回答,说明自己对于锁的概念了解的不够。于是再次翻看了一下书里的内容,突然有点打开脑门的感觉...

50490

扫码关注云+社区

领取腾讯云代金券