00:00
定义了时间属性字段,那接下来我们把时间属性要用起来,它的主要用途当然就是进行窗口计算了啊,因为我们知道只有在底层的process API当中才能够去定义定时器这样的比较精确的控制,那在一般的上层应用当中,当然就是进行时间窗口的计算了。那对于窗口而言,它本身其实就是可以把无界的数据流切割成有限的数据集来进行处理,那在PI里边我们都已经过了,有各种各样不同类型的窗口,我们讲过了,有时间窗口和技术窗口,在时间窗口和技术窗口当中还可以去划分,比如说滚动窗口、滑动窗口,另外时间窗口当中还有一类比较特殊的绘画窗口。所有的这些类型我们都可以直接调用对应的API,然后去进行声明啊,当然了,时间窗口呢,我们还可以去区分,按照不同的时间语义去定义事件时间窗口和处理时间窗口,这些我们之前都已经非常熟悉了,那么对应的在table API和CQ里边,同样我们可以把所有的这些窗口功能全部实现。
01:24
那在flink01:12之前的版本当中呢?Table API和flink CQ其实是提供了一种API的调用方式,这种方式叫做group window,也就是所谓的分组窗口,好,那它就完全对应着之前我们在data streamam里边所熟悉的滚动窗口、滑动窗口,还有会话窗口,那如果说在CQ里边去调用的话,它其实就是一几个特定的函数,比如说。Bo,这个就是一个滚动窗口的定义函数,那滑动窗口呢,呃,是叫hop,当然了,这个有一些历史原因啊,它没有叫slide,呃,它叫hop就是跳动,在很多场景里边,对于滑动窗口的定义其实就是跳动跳跃这样一个定义,另外还有就是session,当然很明显这就是会话窗口了,里边主要就是传入当前的时间属性字段,然后还有窗口的大小,或者说滑动不长等等的啊,那比如说。
02:24
如果想要定义一个滚动时间窗口的话,非常简单,就是Bo,然后里边有两个参数,一个参数TSTS。就是之前我们在。表由那定好的事件时间与意下的时间属性的话,那当然对应的这个窗口就是一个事件时间的滚动窗口了,那后边呢,给的当然就是窗口的大小,同样它也是用一个in t,然后加上一个数值引号引起来的数值,再加上单位来组成的这样一个固定的语法表示一个时间段一小时的滚动窗口。
03:14
定义出来这个窗口之后,如果我们在CQ里边想要去做计算,那又应该怎么计算呢?我们知道在data threepi里边,类似于这样的定义,相当于只是给了一个窗口分配器的啊,那具体的操作,具体收集起来的数据要怎么样去做计算,其实是后边跟着的那个窗口函数来决定的,那在CQ里面的话,其实没有必要这么麻烦,我们之前也已经用过啊,类似于count这样的函数,它不就是可以直接做聚合吗?啊,那同样的,我们如果要是定义了窗口之后,直接在CQ里边去使用一些所谓的聚合函数。那么接下来我们要做的其实就是一个非常简单的窗口聚合操作,比如说这里举了一个最简单的例子,就是直接把当前URL每一个用户点击访问的URL的个数做了一个统计。而且我们这里是针对。
04:14
一小时的滚动窗口进行了统计啊,那其实这跟之前我们在上一节当中流处理中的表,在介绍整个转换过程当中的举的那个例子是一样的啊,所以我们这里就是select当前的user哪个用户,然后呢,我们还应该知道,因为既然是开了一小时的滚动窗口嘛,按窗口去统计,所以统计出来的信息里边也应该有当前。到底是哪个窗口,窗口的信息啊,那窗口的信息呢?特别的一个函数叫做tumble end,很明显这就是窗口的结束时间了,对应的当然也有tbo start,那就是窗口的起始时间,我们这里是以它的结束时间代表当前窗口啊,那同样后边传入的参数,两个参数跟窗口本身的定义是一模一样,只不过相当于就是这个函数名不同。
05:09
本来窗口的定义是Bo,那这里是tbo,然后后面还给了一个重命名and t啊,所以我们提出来的是这样一个字段,另外还有就是count UR c。那我们知道count既然要做聚合,显然后边必须要有一个group by啊,那group by我们这里边首先当前划分的K是user,这个是没有问题的,另外呢,我们针对每一个用户,还要在每一个一小时滚动窗口时间范围内去做聚合啊,所以后边这窗口到底在哪里出现呢?它是出现在后边。直接调用对应的窗口函数,然后把它作为我们进行分组的一个K来进行了指定。这是所谓的分组窗口,它这样一个命名的由来啊,为什么叫做group window呢?因为当前的定义就是跟在group by后边。
06:08
这是老版本里边的用法。那这种用法其实现在已经是处于depre的啊,就是将要被弃用的状态了啊,那当然现在如果说我们还想用的话,源码里面还是有对应的这个方法支持的,我们还是可以直接用老版本的方式去调用。但是他所能做的事情就比较有限的,功能是比较局限的,它只支持做这样的一些简单的窗口聚合,也就是说我们只能在这里使用一个liq支持的聚合函数来进行当前窗口内的统计啊,那所以在未来啊,或者说是当前01:13的版本里边。就有了更好的替代方案,我们就不应该用这种方式去定义窗口,那用什么呢?啊,那就是这里所提出的第二种方式,所谓的窗口表值函数window tvf。
07:07
诶,那从01:13版本开始啊,就使用这种方式来定义窗口了,什么叫做窗口表值函数呢。本质上来讲,它其实就是使用link CQ要去定义一个所谓的多态表函数,然后可以相当于什么叫做表函数,那就是相当于可以把我们当前的某一个表,我们基本基于比方说当前的table,或者我们在代码里定义的table来进行一个函数转换,可以把这张表扩展出来。增加一些新的列,然后呢,进行一些特别的计算啊,所以我们可以认为一个所谓的表函数啊,C function就是一个返回一个表的函数,我们在这里使用的时候,它一般就要传进来一个原始的数据表,然后经过。
08:02
窗口的相关操作之后,返回一个可能要增加一些新的字段,新的列这样的一张表啊,那关于表函数的内容呢,我们会在后边讲到函数这一节的时候再去展开进行介绍,那这里的话,我们只要知道怎么样去定义对应的窗口就可以了。对于当前的flink1.3版本里边给我们提供的所谓的窗口,TF有以下四类,这当然就代表的是四种不同的窗口了啊,在fliq里边并不提供所谓的技术窗口啊,技术窗口的话,那相当于我们可以用其他的一些方式来替代来进行。完全等价的替换,而这里边提供的当然主要就是时间窗口。时间窗口包含哪些呢?我们看到有滚动窗口、滑动窗口、绘画窗口,这都是dataampi里边我们比较熟悉的,另外还新增了一个叫做累积窗口umulate window。
09:07
啊,这是当前flinkq提供出来的不同的窗口功能,需要注意的是窗口目前还尚未完全支持,那相信会在未来的版本当中,很快的也会支持对应的这个应用了啊,当然通过这样的一个调整改变,我们就会知道在实际的生产项目当中使用呢,很显然绘画窗口的应用场景会比较少,而新增的累积窗口场景会比较多。那对于窗口T来讲,它其实本身是更加符合CQ标准的,它的性能得到优化,而且它的功能也更加的完善。首先它可以基于。窗口去做各种各样的传统的聚合,也就是说之前所说的这个group分组窗口能够做的这些操作,当前用窗口的T表值函数都可以去实现,这个是首先是全覆盖之前的功能全能干,那当然另外就还扩展出了很多新功能,比方说一些基于窗口的非常复杂的计算,例如之前我们在stream API里边讲的一个非常重要的案例就是top n。
10:22
啊,那我们可以基于窗口去统计当前的top n啊,那用窗口表值函数的话,可以非常方便的去实现,那另外还可以方便的支持所谓的窗口连接,之前我们说过。两条流,如果想要去做join连接操作的话,那有不同的方式啊,那一般话的话可以去UN connect,另外还支持直接的状操作,状操作窗口和间隔啊,那直观上来看的话,窗接其实就跟本身window相关的,而传统的之前版本的。
11:00
Group音的分组窗口做不到这一点,现在的表值函数就可以实现。那当然了,现在目前来讲啊,01:13是第一个引入窗口T的版本,所以它的功能其实还不足够完善,首先这个会话窗口就还没有完全支持,另外呢,还有一些比较特殊的功能也是没有办法去支持的。都在快速的开发完善的过程当中,所以我们应该能够想到未来的版本当中,这种使用窗口的方式应该就是唯一的方式啊,老版本就会被用了,所以我们关键是要来介绍掌握一下当前窗口T去定义窗口,使用窗口进行计算这种方法。那它的特点是在窗口TY,我们说它的返回可以返回一个新的表,可以把之前的原始表进行扩展,那它除了原始表的内容呢,还可以增加三个额外的列,诶,那就是所谓的window start window end和window啊,这个window start和这个比较简单,窗口的起始和结束嘛,那window是什么呢?什么叫做窗口时间呢?
12:15
其实window time就是window简易毫秒,所以它本质上就是我们之前讲到的data stream API里边。所谓的那个一个窗口当中的max。也就是说,在当前窗口当中能够包含的最大时间戳。而且同时也就是能够触发当前窗口计算的,呃,就是如果当前是事件时间的话,那就是当前的water mark,达到这个时刻就可以触发窗口计算。这就是。TF里边新增的几条列啊,那对于这个CQ里边的声明方式其实跟之前的分组窗口还是非常类似的啊,下面我们可以先来简单的说明一下怎么样用使用窗口T的方式去在Q里窗口啊,那这还是按照不同的类型来进行讲解,首先是滚动窗口,那tbo滚动窗口的话定义几乎就是完全一样的,我们看到这里边就是关键点在于传入一个时间属性字段,另外还有一个当前滚动窗口的长度。
13:28
时间长度,它的定义方式。窗口长度跟之前完全一样,还是一个interval关键字定义一小时的滚动窗口,而前面的这个时间属性字段呢,诶,它需要又需要用一个script关键字来进行一个声明啊,那所以这本质上是做了一个函数的调用,把TS作为参数进去。除此之外,前面还新增了一个参数,这个参数也非常简单,就是直接把之前我们。
14:00
想要扩展的那张数据表完整的传进来。因为之前老版本里面的分组窗口的话,我们是直接select,然后这个数据表,然后去。把当前窗口定义放在group里边的,那这样的话,我们相当于是把它当成一个特殊的分组方式来处理原始数据了,而现在呢,现在不是直接对原始数据分组,而是要对原始数据进行这张表要做一个扩展,所以当前的这一个所谓的表值函数啊,那原始的数据表也是它的一个参数。最大的区别就放在这儿。那同样滑动窗口呢,还是使用来进行定义,它的区别只是在于前面还是先把当前的这个表对象传进来,后边是一个TS,当前的时间属性字段啊,用script把它包装起来,另外后边要传入的是两个时间参数,两个时间间隔interval,而且这里需要注意的是。
15:05
第一个时间参数是当前的滑动步长,第二个时间参数就最后的这个时间参数才是当前的滑动窗口的大小。这个跟我们之前在stream API里边定义的时候,它的顺序是不一样的,我们要注意就是前面这个一般动比较小,后边这个是窗口程度比较大。这两个其实都比较简单,因为我们都比较熟悉了,那关键呢,是新增的这种窗口,就是所谓的累积窗口ulate,什么叫做累积窗口呢?呃,其实累积窗口之前我们在。讲解PI的过程当中也已经有所设计,或者说我们是用其他方式来进行实现的,就是比如说我们要统计PV啊,统计这个view啊,每天统计这个网站的访问量,页面的浏览量。
16:00
我们会想到正常情况下,我们要不就是。隔一个时间段,比方说我们统计每一天的PV,或者统计每一周啊,每每个月这样的一个PV,那正常,如果要是统计一天的整所有的网站的访问量的话,那就应该是等到每天的晚上24点的时候。这一天全部结束的时候,我们才会统计出来所有的结果,直接输出一次啊,那就是这就是我们所说的开一个一天的滚动窗口啊。但是有时候我们会想到你这个每一次都要等到当天全部结束才能看到当前。今天所有访问量的这个统计结果,这个就间隔的时间有点太长了,那每一次都都得隔一天之后才能看到一个新的结果,那我们想要快速的得到一个访问结果,能不能得到呢?他有一种方法能够想到我们可以用滑动窗口。
17:02
滑动窗口统计的频率就更高了,但是呢,滑动窗口它是不停的,比方说我们还是设置一个长度为一天的滑动窗口,然后每隔一小时滑动一次的话。那我们会发现它并不是统计我们当天零点开始,比方说诶到下一天的零点,也就是当天的24点结束,那如果他要滑动一个一小时的话,那应该统计的是。当天的一点到第二天的一点,这个24小时之内的所有的P位置访问量,哎,那这个其实不是我们想要的,我们想要的还是按照自然日零点到24点这个时间段的所有的数值,只不过我想看的呢,是从零点开始,诶到一小时了,这么长的一段时间内,我今天到底积累了多少数据?然后到两点钟的时候,又隔了一个小时了。
18:00
又积累了多少数据来,注意我现在要看的不是说一点到两点积累多少数据,是从零点开始一直积累到两点的时候,一共多少数据,所以我们会发现这种需求在很多实际应用场景里边,其实还是比较常见的啊,这种需求就是说我们是希望还是统计一段周期内总共。这个累计的值是多少,但是呢,我希望在这段时间的累积过程当中。更加频繁的快速的输出,当前到底积累到多少了?我们之前API里边的滚动窗口、滑动窗口,其实都没有办法实现这样的功能。如果我们记得的话,我们之前也是可以实现类似功能的,那就是我们可以去定义状态啊,就是用状态保存当前某一个窗口内已经统计了多少值,然后去不停的累加,然后呢,再定义一个定时器,每隔这样的一段时间诶,就触发一次,然后输出一下当前统计的这个结果值,那就类似于之前我们曾经实现的那个周期性的统计,输出当前PV结果的那样一个案例。我们可以使用process function来实现类似的需求,使用状态和定时。
19:22
啊,那当然了,这种实现是没有问题的,涉及到了比较底层的方法,API的调用,涉及到一些高级编程方式。但是这种方式有点太麻烦了,那我们想到这个需求其实非常的直观,非常的简单呀,啊,所以在flink CQ里边,现在01:13版本就提出了所谓的累积窗口的概念,那什么叫做累积窗口呢?本质上来讲就是刚才我们说的,我现在可以开一个累积窗口,这个累积窗口的触发,首先这个累积窗口的长度我们可以认为这是固定的啊,就当前我要统计的到底是多长时间范围内的数据呢?诶,我们定义了一天啊,那这就是一天范围内。
20:05
但是这个窗口结果的输出,它的触发时间点并不是等到最终结束,整个窗口结束的时候才触发一次。如果是这么做的话,那就成了滚动窗口或者滑动窗口了嘛,现在它的输出点是我们还可以。再定义一个所谓的。一步长啊,那所以这个步长看起来有点像之前我们所说的滑动步长,但是它是一个累积步长,也就是说我们每一次要统计当前已经收集了多少数据的这个间隔的周期啊,那比方说我们就说每隔一个小时,我就把当前窗口内的收集到的数据统计起来,输出一个结果,然后下隔下一个小时的时候,下一次又是统计当前窗口内所有数据,再统计一个结果,再做一次输出。啊,所以它其实就是在一定的统计周期内进行一个累积计算,频繁的输出,所以它里边有两个最核心的参数,一个叫做最大窗口长度。
21:11
也就是你统计的是一天的实际数据啊,那我们当前的这个最大窗口长度就是一天和累计不长,哎,那累计不长就是我们说的。到底是隔多长时间输出一次当前统计的结果啊,那我们每隔一小时输出一次,那当然就是累计补偿这一小时了。那对应的CQ当中的定义呢,也非常的简单,那就是umul这样一个。函数,然后里边因为是表示函数嘛,传入当前的数据表,然后时间属性字段TS后边要跟着两个时间参数,就像滑动窗口的定义一样,而且前边这个是。当前的累积步长,后边那个是当前的最大窗口长度啊,所以整体使用跟滑动窗口非常的类似。
22:03
我们现在就了解了在flink CQ里边所有窗口的定义方式,当然了,这仅仅是类似于窗口分配器的定义,那具体我们怎么样用它去做计算,怎么样去统计出对应的结果数据呢?哎,那就涉及到我们定义所谓的窗口函数了,怎么样完整调用呢?你哪怕不单独定义窗口函数,就像分组窗口,这里边有一个对应的这种使用的模板也可以啊,啊,那这一部分呢,我们会放在下一节,关于聚合。里边一起去进行介绍啊,那这一部分都属于窗口的聚合操作。
我来说两句