A look at flink Table's select operation


This article takes a look at the select operation of flink Table.

Table.select

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

class Table(
    private[flink] val tableEnv: TableEnvironment,
    private[flink] val logicalPlan: LogicalNode) {

  //......

  def select(fields: String): Table = {
    val fieldExprs = ExpressionParser.parseExpressionList(fields)
    //get the correct expression for AggFunctionCall
    val withResolvedAggFunctionCall = fieldExprs.map(replaceAggFunctionCall(_, tableEnv))
    select(withResolvedAggFunctionCall: _*)
  }

  def replaceAggFunctionCall(field: Expression, tableEnv: TableEnvironment): Expression = {
    field match {
      case l: LeafExpression => l

      case u: UnaryExpression =>
        val c = replaceAggFunctionCall(u.child, tableEnv)
        u.makeCopy(Array(c))

      case b: BinaryExpression =>
        val l = replaceAggFunctionCall(b.left, tableEnv)
        val r = replaceAggFunctionCall(b.right, tableEnv)
        b.makeCopy(Array(l, r))
      // Functions calls
      case c @ Call(name, args) =>
        val function = tableEnv.getFunctionCatalog.lookupFunction(name, args)
        function match {
          case a: AggFunctionCall => a
          case a: Aggregation => a
          case p: AbstractWindowProperty => p
          case _ =>
            val newArgs =
              args.map(
                (exp: Expression) =>
                  replaceAggFunctionCall(exp, tableEnv))
            c.makeCopy(Array(name, newArgs))
        }
      // Scala functions
      case sfc @ ScalarFunctionCall(clazz, args) =>
        val newArgs: Seq[Expression] =
          args.map(
            (exp: Expression) =>
              replaceAggFunctionCall(exp, tableEnv))
        sfc.makeCopy(Array(clazz, newArgs))

      // Array constructor
      case c @ ArrayConstructor(args) =>
        val newArgs =
          c.elements
            .map((exp: Expression) => replaceAggFunctionCall(exp, tableEnv))
        c.makeCopy(Array(newArgs))

      // Other expressions
      case e: Expression => e
    }
  }

  def select(fields: Expression*): Table = {
    val expandedFields = expandProjectList(fields, logicalPlan, tableEnv)
    val (aggNames, propNames) = extractAggregationsAndProperties(expandedFields, tableEnv)
    if (propNames.nonEmpty) {
      throw new ValidationException("Window properties can only be used on windowed tables.")
    }

    if (aggNames.nonEmpty) {
      val projectsOnAgg = replaceAggregationsAndProperties(
        expandedFields, tableEnv, aggNames, propNames)
      val projectFields = extractFieldReferences(expandedFields)

      new Table(tableEnv,
        Project(projectsOnAgg,
          Aggregate(Nil, aggNames.map(a => Alias(a._1, a._2)).toSeq,
            Project(projectFields, logicalPlan).validate(tableEnv)
          ).validate(tableEnv)
        ).validate(tableEnv)
      )
    } else {
      new Table(tableEnv,
        Project(expandedFields.map(UnresolvedAlias), logicalPlan).validate(tableEnv))
    }
  }

  //......
}
  • Table provides two select methods: one takes a String argument, the other takes Expression arguments
  • The String-based select first parses the String with ExpressionParser.parseExpressionList, then replaces UDAGG function calls via replaceAggFunctionCall, and finally delegates to the Expression-based select
  • The Expression-based select recreates the Table with a Project; if aggregations are present, it first builds an Aggregate and then wraps it in a Project (a usage sketch follows this list)
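
To make the two overloads concrete, here is a minimal usage sketch against the flink 1.7 Scala Table API; the environment setup and the field names a, b, c are assumptions for illustration, not taken from the source above.

import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// a small table with fields a, b, c (names assumed for illustration)
val orders = env.fromElements((1, "beer", 3), (2, "diaper", 4)).toTable(tEnv, 'a, 'b, 'c)

// String-based select: goes through ExpressionParser.parseExpressionList
val t1 = orders.select("a, b as name")

// Expression-based select: the expressions are passed in directly
val t2 = orders.select('a, 'b as 'name)

// select with an aggregate: expands to Project(Aggregate(Project(...)))
val t3 = orders.select('c.sum as 'total)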

Expression

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/expressions/Expression.scala

abstract class Expression extends TreeNode[Expression] {
  /**
    * Returns the [[TypeInformation]] for evaluating this expression.
    * It is sometimes not available until the expression is valid.
    */
  private[flink] def resultType: TypeInformation[_]

  /**
    * One pass validation of the expression tree in post order.
    */
  private[flink] lazy val valid: Boolean = childrenValid && validateInput().isSuccess

  private[flink] def childrenValid: Boolean = children.forall(_.valid)

  /**
    * Check input data types, inputs number or other properties specified by this expression.
    * Return `ValidationSuccess` if it pass the check,
    * or `ValidationFailure` with supplement message explaining the error.
    * Note: we should only call this method until `childrenValid == true`
    */
  private[flink] def validateInput(): ValidationResult = ValidationSuccess

  /**
    * Convert Expression to its counterpart in Calcite, i.e. RexNode
    */
  private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
    throw new UnsupportedOperationException(
      s"${this.getClass.getName} cannot be transformed to RexNode"
    )

  private[flink] def checkEquals(other: Expression): Boolean = {
    if (this.getClass != other.getClass) {
      false
    } else {
      def checkEquality(elements1: Seq[Any], elements2: Seq[Any]): Boolean = {
        elements1.length == elements2.length && elements1.zip(elements2).forall {
          case (e1: Expression, e2: Expression) => e1.checkEquals(e2)
          case (t1: Seq[_], t2: Seq[_]) => checkEquality(t1, t2)
          case (i1, i2) => i1 == i2
        }
      }
      val elements1 = this.productIterator.toSeq
      val elements2 = other.productIterator.toSeq
      checkEquality(elements1, elements2)
    }
  }
}

abstract class BinaryExpression extends Expression {
  private[flink] def left: Expression
  private[flink] def right: Expression
  private[flink] def children = Seq(left, right)
}

abstract class UnaryExpression extends Expression {
  private[flink] def child: Expression
  private[flink] def children = Seq(child)
}

abstract class LeafExpression extends Expression {
  private[flink] val children = Nil
}
  • Expression extends TreeNode; it has three abstract subclasses: BinaryExpression, UnaryExpression and LeafExpression (an illustrative mapping follows)
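
As an illustration of where concrete DSL expressions land in this hierarchy (the class names are from flink-table 1.7; the mapping below is a sketch, not exhaustive):

// illustrative mapping from DSL expressions to the Expression hierarchy
//   'amount                   -> UnresolvedFieldReference  (a LeafExpression)
//   'amount.cast(Types.LONG)  -> Cast                      (a UnaryExpression)
//   'amount + 1               -> Plus                      (a BinaryExpression)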

Project

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala

case class Project(
    projectList: Seq[NamedExpression],
    child: LogicalNode,
    explicitAlias: Boolean = false)
  extends UnaryNode {

  override def output: Seq[Attribute] = projectList.map(_.toAttribute)

  override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
    val afterResolve = super.resolveExpressions(tableEnv).asInstanceOf[Project]
    val newProjectList =
      afterResolve.projectList.zipWithIndex.map { case (e, i) =>
        e match {
          case u @ UnresolvedAlias(c) => c match {
            case ne: NamedExpression => ne
            case expr if !expr.valid => u
            case c @ Cast(ne: NamedExpression, tp) => Alias(c, s"${ne.name}-$tp")
            case gcf: GetCompositeField => Alias(gcf, gcf.aliasName().getOrElse(s"_c$i"))
            case other => Alias(other, s"_c$i")
          }
          case _ =>
            throw new RuntimeException("This should never be called and probably points to a bug.")
        }
    }
    Project(newProjectList, child, explicitAlias)
  }

  override def validate(tableEnv: TableEnvironment): LogicalNode = {
    val resolvedProject = super.validate(tableEnv).asInstanceOf[Project]
    val names: mutable.Set[String] = mutable.Set()

    def checkName(name: String): Unit = {
      if (names.contains(name)) {
        failValidation(s"Duplicate field name $name.")
      } else {
        names.add(name)
      }
    }

    resolvedProject.projectList.foreach {
      case n: Alias =>
        // explicit name
        checkName(n.name)
      case r: ResolvedFieldReference =>
        // simple field forwarding
        checkName(r.name)
      case _ => // Do nothing
    }
    resolvedProject
  }

  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
    child.construct(relBuilder)

    val exprs = if (explicitAlias) {
      projectList
    } else {
      // remove AS expressions, according to Calcite they should not be in a final RexNode
      projectList.map {
        case Alias(e: Expression, _, _) => e
        case e: Expression => e
      }
    }

    relBuilder.project(
      exprs.map(_.toRexNode(relBuilder)).asJava,
      projectList.map(_.name).asJava,
      true)
  }
}
  • Project extends UnaryNode; its constructor takes three parameters: a Seq[NamedExpression], a LogicalNode and explicitAlias, where explicitAlias is optional and defaults to false (a plan sketch follows)
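
For reference, the non-aggregate branch of select shown earlier builds roughly the following plan (a sketch in pseudo-notation; the table and field names are assumed):

// what orders.select('a, 'b as 'name) builds (non-aggregate branch):
// Project(
//   Seq(UnresolvedAlias('a), UnresolvedAlias('b as 'name)),  // expandedFields.map(UnresolvedAlias)
//   ordersLogicalPlan                                        // the child LogicalNode
// ).validate(tableEnv)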

Aggregate

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala

case class Aggregate(
    groupingExpressions: Seq[Expression],
    aggregateExpressions: Seq[NamedExpression],
    child: LogicalNode) extends UnaryNode {

  override def output: Seq[Attribute] = {
    (groupingExpressions ++ aggregateExpressions) map {
      case ne: NamedExpression => ne.toAttribute
      case e => Alias(e, e.toString).toAttribute
    }
  }

  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
    child.construct(relBuilder)
    relBuilder.aggregate(
      relBuilder.groupKey(groupingExpressions.map(_.toRexNode(relBuilder)).asJava),
      aggregateExpressions.map {
        case Alias(agg: Aggregation, name, _) => agg.toAggCall(name)(relBuilder)
        case _ => throw new RuntimeException("This should never happen.")
      }.asJava)
  }

  override def validate(tableEnv: TableEnvironment): LogicalNode = {
    implicit val relBuilder: RelBuilder = tableEnv.getRelBuilder
    val resolvedAggregate = super.validate(tableEnv).asInstanceOf[Aggregate]
    val groupingExprs = resolvedAggregate.groupingExpressions
    val aggregateExprs = resolvedAggregate.aggregateExpressions
    aggregateExprs.foreach(validateAggregateExpression)
    groupingExprs.foreach(validateGroupingExpression)

    def validateAggregateExpression(expr: Expression): Unit = expr match {
      case distinctExpr: DistinctAgg =>
        distinctExpr.child match {
          case _: DistinctAgg => failValidation(
            "Chained distinct operators are not supported!")
          case aggExpr: Aggregation => validateAggregateExpression(aggExpr)
          case _ => failValidation(
            "Distinct operator can only be applied to aggregation expressions!")
        }
      // check aggregate function
      case aggExpr: Aggregation
        if aggExpr.getSqlAggFunction.requiresOver =>
        failValidation(s"OVER clause is necessary for window functions: [${aggExpr.getClass}].")
      // check no nested aggregation exists.
      case aggExpr: Aggregation =>
        aggExpr.children.foreach { child =>
          child.preOrderVisit {
            case agg: Aggregation =>
              failValidation(
                "It's not allowed to use an aggregate function as " +
                  "input of another aggregate function")
            case _ => // OK
          }
        }
      case a: Attribute if !groupingExprs.exists(_.checkEquals(a)) =>
        failValidation(
          s"expression '$a' is invalid because it is neither" +
            " present in group by nor an aggregate function")
      case e if groupingExprs.exists(_.checkEquals(e)) => // OK
      case e => e.children.foreach(validateAggregateExpression)
    }

    def validateGroupingExpression(expr: Expression): Unit = {
      if (!expr.resultType.isKeyType) {
        failValidation(
          s"expression $expr cannot be used as a grouping expression " +
            "because it's not a valid key type which must be hashable and comparable")
      }
    }
    resolvedAggregate
  }
}
  • Aggregate extends UnaryNode; its constructor takes three parameters: a Seq[Expression], a Seq[NamedExpression] and a LogicalNode (a plan sketch follows)
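
Putting Project and Aggregate together, an aggregating select expands to roughly the nested plan below (a sketch in pseudo-notation; the generated alias TMP_0 is illustrative):

// what orders.select('c.sum as 'total) builds (aggregate branch of select):
// Project(
//   Seq(UnresolvedAlias(Alias('TMP_0, "total"))),           // projectsOnAgg: the sum replaced by its alias
//   Aggregate(
//     Nil,                                                  // no grouping expressions
//     Seq(Alias(Sum('c), "TMP_0")),                         // extracted aggregation with a generated name
//     Project(Seq(UnresolvedAlias('c)), ordersLogicalPlan)  // projectFields: plain field references
//   )
// )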

LogicalNode

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/LogicalNode.scala

abstract class LogicalNode extends TreeNode[LogicalNode] {
  def output: Seq[Attribute]

  def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
    // resolve references and function calls
    val exprResolved = expressionPostOrderTransform {
      case u @ UnresolvedFieldReference(name) =>
        // try resolve a field
        resolveReference(tableEnv, name).getOrElse(u)
      case c @ Call(name, children) if c.childrenValid =>
        tableEnv.getFunctionCatalog.lookupFunction(name, children)
    }

    exprResolved.expressionPostOrderTransform {
      case ips: InputTypeSpec if ips.childrenValid =>
        var changed: Boolean = false
        val newChildren = ips.expectedTypes.zip(ips.children).map { case (tpe, child) =>
          val childType = child.resultType
          if (childType != tpe && TypeCoercion.canSafelyCast(childType, tpe)) {
            changed = true
            Cast(child, tpe)
          } else {
            child
          }
        }.toArray[AnyRef]
        if (changed) ips.makeCopy(newChildren) else ips
    }
  }

  final def toRelNode(relBuilder: RelBuilder): RelNode = construct(relBuilder).build()

  protected[logical] def construct(relBuilder: RelBuilder): RelBuilder

  def validate(tableEnv: TableEnvironment): LogicalNode = {
    val resolvedNode = resolveExpressions(tableEnv)
    resolvedNode.expressionPostOrderTransform {
      case a: Attribute if !a.valid =>
        val from = children.flatMap(_.output).map(_.name).mkString(", ")
        // give helpful error message for null literals
        if (a.name == "null") {
          failValidation(s"Cannot resolve field [${a.name}] given input [$from]. If you want to " +
            s"express a null literal, use 'Null(TYPE)' for typed null expressions. " +
            s"For example: Null(INT)")
        } else {
          failValidation(s"Cannot resolve field [${a.name}] given input [$from].")
        }

      case e: Expression if e.validateInput().isFailure =>
        failValidation(s"Expression $e failed on input check: " +
          s"${e.validateInput().asInstanceOf[ValidationFailure].message}")
    }
  }

  /**
    * Resolves the given strings to a [[NamedExpression]] using the input from all child
    * nodes of this LogicalPlan.
    */
  def resolveReference(tableEnv: TableEnvironment, name: String): Option[NamedExpression] = {
    // try to resolve a field
    val childrenOutput = children.flatMap(_.output)
    val fieldCandidates = childrenOutput.filter(_.name.equalsIgnoreCase(name))
    if (fieldCandidates.length > 1) {
      failValidation(s"Reference $name is ambiguous.")
    } else if (fieldCandidates.nonEmpty) {
      return Some(fieldCandidates.head.withName(name))
    }

    // try to resolve a table
    tableEnv.scanInternal(Array(name)) match {
      case Some(table) => Some(TableReference(name, table))
      case None => None
    }
  }

  /**
    * Runs [[postOrderTransform]] with `rule` on all expressions present in this logical node.
    *
    * @param rule the rule to be applied to every expression in this logical node.
    */
  def expressionPostOrderTransform(rule: PartialFunction[Expression, Expression]): LogicalNode = {
    var changed = false

    def expressionPostOrderTransform(e: Expression): Expression = {
      val newExpr = e.postOrderTransform(rule)
      if (newExpr.fastEquals(e)) {
        e
      } else {
        changed = true
        newExpr
      }
    }

    val newArgs = productIterator.map {
      case e: Expression => expressionPostOrderTransform(e)
      case Some(e: Expression) => Some(expressionPostOrderTransform(e))
      case seq: Traversable[_] => seq.map {
        case e: Expression => expressionPostOrderTransform(e)
        case other => other
      }
      case r: Resolvable[_] => r.resolveExpressions(e => expressionPostOrderTransform(e))
      case other: AnyRef => other
    }.toArray

    if (changed) makeCopy(newArgs) else this
  }

  protected def failValidation(msg: String): Nothing = {
    throw new ValidationException(msg)
  }
}

abstract class LeafNode extends LogicalNode {
  override def children: Seq[LogicalNode] = Nil
}

abstract class UnaryNode extends LogicalNode {
  def child: LogicalNode

  override def children: Seq[LogicalNode] = child :: Nil
}

abstract class BinaryNode extends LogicalNode {
  def left: LogicalNode
  def right: LogicalNode

  override def children: Seq[LogicalNode] = left :: right :: Nil
}
  • LogicalNode, like Expression, extends TreeNode; it has three abstract subclasses: BinaryNode, UnaryNode and LeafNode (a validation example follows)
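
The validate/resolveReference path above is also where unknown fields are rejected; as a sketch (the error text follows the validate method shown above, the field names are assumed):

// referencing a field that no child node outputs fails during validate:
// orders.select('doesNotExist)
// throws org.apache.flink.table.api.ValidationException:
//   Cannot resolve field [doesNotExist] given input [a, b, c].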

Summary

  • Table provides two select methods, one taking a String argument and one taking Expression arguments; the String-based select first parses the String with ExpressionParser.parseExpressionList, then replaces UDAGG function calls via replaceAggFunctionCall, and finally delegates to the Expression-based select
  • The Expression-based select recreates the Table with a Project; if aggregations are present, it builds an Aggregate and then wraps it in a Project
  • Project and Aggregate are both case classes and both extend UnaryNode, a subclass of LogicalNode; LogicalNode, like Expression, extends TreeNode; Expression has three abstract subclasses (BinaryExpression, UnaryExpression and LeafExpression), and LogicalNode likewise has three (BinaryNode, UnaryNode and LeafNode)

doc

  • Operations