专栏首页码匠的流水账聊聊flink Table的Group Windows
原创

聊聊flink Table的Group Windows

本文主要研究一下flink Table的Group Windows

实例

Table table = input
  .window([Window w].as("w"))  // define window with alias w
  .groupBy("w")  // group the table by window w
  .select("b.sum");  // aggregate
​
Table table = input
  .window([Window w].as("w"))  // define window with alias w
  .groupBy("w, a")  // group the table by attribute a and window w 
  .select("a, b.sum");  // aggregate
​
Table table = input
  .window([Window w].as("w"))  // define window with alias w
  .groupBy("w, a")  // group the table by attribute a and window w 
  .select("a, w.start, w.end, w.rowtime, b.count"); // aggregate and add window start, end, and rowtime timestamps
  • window操作可以对Window进行别名,然后可以在groupBy及select中引用,window有start、end、rowtime属性可以用,其中start及rowtime是inclusive的,而end为exclusive

Tumbling Windows实例

// Tumbling Event-time Window
.window(Tumble.over("10.minutes").on("rowtime").as("w"));
​
// Tumbling Processing-time Window (assuming a processing-time attribute "proctime")
.window(Tumble.over("10.minutes").on("proctime").as("w"));
​
// Tumbling Row-count Window (assuming a processing-time attribute "proctime")
.window(Tumble.over("10.rows").on("proctime").as("w"));
  • Tumbling Windows按固定窗口大小来移动,因而窗口不重叠;over方法用于指定窗口大小;窗口大小可以基于event-time、processing-time、row-count来定义

Sliding Windows实例

// Sliding Event-time Window
.window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"));
​
// Sliding Processing-time window (assuming a processing-time attribute "proctime")
.window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"));
​
// Sliding Row-count window (assuming a processing-time attribute "proctime")
.window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"));
  • Sliding Windows在slide interval小于window size的时候,窗口会有重叠,因而rows可能归属多个窗口;over方法用于指定窗口大小,窗口大小可以基于event-time、processing-time、row-count来定义;every方法用于指定slide interval

Session Windows实例

// Session Event-time Window
.window(Session.withGap("10.minutes").on("rowtime").as("w"));
​
// Session Processing-time Window (assuming a processing-time attribute "proctime")
.window(Session.withGap("10.minutes").on("proctime").as("w"));
  • Session Windows没有固定的窗口大小,它基于inactivity的程度来关闭窗口,withGap方法用于指定两个窗口的gap,作为time interval;Session Windows只能使用event-time或者processing-time

Table.window

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

class Table(
    private[flink] val tableEnv: TableEnvironment,
    private[flink] val logicalPlan: LogicalNode) {
​
  //......
  
  def window(window: Window): WindowedTable = {
    new WindowedTable(this, window)
  }
  
  //......
}
  • Table提供了window操作,接收Window参数,创建的是WindowedTable

WindowedTable

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

class WindowedTable(
    private[flink] val table: Table,
    private[flink] val window: Window) {
​
  def groupBy(fields: Expression*): WindowGroupedTable = {
    val fieldsWithoutWindow = fields.filterNot(window.alias.equals(_))
    if (fields.size != fieldsWithoutWindow.size + 1) {
      throw new ValidationException("GroupBy must contain exactly one window alias.")
    }
​
    new WindowGroupedTable(table, fieldsWithoutWindow, window)
  }
​
  def groupBy(fields: String): WindowGroupedTable = {
    val fieldsExpr = ExpressionParser.parseExpressionList(fields)
    groupBy(fieldsExpr: _*)
  }
​
}
  • WindowedTable只提供groupBy操作,其中groupBy可以接收String类型的参数,也可以接收Expression类型的参数;String类型的参数会被转换为Expression类型,最后调用的是Expression类型参数的groupBy方法;如果groupBy除了window没有其他属性,则其parallelism为1,只会在单一task上执行;groupBy方法创建的是WindowGroupedTable

WindowGroupedTable

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

class WindowGroupedTable(
    private[flink] val table: Table,
    private[flink] val groupKeys: Seq[Expression],
    private[flink] val window: Window) {
​
  def select(fields: Expression*): Table = {
    val expandedFields = expandProjectList(fields, table.logicalPlan, table.tableEnv)
    val (aggNames, propNames) = extractAggregationsAndProperties(expandedFields, table.tableEnv)
​
    val projectsOnAgg = replaceAggregationsAndProperties(
      expandedFields, table.tableEnv, aggNames, propNames)
​
    val projectFields = extractFieldReferences(expandedFields ++ groupKeys :+ window.timeField)
​
    new Table(table.tableEnv,
      Project(
        projectsOnAgg,
        WindowAggregate(
          groupKeys,
          window.toLogicalWindow,
          propNames.map(a => Alias(a._1, a._2)).toSeq,
          aggNames.map(a => Alias(a._1, a._2)).toSeq,
          Project(projectFields, table.logicalPlan).validate(table.tableEnv)
        ).validate(table.tableEnv),
        // required for proper resolution of the time attribute in multi-windows
        explicitAlias = true
      ).validate(table.tableEnv))
  }
​
  def select(fields: String): Table = {
    val fieldExprs = ExpressionParser.parseExpressionList(fields)
    //get the correct expression for AggFunctionCall
    val withResolvedAggFunctionCall = fieldExprs.map(replaceAggFunctionCall(_, table.tableEnv))
    select(withResolvedAggFunctionCall: _*)
  }
}
  • WindowGroupedTable只提供select操作,其中select可以接收String类型的参数,也可以接收Expression类型的参数;String类型的参数会被转换为Expression类型,最后调用的是Expression类型参数的select方法;select方法创建了新的Table,其Project的child为WindowAggregate

小结

  • window操作可以对Window进行别名,然后可以在groupBy及select中引用,window有start、end、rowtime属性可以用,其中start及rowtime是inclusive的,而end为exclusive
  • Tumbling Windows按固定窗口大小来移动,因而窗口不重叠;over方法用于指定窗口大小;窗口大小可以基于event-time、processing-time、row-count来定义;Sliding Windows在slide interval小于window size的时候,窗口会有重叠,因而rows可能归属多个窗口;over方法用于指定窗口大小,窗口大小可以基于event-time、processing-time、row-count来定义;every方法用于指定slide interval;Session Windows没有固定的窗口大小,它基于inactivity的程度来关闭窗口,withGap方法用于指定两个窗口的gap,作为time interval;Session Windows只能使用event-time或者processing-time
  • Table提供了window操作,接收Window参数,创建的是WindowedTable;WindowedTable只提供groupBy操作,其中groupBy可以接收String类型的参数,也可以接收Expression类型的参数;String类型的参数会被转换为Expression类型,最后调用的是Expression类型参数的groupBy方法;如果groupBy除了window没有其他属性,则其parallelism为1,只会在单一task上执行;groupBy方法创建的是WindowGroupedTable;WindowGroupedTable只提供select操作,其中select可以接收String类型的参数,也可以接收Expression类型的参数;String类型的参数会被转换为Expression类型,最后调用的是Expression类型参数的select方法;select方法创建了新的Table,其Project的child为WindowAggregate

doc

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 聊聊flink Table的Group Windows

    flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

    codecraft
  • 聊聊flink Table的Over Windows

    flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

    codecraft
  • 聊聊dubbo-go的kubernetesRegistry

    dubbo-go-v1.4.2/registry/kubernetes/registry.go

    codecraft
  • 聊聊flink Table的Group Windows

    flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

    codecraft
  • 设置WPF窗体全屏显示:

    //全屏代码: private void Window_Loaded(object sender, RoutedEventArgs e) { // 设...

    hbbliyong
  • JavaScript 学习(2)

    参考: http://www.w3cschool.cc/js/js-window.html

    lpe234
  • 写给大忙人看的 Flink Window原理

    Window 可以说是 Flink 中必不可少的 operator 之一,在很多场合都有很非凡的表现。今天呢,我们就一起来看一下 window 是如何实现的。

    shengjk1
  • JavaScript Window - 浏览器对象模型

    浏览器对象模型 (BOM) 使 JavaScript 有能力与浏览器“对话”。 浏览器对象模型 (BOM) 浏览器对象模型(Browser Object Mod...

    李海彬
  • 不走分成模式的搜狐却吸引了32万自媒体,2017年打算怎么干?

    正如我之前多篇文章所言,移动互联网之后,智能互联网、内容互联网和实体互联网已然来临,门户是内容互联网的重要玩家,然而最近一年,今日头条、一点资讯们声名鹊起,张一...

    罗超频道
  • requestAnimationFrame实现动画效果

    html动画一般会采用css3的形式去做,当然也比较建议用css去做动画。但是有时候一些动画只能使用js来完成,常用的js动画方案是使用计时器来完成。编写动画循...

    OECOM

扫码关注云+社区

领取腾讯云代金券