00:00
我们已经知道了窗口操作的基本步骤,那就是先点window定义一个窗口分配器,然后后边再执行一个操作,定义当前的窗口函数。好,那接下来呢,我们就首先来介绍一下窗口分配器是怎样使用的。那前面我们说了窗口分配器呢,其实就是要指定我们当前的窗口类型,那这里的分配它是要分配什么呢?那其实就是分配当前的数据,就是我们当前一个数据来了之后,那就要按照当前数据的时间戳去判断,去确定它到底应该发送到哪个窗口里面。这就是所谓的窗口分配器,所以我们说窗口分配器的类型呢,那就跟我们前面说的窗口的类型是一致的,比如说我们可以分成时间窗口和技术窗口,另外呢,也可以专门去定义滚动窗口,滑动窗口,会画窗口,还有全局窗口,那除了需要自定义的全局窗口之外,那其他的一些常用类型link都给我们提供了内置的分配器实现啊,所以这个其实就非常简单了啊,我们不需要去自定义这样的一个window sign了,接下来我们就在代码当中去看一看怎么样去调用。
01:13
那首先我们还是先来创建一个测试的object,我们把它叫做window test。首先没方法,先放在这里,那具体的处理流程,首先我们需要先创建当前的执行环境,STEM execution environment,我们把它get过来叫做env,上边还是改成下划线,然后接下来啊,不失正确性,我们把这个全局的并行度设成一,方便测试,接下来呢,读取数据,读取数据的话,哎,这个可以简单一点,因为我们之前有自定义的测试数据源嘛,我们直接ADD source,然后你有一个click source。把它作为我们当前的数据源,测试数据源,然后接下来我们知道,既然是要考虑窗口,那应该要有对应的时间定义啊,假如说后面我们是定义时间窗口的话,那需要提取时间戳,定义事件时间,发送水位线啊,那所以后边呢,我们应该有一个aign time stamp and water rocks里面我们知道啊,这里可以直接调用。
02:18
Wal mark strategy。掉点啊,那前面我们知道click source就是按照系统时间逐一生成的嘛,而且我们这个是非并行的s function,那是数据挨个去生成的,所以我们直接就是升序数据for mononess time stamp去生成就可以了,那另外这里边我们去指定一下当前的time stepmp s。你有一个srelizaable。啊,那类型是。里边必须要实现一个提取时间戳的方法,我们提取的是time step,好,那有了这个基本操作之后,这个得到的我们就叫做stream,然后接下来呢,基于stream就可以先做一个K啊,比方说我们还是啊,基于当前的user先做一个分组,然后呢就可以调用点window方法去指定当前的。
03:15
窗口类型了。那这里面我们看到它必须要传入的就是一个window a。这个window a本身是一个抽象类。这里边最重要的需要实现的一个抽象方法就是assign Windows啊,这就是表示我们数据来了之后啊,我们看一个element来了,来了之后到底应该把它分配到哪一个窗口当中去啊,那当然了,下面还有需很多需要实现的抽象方法,比方说get default trigger啊,我们当前的触发器什么时候触发计算啊,那get windowializer啊,我们当前的序列化器,另外还有time。对应的这些抽象方法都要去做实现,那如果说我们自定义的话,当然就是自己要实现这样一个window sign了,这个当然就比较麻烦了,好在弗link给我们提供了各种各样的实现,我们可以点开当前window a塞ER所在的包,我们看到啊,就在它所在的这同一个包里边,那就有各种各样的窗口的实现啊,比方说我们看到这里有global Windows,另外这里还有even time session Windows啊,那下面还有这个sding even time Windows和tumbling even time Windows,另外还有processing time Windows,所以我们看啊,这其实就是按照首先按照时间已经做了一个划分,然后不同类型有滑动、滚动,有绘画,各种不同类型的窗口都已经设置出来了,另外呢,还按照时间语义做了划分,我们可以针对even time时间、语义事件时间去进行划分,也可以按照处理时间processing time去进行窗口的划分。
04:52
啊,那所以接下来我们在代码当中呢,就不需要那么麻烦,只要使用这些对应的这些类就可以了。具体来说啊,比如说我们这里想要创建的是一个滚动的事件时间窗口的话,诶,那我们看到它本身就继承自window塞,那当然就实现了这个抽象类,那我们看到它里边的构造方法本身是一个protected啊,那所以接下来呢啊,我们看到下边它通过什么样的方法去创建一个自己的实例呢?对象实例呢?哎,那下面是有一个呃方法这个会返回一个。
05:27
Tumbly even time Windows,我们调这个静态的二方法就可以实现了,那这个方法呢,只有一个参数,那就是size,我们可以指定当前滚动窗口的大小,固定长度,当然了,这个of本身还有承载方法啊,除了这个size之外,接下来我们看到它还可以传两个参数,再传第二个参数,那是一个。Offset,哎,那就是当前的一个偏移量,这就是之前我们说的啊,假如我们要定义一个十秒钟一个窗口,哎,那正常情况下我们想肯定是从零开始嘛,那就是零到十秒第一个窗口,十到20秒第二个窗口。
06:07
那假如说我就非常的膈应啊,我就不想从零开始,我就想从第三秒开始,我定义的这个窗口是三到13秒,然后13秒到23秒,这可以不可以呢?当然也是可以的。所以这个时候我们就指定它的第二个参数offset,把它传进去是一个三秒钟啊,这样的话就可以了啊,那其实我们会想到这个在实际的应用场景里边,一般不会这么别扭,对吧?呃,我们肯定都是这个整点吧,比方说呃,按照这个一小时的时间窗口的话,那我们肯定就是八点到九点,九点到十点,十点到11点啊,那从零开始的话,它都是整点束,那一天的时间窗口的话,那肯定都是从前一天的零点到24点啊,到一天的结束,这个一般不会设置这样的偏移量,那什么时候有真正的用途呢?哎,其实我们看这个源码的注释里边已经给我们说明了啊,它主要就是用来考虑不同的时区啊,就比如说如我们看这个源码里面举的例子,就是中国的例子啊,我们知道在中国这个弗link确实是发展的非常的火热,那如果说我们在中国的话,中国的时区是东八区。
07:17
哎,那我们知道它是UTC时间的话,是UTC加八个小时,比标准的UTC时间,也就是隔离位置时间是要早八个小时的。所以这里边我们在中国啊,按照本地时间去设置这个窗口的时候,就会有一个问题啊,就这里边我们设置窗口底层它都是时间戳嘛,那时间戳本身表示什么含义呢?诶我们知道它表示的其实就是从1970年1月1号零点开始到现在为止的一个时间啊,一个秒数或者毫秒数,那标准的这个1970年1月1号零点是以谁为标准呢?当然就是格林位置时间UTC时间。所以所谓的时间戳从零开始,那那一个时刻其实是我们1970年1月1号的早上八点,我们是东八区早八个小时。
08:10
诶,那所以如果说按照这个标准我们定义啊,一天的窗口的话,那就每次都是标准时间的零点到24点,而对于我们中国本地时间来讲,每次设置的就都是早上的八点到第二天早上的八点。那这个就比较尴尬啊,我们也希望把它定义成按照整天数零点到24点,那怎么定义呢?哎,那我们看源码的注释里面已经给到我们方法,那就是直接可以定义一个of,然后一天的窗口长度,然后负八小时的偏移量,哎,那所以这样的话,我们定义这个窗口就变成了零点到24点,这个还是比较有用的啊,比较好玩啊,所以接下来呢,我们在这里可以把不同类型的窗口都做一个具体的实现,那比如说首先我们这里要定义的time boly even time Windows,哎,那这里边我们就直接调二方法里边我们看到要传的是一个time类型的size啊,其实刚才我们在这儿也已经看到它具体的实现了啊,直接引入这个time,然后直接点days.hours跟我们之前在设置water的时候,呃,在做这个乱序water设置的时候duration啊,有点像啊,只不过我们现在就是直接点days hours。
09:28
片就可以,那现在我们就是比方说设置一个一小时,那就是time点,哎,这里我们需要引入一下这个time,我们要引入的是flink streaming API windowing time.time对吧,跟window相关的一个type啊,那接下来我们就可以直接按照当前的单位去直接选择一个方法调用啊,那当然了,我们也可以直接点of of的话就后面自己去定义一个当前的时间单位啊,那一般我们不需要那么麻烦啊,那比方说我们一小时的滚动窗口,那就是HOURS1,这样的话就定义好。
10:03
啊啊,那当然了,如果说我们当前还想给一个对应的偏移量的话啊,那我们就可以直接在time点啊,比方说我们如果是一小时的窗口的话,那可能偏移量就是一个分钟了啊,十分钟啊,这样的话,我们统计的就是比方说啊,七点十分到08:10的一个窗口,八点十分到09:10的一个窗口,哎,这就是我们所说的。基于事件时间的。滚动窗口。这是最简单的一种定义啊啊,那那当然了,其实我们也看到,如果不是事件时间的话,同样我们也可以定义处理时间对应的窗口,哎,那就是tumbling processing type Windows,哎,那这个调用当然也是完全一样啊啊,我们这里边直接来一个of,然后接下来time点,呃,比方说我们这个就定义一天的窗口,然后定义一个。负八小时的偏移量,哎,那就是按照当前我们中国的东八区时区来定义一个本地的零点到24点的一个整天的时间窗口。
11:10
这里是基于。处理时间的。滚动窗口。当然了,不光是滚动窗口可以定义啊,其他类型的窗口都可以定义啊,那我们可以直接复制一下,比如说接下来我们直接就定义一个基于事件时间的滑动窗口,那就是。Liding event time Windows,我们直接把这个引入,哎,那后边我们可以看到啊,Liding event time windows.of它里边呢,诶是可以传两个参数,默认它就得至少传两个参数,为什么呢?因为除了当前窗口长度之外,还得有一个滑动步长,哎,所以当前啊,我们传入的第二个参数已经不是偏移量了,是当前的滑动距离,滑动步长slide啊,那当然了,如果说我们想要偏移量的话,可以再传第三个参数,那就是off。
12:03
所以这里边我们定义的这个其实就是一个基于事件时间的滑动窗口。当然了,同样我们还可以去定义基于处理时间的滑动窗口啊,这个我们就不重复去调了啊,肯定就是sliding processing time windows.on啊,就可以实现了啊,那接下来呢,我们就以事件时间为例,接下来我们还可以去定义一个基于事件时间的会话窗口,哎,那这个会话窗口呢,稍微有点不同,我们调用的是。Even time。Session Windows啊,就是even time放在前边了,然后接下来后面是会话窗口,然后他调的方法我们看到就跟之前的of不同,他要调的是with GAP或者with dynamic GAP啊,那当然了,Dynamic是动态的意思,也就是说with GAP,这里是指定了一个时间间隔的长度啊,我们就确定啊,按照这个时间间隔去划分绘画窗口,而如果是动态间隔的话。
13:02
那就是当前的这一个间隔还可以变啊啊,一般情况下,我们可能直接就是给一个指定的间隔就行了,里边当然又是一个time了,比方说我们十秒钟没来数据,我们就直接划分一个窗口,这就实现了一个会话窗口。那所以接下来这里是基于事件时间的。绘画窗口。同样的处理时间也可以定义会话窗口,那就是processing time session Windows啊,这个我们就不再去说了。除了当前我们已经定义好的这些时间窗口之外啊,那另外我们想到还有另外一大类啊,还有技术窗口啊,诶,所以接下来呢,我们还可以看一看技术窗口怎么样去定义哦,那定义技术窗口呢,我们说技术窗口的底层它本身是global Windows,哎,那所以呢,这个它并没有直接给我们去实现对应的这个window塞啊。之前我们在源码的这一部分里边啊,当前这个包下边我们也可以可以看到并没有count window对应的一些实现,诶,那所以当前这一个count window到底应该怎么用呢?其实count window更加简单,哎,我们可以在这儿啊,直接就不调点window方法了,我们直接调点count window,然后呢,诶,我们看里边可以传一个参数,那就是一个size,如果传一个十,那就是这就表示。
14:25
大小为十的。滚动技术窗口。滚动技术窗口,哎,那当然了,我们会讲到,那假如说我们看到后面还可以传第二个参数啊,如果再传一个二的话,这个叫什么呢?诶,这就是当前的滑动步长,那这就变成了一个滑动技术窗口,哎,那所以这个技术窗口的调用啊,会更加的简单,它就是传一个参数是滚动,传两个参数就是滑动,第二个参数是当前的滚动步长。那关于这个技术窗口呢,我们也可以直接点进去看一下啊,就在它里边,其实就是调用了底层的Java stream的点count window方法,然后呢,在这个方法里面,我们看创建window的时候,其实就是直接传入了global windows.create,诶,那这个global Windows,这就是一个window aigner啊,那所以你看我们这个传入了global window的window signer后边还得传当前的avior和trigger啊,这就是trigger,就是我们所说的触发器,要定义什么时候触发计算,那另外还有avior,其实就是所谓的移除器,就是定义哪些数据不要不要加入到计算的过程当中啊,这就是关于count window的底层逻辑,那其实关于这个。
15:44
Time window呢,我们看到这里边我们time window就是直接调点window方法啊,通用的这个window方法后面传入了flink底层给我们实现的window sign,那time window能不能像count window一样这么简单的去调用呢?那其实是可以的,前面我们也看到了,在k stream里边我们也可以直接调用time蒙的方法。
16:07
Time window方法,那其实得到的也是一个window stream,它的底层我们看这个time window其实就是要针对当前的时间语义去调用window方法,然后传入我们当前对应的那个window塞,而且底层调用还是这样的啊,那这种方法呢,我们看到它已经要被弃用了,为什么要被弃用呢?就是因为他得先判断一下当前的时间语义。而且在早些时候呢,默认的时间语义是processing time啊,就是处理时间,所以经常啊,我们在实际处理的过程当中,我们想要用的是事件时间语义啊,那经常就会忘记再单独的去设置一下这个时间语义,这个就很麻烦,所以现在的版本已经不再推荐大家使用,直接使用这个点太姆了,而是使用这种通用化的方法,哎,那所以我们就把这个打开,后边我们想做的操作可能往往就是基于事件时间,然后用使用通用的点温度方法去创建一个窗口。
17:09
这就是窗口分配器的使用。
我来说两句