前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >spark sql解析过程中对tree的遍历(源码详解)

spark sql解析过程中对tree的遍历(源码详解)

作者头像
数据仓库践行者
发布2020-10-29 09:54:14
1.4K0
发布2020-10-29 09:54:14
举报
文章被收录于专栏:数据仓库践行者

静下心来读源码,给想要了解spark sql底层解析原理的小伙伴们!

【本文大纲】1、执行计划回顾2、遍历过程概述3、遍历过程详解4、思考小问题

执行计划回顾

Spark sql执行计划的生成过程:

  1. 接收 sql 语句,初步解析成 logical plan
  2. 分析上步生成的 logical plan,生成验证后的 logical plan
  3. 对分析过后的 logical plan,进行优化
  4. 对优化过后的 logical plan,生成 physical plan
  5. 根据 physical plan,生成 rdd 的程序,并且提交运行
代码语言:javascript
复制
SELECT A,B FROM TESTDATA2 WHERE A>2

结合上图,写测试用例,每一步生成的执行计划如下:

Spark sql解析会生成四种plan:

Parsed Logical Plan, Analyzed Logical Plan, Optimized Logical Plan, Physical Plan

上面这四种plan,无论是 LogicalPlan 还是 PhysicalPlan,都是通过树的形式表示。每一步都是对树进行操作,生成新的树。在这个过程中,对树的遍历非常重要。

遍历过程概述

最常用到的有 后序遍历 和 前序遍历 两种

后序遍历

TreeNode 中的 transformUp方法以及AnalysisHelper 中的 resolveOperatorsUp方法 等

这两个方法类似,以TreeNode 中的 transformUp为例:

代码语言:javascript
复制
def transformUp(rule: PartialFunction[BaseType, BaseType]): BaseType = {
  // 先遍历子节点,得到叶子节点
  val afterRuleOnChildren = mapChildren(_.transformUp(rule))
 
 //对节点执行规则
  val newNode = if (this fastEquals afterRuleOnChildren) {
    CurrentOrigin.withOrigin(origin) {
      //这里用到了PartialFunction的applyOrElse方法,用来避免undefined的情况发生。如果当前节点应用rule没有匹配的话,则返回默认的当前节点本身
      rule.applyOrElse(this, identity[BaseType])
    }
  } else {
    CurrentOrigin.withOrigin(origin) {
      rule.applyOrElse(afterRuleOnChildren, identity[BaseType])
    }
  }
  // If the transform function replaces this node with a new one, carry over the tags.
  newNode.copyTagsFrom(this)
  newNode
}

递归逻辑:

  • 递归结束条件:如果是子节点,那么使用该规则执行该节点,并且返回执行规则后的节点
  • 递归继续条件:如果有子节点,那么先根据遍历子节点的结果,生成新节点。最后在使用该规则执行新节点

前序遍历

TreeNode 中的 transformDown方法以及AnalysisHelper 中的 resolveOperatorsDown方法 等

TreeNode 中的 transformDown为例:

代码语言:javascript
复制
def transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType = {
      // 对当前节点,调用rule函数。
    val afterRule = CurrentOrigin.withOrigin(origin) {
    // 这里rule函数有可能会生成新的节点,新节点的子节点可能不一样
    rule.applyOrElse(this, identity[BaseType])
  }

  // Check if unchanged and then possibly return old copy to avoid gc churn.
 //再遍历子节点 
 if (this fastEquals afterRule) {
// 如果当前节点没有变化,则继续遍历它的子节点
    mapChildren(_.transformDown(rule))
  } else {
  
// 如果当前节点发生改变,需要对改变后的节点进行遍历
    afterRule.copyTagsFrom(this)
    afterRule.mapChildren(_.transformDown(rule))
  }
}

递归逻辑:

  • 递归结束条件:如果是叶子节点,那么使用规则对该节点操作,并且返回操作后的节点。
  • 递归继续条件:如果不是叶子节点,那么先使用该规则对该节点操作。对操作后的该节点,继续遍历其子节点,用子节点的返回结果,来构建成新的节点。

遍历中的通用方法

上面几种方法中,都用到了TreeNode中的mapChildren、mapProductIterator方法

mapChildren

mapChildren 会依次调用函数对子节点操作,根据返回的结果生成一个新的节点。

代码语言:javascript
复制
def mapChildren(f: BaseType => BaseType): BaseType = {
//如果不是叶子节点,那么会执行mapChildren(f, forceCopy = false)方法,遍历构造函数的参数。如果参数是子节点,那么递归遍历
if (containsChild.nonEmpty) {
    mapChildren(f, forceCopy = false)
  } else {
//如果是叶子节点,则返回自身节点 
    this
  }
}


private def mapChildren(
    f: BaseType => BaseType,
    forceCopy: Boolean): BaseType = {
  var changed = false

  def mapChild(child: Any): Any = child match {
    case arg: TreeNode[_] if containsChild(arg) =>
      val newChild = f(arg.asInstanceOf[BaseType])
      if (forceCopy || !(newChild fastEquals arg)) {
        changed = true
        newChild
      } else {
        arg
      }
    case tuple @ (arg1: TreeNode[_], arg2: TreeNode[_]) =>
      val newChild1 = if (containsChild(arg1)) {
        f(arg1.asInstanceOf[BaseType])
      } else {
        arg1.asInstanceOf[BaseType]
      }

      val newChild2 = if (containsChild(arg2)) {
        f(arg2.asInstanceOf[BaseType])
      } else {
        arg2.asInstanceOf[BaseType]
      }

      if (forceCopy || !(newChild1 fastEquals arg1) || !(newChild2 fastEquals arg2)) {
        changed = true
        (newChild1, newChild2)
      } else {
        tuple
      }
    case other => other
  }

// 调用了mapProductIterator方法,遍历构造函数的参数,返回新的构造参数
  val newArgs = mapProductIterator {
// 如果参数是TreeNode子类,并且是当前节点的子节点
    case arg: TreeNode[_] if containsChild(arg) =>
// 递归调用函数遍历 这里的f可能是 transformUp or transformDown
      val newChild = f(arg.asInstanceOf[BaseType])
// 如果子节点发生变化了,则更改changed标识
      if (forceCopy || !(newChild fastEquals arg)) {
        changed = true
        newChild
      } else {
        arg
      }
    case Some(arg: TreeNode[_]) if containsChild(arg) =>
      val newChild = f(arg.asInstanceOf[BaseType])
      if (forceCopy || !(newChild fastEquals arg)) {
        changed = true
        Some(newChild)
      } else {
        Some(arg)
      }
    case m: Map[_, _] => m.mapValues {
      case arg: TreeNode[_] if containsChild(arg) =>
        val newChild = f(arg.asInstanceOf[BaseType])
        if (forceCopy || !(newChild fastEquals arg)) {
          changed = true
          newChild
        } else {
          arg
        }
      case other => other
    }.view.force // `mapValues` is lazy and we need to force it to materialize
    case d: DataType => d // Avoid unpacking Structs
    case args: Stream[_] => args.map(mapChild).force // Force materialization on stream
    case args: Iterable[_] => args.map(mapChild)
    case nonChild: AnyRef => nonChild
    case null => null
  }
 // 如果子节点发生变化,则利用新的构造参数,实例化新的节点
  if (forceCopy || changed) makeCopy(newArgs, forceCopy) else this
}

mapProductIterator

TreeNode 继承了 Product 接口,TreeNode 的子类实现了 Product 接口,所以支持访问构造方法的参数。TreeNode 的 mapProductIterator 方法,接收一个函数用来遍历当前节点的构造参数

这里有一个知识点(ClassTag用法):https://dzone.com/articles/scala-classtag-a-simple-use-case

代码语言:javascript
复制
//ClassTag用法
 def mapProductIterator[B: ClassTag](f: Any => B): Array[B] = {
// protected def mapProductIterator[B: ClassTag](f: Any => B): Array[B] = {
  val arr = Array.ofDim[B](productArity)
  var i = 0
  while (i < arr.length) {
    arr(i) = f(productElement(i))
    i += 1
  }
  arr
}

遍历过程详解

下面以Parsed Logical Plan --> Analyzed Logical Plan的过程中 ,某个规则为例,详细跟踪一下这两种遍历方式。

分析一下当前的Parsed Logical Plan

当前sql

代码语言:javascript
复制
SELECT A,B FROM TESTDATA2 WHERE A>2

生成的Parsed Logical Plan:

代码语言:javascript
复制
== Parsed Logical Plan ==
'Project ['A, 'B]
+- 'Filter ('A > 2)
   +- 'UnresolvedRelation [TESTDATA2]

上面执行计划涉及到 三个类(Project、Filter、UnresolvedRelation):

case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)

两个参数:

  • Project-projectList: Seq['A, 'B]
  • Project-child(LogicalPlan):
  • 'Filter ('A > 2)
  • +- 'UnresolvedRelation [TESTDATA2]

Project有一个子节点Filter

case class Filter(condition: Expression, child: LogicalPlan)

两个参数:

  • Filter-condition:('A > 2)
  • Filter--child(LogicalPlan):

'UnresolvedRelation [TESTDATA2]

Filter有一个子节点UnresolvedRelation

case class UnresolvedRelation( multipartIdentifier: Seq[String])

一个参数:

  • UnresolvedRelation-multipartIdentifier: Seq[TESTDATA2]

UnresolvedRelation无子节点

Project、Filter、UnresolvedRelation与 LogicalPlan、 TreeNode的继承关系如下:

Project、Filter、UnresolvedRelation本身是Logical Plan 、TreeNode。

后序遍历(AnalysisHelper.resolveOperatorsUp)

Parsed Logical Plan 需要 通过Analyzer类中的一系列rule 转换生成Analyzed Logical Plan。

下图是Analyzer类中rule,会提前初始化在batches里:

这里的rule通过apply方法遍历Parsed Logical Plan 的每个节点,依据定好的规则生成Analyzed Logical Plan,以 ResolveHints.ResolveJoinStrategyHint为例:

主要通过AnalysisHelper 中的 resolveOperatorsUp(后序遍历的)方法:

代码语言:javascript
复制
// 入参为rule,偏函数
def resolveOperatorsUp(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
  if (!analyzed) {
    AnalysisHelper.allowInvokingTransformsInAnalyzer {
     // 1、先遍历子节点,得到叶子节点
      val afterRuleOnChildren = mapChildren(_.resolveOperatorsUp(rule))
      
    //2、为节点执行规则
    if (self fastEquals afterRuleOnChildren) {
        CurrentOrigin.withOrigin(origin) {
    // 如果遍历后当前节点没有发生变化,对当前的plan执行rule规则
          rule.applyOrElse(self, identity[LogicalPlan])
        }
      } else {
        CurrentOrigin.withOrigin(origin) {
    // 如果遍历后 当前 节点发 生了变化,则新负值的afterRuleOnChildren执行rule规则
          rule.applyOrElse(afterRuleOnChildren, identity[LogicalPlan])
        }
      }
    }
  } else {
    self
  }
}

当前的节点为Project,执行Project 的 resolveOperatorsUp 方法,该方法会先遍历Project的子节点。

第一层遍历:

执行Project 的 mapChildren方法

第二层遍历:

执行Filter 的 mapChildren方法

第三层遍历:

执行UnresolvedRelation 的 mapChildren方法

由 于 UnresolvedRelation为子节点,返回节点本 身,为UnresolvedRelation执行rule。

为UnresolvedRelation节点执行ResolveJoinStrategyHint的apply方法:

代码语言:javascript
复制
// 该规则主要是针对Hint节点起作用 ,目前是UnresolvedRelation节点
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
  case h: UnresolvedHint if STRATEGY_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
    if (h.parameters.isEmpty) {
      // If there is no table alias specified, apply the hint on the entire subtree.
      ResolvedHint(h.child, createHintInfo(h.name))
    } else {
      // Otherwise, find within the subtree query plans to apply the hint.
      val relationNamesInHint = h.parameters.map {
        case tableName: String => UnresolvedAttribute.parseAttributeName(tableName)
        case tableId: UnresolvedAttribute => tableId.nameParts
        case unsupported => throw new AnalysisException("Join strategy hint parameter " +
          s"should be an identifier or string but was $unsupported (${unsupported.getClass}")
      }.toSet
      val relationsInHintWithMatch = new mutable.HashSet[Seq[String]]
      val applied = applyJoinStrategyHint(
        h.child, relationNamesInHint, relationsInHintWithMatch, h.name)

      // Filters unmatched relation identifiers in the hint
      val unmatchedIdents = relationNamesInHint -- relationsInHintWithMatch
      hintErrorHandler.hintRelationsNotFound(h.name, h.parameters, unmatchedIdents)
      applied
    }
}

这个规则主要是 对Hint节点起作用,但目前是UnresolvedRelation节点,不能匹配的上。因此通过

代码语言:javascript
复制
  rule.applyOrElse(self, identity[LogicalPlan])

之后,返回UnresolvedRelation本身。

UnresolvedRelation返回后,就会接着先后为Filter-->Project执行ResolveJoinStrategyHint规则,最后返回Project本身。

到此,整个ResolveJoinStrategyHint对Logical plan的 遍历及执行规则的 过 程 就结束了。

前序遍历(AnalysisHelper.resolveOperatorsDown)

Analyzer 中的 ExtractWindowExpressions规则

主要通过AnalysisHelper 中的 resolveOperatorsDown方法:

代码语言:javascript
复制
/** Similar to [[resolveOperatorsUp]], but does it top-down. */
def resolveOperatorsDown(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
  if (!analyzed) {
    AnalysisHelper.allowInvokingTransformsInAnalyzer {   
      val afterRule = CurrentOrigin.withOrigin(origin) {
       // 1、为当前节点执行规则
        rule.applyOrElse(self, identity[LogicalPlan])
      }

      //  2、对执行完规则后的新节点遍历迭代
      if (self fastEquals afterRule) {
        //如果执行完规则后的节点没有变化(即规则没有起到作用),则对节点遍历迭代
        mapChildren(_.resolveOperatorsDown(rule))
      } else {
       //如果执行完规则后的节点发生变化,则对新节点遍历迭代
        afterRule.mapChildren(_.resolveOperatorsDown(rule))
      }
    }
  } else {
    self
  }
}

先为Project节点执行ExtractWindowExpressions.apply方法:

Project节点模式匹配case p: LogicalPlan if !p.childrenResolved => p 返回Project节点本身

第一层遍历:

得到Project的子节点Filter,执行Filter.resolveOperatorsDown方法,先对Filter节点执行ExtractWindowExpressions.apply方法,跑一遍规则,最后由于sql没有用到window相关函数,返回Filter节点本身,开始对Filter节点进行遍历

第二层遍历:

得到Filter的子节点UnresolvedRelation,执行UnresolvedRelation.resolveOperatorsDown方法,先对UnresolvedRelation节点执行ExtractWindowExpressions.apply方法,跑一遍规则,返回UnresolvedRelation节点本身,开始对UnresolvedRelation节点进行遍历

第三层遍历:

UnresolvedRelation没有子节点,在mapChildren方法被返回。

最终 返回Project节点,ExtractWindowExpressions执行完成。

思考

什么rule适合用后序遍历?什么rule适合前序遍历?

当我们自己开发规则时,该怎么选呢?

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-10-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 数据仓库践行者 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档