前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink Windows窗口简介和使用

Flink Windows窗口简介和使用

作者头像
木野归郎
发布2020-06-15 14:52:44
8380
发布2020-06-15 14:52:44
举报
文章被收录于专栏:share ai happiness

很多人不知道什么是Window?有哪些用途? 下面我们结合一个现实的例子来说明。

我们先提出一个问题:统计经过某红绿灯的汽车数量之和? 假设在一个红绿灯处,我们每隔15秒统计一次通过此红绿灯的汽车数量,如下图:

可以把汽车的经过看成一个流,无穷的流,不断有汽车经过此红绿灯,因此无法统计总共的汽车数量。但是,我们可以换一种思路,每隔15秒,我们都将与上一次的结果进行sum操作(滑动聚合),如下:

这个结果似乎还是无法回答我们的问题,根本原因在于流是无界的,我们不能限制流,但可以在有一个有界的范围内处理无界的流数据。

因此,我们需要换一个问题的提法:每分钟经过某红绿灯的汽车数量之和? 这个问题,就相当于一个定义了一个Window(窗口),window的界限是1分钟,且每分钟内的数据互不干扰,因此也可以称为翻滚(不重合)窗口,如下图:

第一分钟的数量为8,第二分钟是22,第三分钟是27。。。这样,1个小时内会有60个window。

再考虑一种情况,每30秒统计一次过去1分钟的汽车数量之和:

此时,window出现了重合。这样,1个小时内会有120个window。

扩展一下,我们可以在某个地区,收集每一个红绿灯处汽车经过的数量,然后每个红绿灯处都做一次基于1分钟的window统计,即并行处理:

通常来讲,Window就是用来对一个无限的流设置一个有限的集合,在有界的数据集上进行操作的一种机制。window又可以分为基于时间(Time-based)的window以及基于数量(Count-based)的window。

Flink DataStream API提供了Time和Count的window,同时增加了基于Session的window。同时,由于某些特殊的需要,DataStream API也提供了定制化的window操作,供用户自定义window。

1. Flink的Window类型

Flink基本分有3种window类型:CountWindow,TimeWindow和SessionWindow。 其中,CountWindow和TimeWindow还有滑动与滚动区分。

2.窗口函数有哪些

定义完窗口分配器后,需要指定在每个窗口上执行的计算,这就是窗口函数的职责。 在了解有哪些窗口函数之前,有必要了解Window的聚合分类: 全量聚合:简单点说是等属于窗口的数据到齐之后,才开始进行聚合计算;即全量聚合在未触发之前,会保存之前的状态,在最后窗口触发时,才会进行计算。(所以全量聚合的压力会很大。) 常见的窗口函数: apply(WindowFunction) --- 不过1.3之后被弃用 process(processWindowFunction)

增量聚合:窗口每进入一条数据,就进行一次计算。

代码语言:javascript
复制
reduce(reduceFunction);
fold;
aggregate(aggregateFunction);
sum(key);min(key);max(key)
sumBy(key);minBy(key);maxBy(key)

我们需要根据业务场景需要,决定使用是全量聚合还是增量聚合,并进一步选择使用哪一种聚合函数。

3.Window何时会被触发

Window何时被触发计算,是由触发器Trigger的onElement方法所决定。

该方法的参数: (1)element:到达的元素 (2)timestamp:元素达到的时间戳 (3)window:元素将被分配的窗口 (4)context:上下文

以时间类型设置为EventTime之后,触发器就是EventTimeTrigger,对应的onElement方法:

方法很简单:如果当前的watermark已经大于或等于窗口的最大时间戳(即窗口的endTime),那么就会触发窗口计算,并输出结果。 TriggerResult.FIRE:窗口计算并输出结果,尽管未清除窗口,但保留了所有元素。 否则的话,就是注册一个以窗口的最大时间戳为时间的定时器。

window.maxTimestamp()

这里的end是指窗口的结束时间,通常是在WindowAssigner中指定,WindowAssigner有:

以TumblingEventTimeWindows为例:

最后呢,给一个完整的例子:

代码语言:javascript
复制
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

object Window {
  def main(args: Array[String]) {
// set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val source = env.socketTextStream("localhost",9000)

    val values = source.flatMap(value => value.split("\\s+")).map(value => (value,1))

    val keyValue = values.keyBy(0)
    // define the count window without purge
    val countWindowWithoutPurge = keyValue.window(GlobalWindows.create()).
      trigger(CountTrigger.of(2))
 
    val countWindowWithPurge = keyValue.window(GlobalWindows.create()).
      trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](2)))

    countWindowWithoutPurge.sum(1).print()
    countWindowWithPurge.sum(1).print()
    env.execute()
// execute program
    env.execute("Flink Scala API Skeleton")
  }
}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-04-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 OnlyCoding 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. Flink的Window类型
  • 2.窗口函数有哪些
  • 3.Window何时会被触发
    • window.maxTimestamp()
    相关产品与服务
    大数据
    全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档