前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊flink Table的Set Operations

聊聊flink Table的Set Operations

原创
作者头像
code4it
发布2019-01-30 12:52:41
4750
发布2019-01-30 12:52:41
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下flink Table的Set Operations

实例

Union

代码语言:javascript
复制
Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.union(right);
  • union方法类似sql的union

UnionAll

代码语言:javascript
复制
Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.unionAll(right);
  • unionAll方法类似sql的union all

Intersect

代码语言:javascript
复制
Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "d, e, f");
Table result = left.intersect(right);
  • intersect方法类似sql的intersect

IntersectAll

代码语言:javascript
复制
Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "d, e, f");
Table result = left.intersectAll(right);
  • intersectAll方法类似sql的intersect all

Minus

代码语言:javascript
复制
Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.minus(right);
  • minus方法类似sql的except

MinusAll

代码语言:javascript
复制
Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.minusAll(right);
  • minusAll方法类似sql的except all

In

代码语言:javascript
复制
Table left = ds1.toTable(tableEnv, "a, b, c");
Table right = ds2.toTable(tableEnv, "a");
​
// using implicit registration
Table result = left.select("a, b, c").where("a.in(" + right + ")");
​
// using explicit registration
tableEnv.registerTable("RightTable", right);
Table result = left.select("a, b, c").where("a.in(RightTable)");
  • in方法类似sql的in

Table

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

代码语言:javascript
复制
class Table(
    private[flink] val tableEnv: TableEnvironment,
    private[flink] val logicalPlan: LogicalNode) {
​
  //......
​
  def union(right: Table): Table = {
    // check that right table belongs to the same TableEnvironment
    if (right.tableEnv != this.tableEnv) {
      throw new ValidationException("Only tables from the same TableEnvironment can be unioned.")
    }
    new Table(tableEnv, Union(logicalPlan, right.logicalPlan, all = false).validate(tableEnv))
  }
​
  def unionAll(right: Table): Table = {
    // check that right table belongs to the same TableEnvironment
    if (right.tableEnv != this.tableEnv) {
      throw new ValidationException("Only tables from the same TableEnvironment can be unioned.")
    }
    new Table(tableEnv, Union(logicalPlan, right.logicalPlan, all = true).validate(tableEnv))
  }
​
  def intersect(right: Table): Table = {
    // check that right table belongs to the same TableEnvironment
    if (right.tableEnv != this.tableEnv) {
      throw new ValidationException(
        "Only tables from the same TableEnvironment can be intersected.")
    }
    new Table(tableEnv, Intersect(logicalPlan, right.logicalPlan, all = false).validate(tableEnv))
  }
​
  def intersectAll(right: Table): Table = {
    // check that right table belongs to the same TableEnvironment
    if (right.tableEnv != this.tableEnv) {
      throw new ValidationException(
        "Only tables from the same TableEnvironment can be intersected.")
    }
    new Table(tableEnv, Intersect(logicalPlan, right.logicalPlan, all = true).validate(tableEnv))
  }
​
  def minus(right: Table): Table = {
    // check that right table belongs to the same TableEnvironment
    if (right.tableEnv != this.tableEnv) {
      throw new ValidationException("Only tables from the same TableEnvironment can be " +
        "subtracted.")
    }
    new Table(tableEnv, Minus(logicalPlan, right.logicalPlan, all = false)
      .validate(tableEnv))
  }
​
  def minusAll(right: Table): Table = {
    // check that right table belongs to the same TableEnvironment
    if (right.tableEnv != this.tableEnv) {
      throw new ValidationException("Only tables from the same TableEnvironment can be " +
        "subtracted.")
    }
    new Table(tableEnv, Minus(logicalPlan, right.logicalPlan, all = true)
      .validate(tableEnv))
  }
​
  //......
}
  • union及unionAll使用的是Union,intersect及intersectAll使用的是Intersect,minus及minusAll使用的是Minus

Union

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala

代码语言:javascript
复制
case class Union(left: LogicalNode, right: LogicalNode, all: Boolean) extends BinaryNode {
  override def output: Seq[Attribute] = left.output
​
  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
    left.construct(relBuilder)
    right.construct(relBuilder)
    relBuilder.union(all)
  }
​
  override def validate(tableEnv: TableEnvironment): LogicalNode = {
    if (tableEnv.isInstanceOf[StreamTableEnvironment] && !all) {
      failValidation(s"Union on stream tables is currently not supported.")
    }
​
    val resolvedUnion = super.validate(tableEnv).asInstanceOf[Union]
    if (left.output.length != right.output.length) {
      failValidation(s"Union two tables of different column sizes:" +
        s" ${left.output.size} and ${right.output.size}")
    }
    val sameSchema = left.output.zip(right.output).forall { case (l, r) =>
      l.resultType == r.resultType
    }
    if (!sameSchema) {
      failValidation(s"Union two tables of different schema:" +
        s" [${left.output.map(a => (a.name, a.resultType)).mkString(", ")}] and" +
        s" [${right.output.map(a => (a.name, a.resultType)).mkString(", ")}]")
    }
    resolvedUnion
  }
}
  • Union继承了BinaryNode,其construct方法通过relBuilder.union来构建union操作

Intersect

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala

代码语言:javascript
复制
case class Intersect(left: LogicalNode, right: LogicalNode, all: Boolean) extends BinaryNode {
  override def output: Seq[Attribute] = left.output
​
  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
    left.construct(relBuilder)
    right.construct(relBuilder)
    relBuilder.intersect(all)
  }
​
  override def validate(tableEnv: TableEnvironment): LogicalNode = {
    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
      failValidation(s"Intersect on stream tables is currently not supported.")
    }
​
    val resolvedIntersect = super.validate(tableEnv).asInstanceOf[Intersect]
    if (left.output.length != right.output.length) {
      failValidation(s"Intersect two tables of different column sizes:" +
        s" ${left.output.size} and ${right.output.size}")
    }
    // allow different column names between tables
    val sameSchema = left.output.zip(right.output).forall { case (l, r) =>
      l.resultType == r.resultType
    }
    if (!sameSchema) {
      failValidation(s"Intersect two tables of different schema:" +
        s" [${left.output.map(a => (a.name, a.resultType)).mkString(", ")}] and" +
        s" [${right.output.map(a => (a.name, a.resultType)).mkString(", ")}]")
    }
    resolvedIntersect
  }
}
  • Intersect继承了BinaryNode,其construct方法通过relBuilder.intersect来构建intersect操作

Minus

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala

代码语言:javascript
复制
case class Minus(left: LogicalNode, right: LogicalNode, all: Boolean) extends BinaryNode {
  override def output: Seq[Attribute] = left.output
​
  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
    left.construct(relBuilder)
    right.construct(relBuilder)
    relBuilder.minus(all)
  }
​
  override def validate(tableEnv: TableEnvironment): LogicalNode = {
    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
      failValidation(s"Minus on stream tables is currently not supported.")
    }
​
    val resolvedMinus = super.validate(tableEnv).asInstanceOf[Minus]
    if (left.output.length != right.output.length) {
      failValidation(s"Minus two table of different column sizes:" +
        s" ${left.output.size} and ${right.output.size}")
    }
    val sameSchema = left.output.zip(right.output).forall { case (l, r) =>
      l.resultType == r.resultType
    }
    if (!sameSchema) {
      failValidation(s"Minus two table of different schema:" +
        s" [${left.output.map(a => (a.name, a.resultType)).mkString(", ")}] and" +
        s" [${right.output.map(a => (a.name, a.resultType)).mkString(", ")}]")
    }
    resolvedMinus
  }
}
  • Minus继承了BinaryNode,其construct方法通过relBuilder.minus来构建minus操作

小结

  • Table对Set提供了union、unionAll、intersect、intersectAll、minus、minusAll、in(in在where子句中)操作
  • union及unionAll使用的是Union,intersect及intersectAll使用的是Intersect,minus及minusAll使用的是Minus
  • Union继承了BinaryNode,其construct方法通过relBuilder.union来构建union操作;Intersect继承了BinaryNode,其construct方法通过relBuilder.intersect来构建intersect操作;Minus继承了BinaryNode,其construct方法通过relBuilder.minus来构建minus操作

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 实例
    • Union
      • UnionAll
        • Intersect
          • IntersectAll
            • Minus
              • MinusAll
                • In
                • Table
                  • Union
                    • Intersect
                      • Minus
                      • 小结
                      • doc
                      相关产品与服务
                      大数据
                      全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
                      领券
                      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档