专栏首页码匠的流水账聊聊flink Table的OrderBy及Limit
原创

聊聊flink Table的OrderBy及Limit

本文主要研究一下flink Table的OrderBy及Limit

实例

Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.orderBy("a.asc");
​
Table in = tableEnv.fromDataSet(ds, "a, b, c");
​
// returns the first 5 records from the sorted result
Table result1 = in.orderBy("a.asc").fetch(5); 
​
// skips the first 3 records and returns all following records from the sorted result
Table result2 = in.orderBy("a.asc").offset(3);
​
// skips the first 10 records and returns the next 5 records from the sorted result
Table result3 = in.orderBy("a.asc").offset(10).fetch(5);
  • orderBy方法类似sql的order by;limit则由offset及fetch两个方法构成,类似sql的offset及fetch

Table

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

class Table(
    private[flink] val tableEnv: TableEnvironment,
    private[flink] val logicalPlan: LogicalNode) {
​
  //......
​
  def orderBy(fields: String): Table = {
    val parsedFields = ExpressionParser.parseExpressionList(fields)
    orderBy(parsedFields: _*)
  }
​
  def orderBy(fields: Expression*): Table = {
    val order: Seq[Ordering] = fields.map {
      case o: Ordering => o
      case e => Asc(e)
    }
    new Table(tableEnv, Sort(order, logicalPlan).validate(tableEnv))
  }
​
  def offset(offset: Int): Table = {
    new Table(tableEnv, Limit(offset, -1, logicalPlan).validate(tableEnv))
  }
​
  def fetch(fetch: Int): Table = {
    if (fetch < 0) {
      throw new ValidationException("FETCH count must be equal or larger than 0.")
    }
    this.logicalPlan match {
      case Limit(o, -1, c) =>
        // replace LIMIT without FETCH by LIMIT with FETCH
        new Table(tableEnv, Limit(o, fetch, c).validate(tableEnv))
      case Limit(_, _, _) =>
        throw new ValidationException("FETCH is already defined.")
      case _ =>
        new Table(tableEnv, Limit(0, fetch, logicalPlan).validate(tableEnv))
    }
  }
​
  //......
}
  • Table的orderBy方法,支持String或Expression类型的参数,其中String类型最终是转为Expression类型;orderBy方法最后使用Sort重新创建了Table;offset及fetch方法,使用Limit重新创建了Table(offset方法创建的Limit其fetch为-1;fetch方法如果之前没有指定offset则创建的Limit的offset为0)

Sort

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala

case class Sort(order: Seq[Ordering], child: LogicalNode) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
​
  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
    child.construct(relBuilder)
    relBuilder.sort(order.map(_.toRexNode(relBuilder)).asJava)
  }
​
  override def validate(tableEnv: TableEnvironment): LogicalNode = {
    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
      failValidation(s"Sort on stream tables is currently not supported.")
    }
    super.validate(tableEnv)
  }
}
  • Sort继承了UnaryNode,它的构造器接收Set类型的Ordering,其construct方法使用relBuilder.sort来构建sort条件

Ordering

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/expressions/ordering.scala

abstract class Ordering extends UnaryExpression {
  override private[flink] def validateInput(): ValidationResult = {
    if (!child.isInstanceOf[NamedExpression]) {
      ValidationFailure(s"Sort should only based on field reference")
    } else {
      ValidationSuccess
    }
  }
}
​
case class Asc(child: Expression) extends Ordering {
  override def toString: String = s"($child).asc"
​
  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    child.toRexNode
  }
​
  override private[flink] def resultType: TypeInformation[_] = child.resultType
}
​
case class Desc(child: Expression) extends Ordering {
  override def toString: String = s"($child).desc"
​
  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    relBuilder.desc(child.toRexNode)
  }
​
  override private[flink] def resultType: TypeInformation[_] = child.resultType
}
  • Ordering是一个抽象类,它有Asc及Desc两个子类

Limit

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala

case class Limit(offset: Int, fetch: Int = -1, child: LogicalNode) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
​
  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
    child.construct(relBuilder)
    relBuilder.limit(offset, fetch)
  }
​
  override def validate(tableEnv: TableEnvironment): LogicalNode = {
    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
      failValidation(s"Limit on stream tables is currently not supported.")
    }
    if (!child.isInstanceOf[Sort]) {
      failValidation(s"Limit operator must be preceded by an OrderBy operator.")
    }
    if (offset < 0) {
      failValidation(s"Offset should be greater than or equal to zero.")
    }
    super.validate(tableEnv)
  }
}
  • Limit继承了UnaryNode,它的构造器接收offset及fetch参数,它的construct方法通过relBuilder.limit来设置offset及fetch

小结

  • Table的orderBy方法类似sql的order by;limit则由offset及fetch两个方法构成,类似sql的offset及fetch
  • Table的orderBy方法,支持String或Expression类型的参数,其中String类型最终是转为Expression类型;orderBy方法最后使用Sort重新创建了Table;offset及fetch方法,使用Limit重新创建了Table(offset方法创建的Limit其fetch为-1;fetch方法如果之前没有指定offset则创建的Limit的offset为0)
  • Sort继承了UnaryNode,它的构造器接收Set类型的Ordering,其construct方法使用relBuilder.sort来构建sort条件;Ordering是一个抽象类,它有Asc及Desc两个子类;Limit继承了UnaryNode,它的构造器接收offset及fetch参数,它的construct方法通过relBuilder.limit来设置offset及fetch

doc

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 聊聊flink Table的OrderBy及Limit

    flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

    codecraft
  • 聊聊flink JobManager的heap大小设置

    flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/JobManagerOptions.j...

    codecraft
  • 聊聊flink TableEnvironment的scan操作

    flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/TableEnvironment....

    codecraft
  • 使用对抗生成网络(GAN)生成手写字

    这是通过GAN迭代训练30W次,耗时3小时生成的手写字图片效果,大部分的还是能看出来是数字的。

    Awesome_Tang
  • 资源 | 机器学习新框架Propel:使用JavaScript做可微分编程

    机器之心
  • Greenplum性能优化之路 --(二)存储格式

    Greenplum(以下简称GP)有2种存储格式,Heap表和AO表(AORO表,AOCO表)。

    lambgong
  • 金融服务领域首张区块链电子发票今早在招商银行开出

    继全国首张区块链电子发票8月10日于深圳国贸旋转餐厅开出之后,深圳市税务局区块链电子发票平台落地场景扩展至金融服务场景,招商银行成为首家接入系统的金融机构。

    腾讯TrustSQL
  • 一次线上数据库添加字段造成磁盘不够的问题

    公司使用的是MySQL数据库,随着业务和用户的增加有张表的数据达到了150000000(1亿5千万)条左右,其中好几个功能都会对这张表进行增删改操作。在并发量比...

    jasonlu
  • 微服务架构设计 第一步: 从特性到业务场景

    2016.9.8, 深圳, Ken Fang 微服务到底应该如何的识别? 微服务的粒度为何? 微服务该如何的分析与设计? 这些问题的答案, 取决于: 为何需要微...

    Ken Fang 方俊贤
  • 简单jQuery九宫格

    deepcc

扫码关注云+社区

领取腾讯云代金券