00:00
接下来呢,我们开始给大家讲另外一部分很重要的API用法就是窗口操作了啊,那关于前面我们讲到的时间语义,大家知道,单纯的说时间语义,我们在这儿测这个看看最后得到得到这个字段,这没啥用,对吧?我们并不是想看这个东西啊,当然是想想让这个时间字段在具体的统计啊,聚合操作里边起到作用,那它在哪里能够起到作用呢?当然就是窗口了。所以接下来我们要给大家讲的主要就是table API和flink CQ里边的窗口操作,那首先呃,这里面给大家提到的这个,呃窗口呢有两种类型,一种叫做group window。有时候翻译就是叫分组窗口,那分组所谓的分组窗口就是什么呢?就是根据时间或者行的技术间隔来把他们分组,一组一组的去做聚合统计,然后对每一个组里边的所有数据执行一次我们定义出来的聚合函数。
01:07
大家想这是不是就是之前我们在data stream API里面定义好的那些滚动窗口,滑动窗口,呃,包括会话窗口对吧?这里边还分时间窗口和技术窗口对吧?这不就是我们之前讲的那个窗口类型吗?啊,所以这个其实并不陌生啊,然后另外还有一类就是所谓的over window,这就是大家可能之前更加熟悉的开窗函数啊,大家知道开窗函数的特点是。它不是分组之后,相当于一组聚合出一个结果,它是针对每一行,然后选取自己相邻周边的一些行做一个聚合,对吧?哎,它是这样的一个特点,所以最后大家知道开窗函数,呃,整个我们聚合的结果会变少吗?完全不会对吧?呃,就是我们当前聚合出来之后,每一行都会有对应的聚合结果,接下来我们就分别来讲,首先是group window,那么所谓的这个group window呢,它其实在使用的过程当中非常简单,直接就是一个table,然后直接点window定义就可以了。
02:11
所以它其实是用这样一个window子句来定义的,然后这个window里边呢,这里边要传的是一个所谓的group window,所以大家知道这就是自己要实现这样一个类了嘛,对吧,实现这样一个对象了啊,那这里边其实我们不用自己去写啊,就是table API里边给我们提供了就是一组具有特定语义的预定义好的这个group window的类,那这些我们如果定义出来之后,底层都会被转换为data stream API或者是data set API里面的窗口操作啊,所以本质跟我们那边的窗口是一回事,那这里需要大家注意的是,我在点window里边定义一个group window。后边还必须有一个操作叫做as,注意啊,还还还没整完呢啊呃,首先有些同学是想到之前我们不是说那个呃,Window这里边相当于只是开窗吗?后边是不是必须还要有一个聚合啊,就相当于是呃,我们说两步操作吗?
03:13
窗口分配器指定当前的数据属于哪个窗口,然后下面的窗口函数代表窗口要执行什么聚合操作,对吧?哎,那当时我们还讲到有预增量聚合函数和这个全窗口函数啊,这是当时stream API里边的窗口操作,那这里边是不是window之后也得做一个聚合啊,但是大家要注意这里的聚合table API里和这个CQ里边的聚合必须先做group by,对吧?必须先按照这个分组。然后呢,大家会发现之前我们那个data stream API里边也要分组啊,它是先KBY再开窗,而我们现在是先开窗再group by,这是为什么呢?啊,这是因为我们之前在KBY之后得到的KSTEM里边上下文,呃,就是后面我们在开窗做处理的时候,其实已经可以基于当前K的上下文去做这样的处理了,所以定义的所有的那个统计是不是都是针对当前K当前窗口的呀,而这里大家注意我开窗之后呢。
04:21
只是做了一个相当于一个窗口这个字段的定义而已。然后接下来是不是我要分组聚合的话,按照窗口聚合的话,是不是还得以窗口本身作为一个分组的标准啊,所以大家看到这个group by呢,我这里边必须是waa的话,可能是我们的一个ID对吧,某个字段本身有的字段,那W又是什么呢?这是前边我们自己给窗口定义的一个别名,所以大家一定要注意一下,窗口定义出来之后,后边这个是必不可少,一定要有一个as,指定一个别名。
05:02
原因就是因为是不是后边我分组必须得按它来分组啊,要不然你后面统计是不是就相当于没办法统计每个窗口里边的东西了啊,所以它是把窗口定义出来,作为一个单独的字段,然后呢,呃,又专门以这个字段做了一个分组操作,对吧?呃,然后这样的话,我们统计出来是不是就是当前K当前window里边的聚合结果啊,这就跟之前的串起来了啊。这就是呃,Window group Windows这样的一个用法,那具体来讲的话,这个group window到底怎么做呢?大家看它有几种预定义好的link给我们提供的实现一种就叫啊,当然就是滚动窗口了,Tabling Windows滚动窗口定义的时候呢,这个类就叫做T。大家看这个就更加简单粗暴啊,直接就叫tbo,然后定义,大家知道滚动窗口必须得有一个参数是窗口长度,那么窗口长度怎么定义呢?直接后面点over。
06:02
然后里边传一个字符串类型的表示时间的一个参数,这是十点minutes,就按照这个啊前面数字,然后点minutes表示十分钟啊,后面是这个时间单位,这就是开了一个十分钟的滚动窗口,然后后边呢,它还必须再来一个字段,呃,再来一个方法叫点啊指定一个字段,这个字段是对,这就是当前的时间的那个字段,对吧?如果我们指定的是事件时间滚动窗口的话,那就。Row time对吧,那如果要是处理时间滚动窗口的话,是不是就是on pro time呀?啊,那大家想一下,如果说我这里边要是一个技术窗口呢。啊,技术窗口的话,你也按照这个处理时间去这个定义这个当前时间就可以了,那我们前边这里边over开它长度的时候,你就不要按照时间去开,而是开一个大家看十点Rose,诶这样是不是就是十十个数据十行嘛,十个数据一个窗口,十个数据一个窗口,即滚动技术窗口对吧。
07:13
这就是我们关于这个滚动窗口的定义,最后不要忘记还有一个点,As,给一个别名啊。那同样的就是如果是滑动窗口的话,大家看一下是不是就是它叫slide呀?啊这里面这个叫slide同样还是over定义当前的一个窗口长度,那大家想滑动窗口还得有一个参数叫滑动步长嘛,滑动步长怎么定义呢?啊,它也简单,后面再点every every5点minutes,那这就是长度是十分钟,然后五分钟滑动一次的,然后on time,这是不是事件时间窗口啊啊,然后命重命名别名给一个W啊,所以这个其实整体来讲还是一样的啊,那后边这个就是滑动窗口,你如果是处理时间的话,哎,那也是一样啊,或者是这个这里边用这个Rose啊,用技数的话,那就是一个滑动技术窗口了。
08:08
最后还有一类session window大家还记得吧啊,SESSION10WINDOW,那这里边怎么定义呢?它最主要的特点就是有一个GAP对吧,最小的那个间隔时间,所以这里边同样它的这个类就叫做session,然后session.with GAP给一个这个最小间隔时间,后边的然后on啊处理时间还是这个,呃,事件时间还是定义就完了,后面再给一个as别名对吧?所以整体来讲的话,呃,要说简单,其实整体确实也还是比较简单的啊呃,那另外我们再来看一下,就是CQ里面怎么去做,CQ里边其实也非常简单,它的定义呢?呃,稍微略有不同,就是CQ里边定义的时候是用这个函数调用啊方法调用的这种方式,把它放在我们的那个group by子句里边就可以了。然后这里边定义滚动窗口的时候呢。
09:00
也是tumble,然后后边给两个参数。大家注意这不是一个类对象了,对吧?这就是一个函数啊tabo,然后里边两个参数,前面一个是时间字段,那就是基于row time或者基于pro time对吧?这就表示到底是事件时间窗口还是呃,这个还是处理时间窗口,那另外后面就是interval,这是不是就是窗口长度啊,大家还记得之前那个interval的写法吗?这个就是我们在前面讲到这里的时候,是不是有这样的一个写法啊,诶,所以给的就是这样一个参数啊,Interval几,然后second interval几minute对吧,直接这么写就完事了,所以这就是CQ里边的写法啊,那另外还有这个滑动窗口要给大家说一下滑动窗口有同学可能想那肯定是slide了,呃,还还真不是这个有一些这个历史原因哈,所以它是叫叫hop hope大家知道是有那个跳跃跳动的意思,所以它其实定义的是一个跳动窗口是吧?啊,它是一小步一小步往前跳啊,那所以它这里边定义的参数当然就得比滚动窗口多一个了,同样还是时间字段,后边是两个时间间隔,两个interval。
10:13
那大家这里需要注意的是第二个是。对,这是不长,然后最后一个才是窗口的长度,大家不要写反了啊,这个就是一开始操作的时候经常就写反了,然后发现,诶最后那个统计数据怎么不对呢?啊,窗口开的都不对,对吧?那主要就是第二个这是不长,最后一个才是窗口的大小,那另外还有就是session了,Session window的话直接SESSION2个参数非常简单,第一个时间特性,第二个是不是就是那个窗口的间隔啊啊,最小间隔对吧?这就是关于这个group window的一个定义啊,接下来我们还是在代码里边给大家做一个简单的测试吧。我们这个是把流转换成了表,而且对应的定义了这个时间特性啊,所以接下来接下来我们就是第五步操作,那就是。
11:06
窗口操作首先5.1我们要说的是group window。就是所谓的分组窗口,呃,首先我们说这个table API的用法啊,Table API的用法其实就是基于前面我们得到的这个data table接下来做什么转换啊,要开窗是不是就直接window啊?然后大家看到这个window里边必须要传的有两种方式,呃,大家看就是这里边要求传的啊。传参是不是两种啊,一种就是直接传一个group window,另外传一个over window啊,那现在我们是传这个group window了,Group window大家看到它本身是一个抽类,所以当前的这个抽象类是不是对应的会有一些具体实现啊,对吧?所以这里边看到这个我们这里边的这个session啊,Slide还有这个tumble,这都是它的具体实现,我们随便点一个看一下。
12:09
大家看这是这个tbo啊,哎,那大家会想到这个tbo,这怎么是一个final class class并没有返回一个这个group window呢。啊,这个大家不要着急啊,我们先一步一步写,接下来这个window里边对吧,接下来我们直接是tumble。Tempbo,首先大家会发现它里边能调的方法是不是只有一个啊,这个final class里面只有一个能调的方法,对吧?直接就是传一个over,然后over得到的大家看到是一个tumble with size,一个特殊的类,这样一个类型,好,所以这里边没什么好说的,你就只能是传一个当前的大小进来,比方说我当前啊,十秒钟一个对吧,十秒钟那是seconds,十点second,然后接下来得到的这个tle with size,大家看接下来能掉的呢,又只有一个方法。
13:07
只有一个on,对不对,然后这个on得到的叫tumble with size on time又是一个特殊的类,对吧?哎,所以这就没办法啊,一步一步就是这么定义死的,这里边比方说我要的这个。哦,大家看这里边我要的这个RT对吧?当前的这个字段啊,是以这个RT作为这个呃时间字段来传进来的,然后接下来大家看是不是当前的这个tumble with size on time,是不是就只有一个方法叫做as啊对吧?哎,所以就只能传一个进来,As这里面给一个别名,比方说我们当前是呃滚动窗口嘛,叫TW。这就是完整的一个定义,大家看这个写进来之后不忘错了直接,那为什么呢?因为它得到的是一个tumble with size on time with alias,它这个类呢,本身是一个group window,对吧,啊,就是这么来的对吧?啊,当前这是这是当前我们这个group window的一个真正意义上的一个具体实现,对不对。
14:10
啊,所以大家注意啊,就是那个汤Bo本身跟它没有继承关系,是这个类,这个final class,这是它的一个具体实现啊,所以这里边我们就定义出了一个十秒钟事件时间语义下的滚滚动窗口啊呃,然后接下来大家想,我把这个窗口定义好了之后,是不是得分组做聚合啊,关键是在这儿对吧?诶大家其实也已经看到了,接下来是不是能调的方法只有这一个,这已经大家注意啊,本来是一个table,调window之后已经得到的不是table了,得到的是一个group window的table对吧,叫做这个分组窗口化之后的这样一个table,哎,那么这个类型呢,它它是一个接口啊,它里面的方法就只有一个group。
15:00
所以接下来我们还是只能调group by里边给的字段,当前我们应该有两个是不是ID和TW啊,刚才我们定义好的这个窗口对吧,然后接下来这个group by之后,大家看它得到的这个呢。叫window group的table啊,这稍微有点绕啊,前面我们得到的是一个group window table,现在呢,Group by之后又得到了一个window group table,所以是基于窗口做了分组之后的这个table对吧?啊,那么它这个接口接下来大家看这是不是就是我们直接就是像之前那个直接那个group by之后的效果一样了,对吧,我们之前是不是group by之后可以直接那个aggregate flaggre,或者是直接select做聚合啊。现在是不是能调的方法也是一样啊,Aggregate和flat aggregate的话,这个要传的必须得是一个自定义好的啊,对应的这样的一个聚合操作,那我们更简单的调线成的函数是不是select就够了啊,Select之后得到的就是一个table对吧,直接就可以做这个操作了啊。
16:08
啊,所以这里面的这个类型转换呢,其实看起来复杂,说复杂也复杂,但事实上大家会发现,这里面一步一步是不是都是写死的呀。就必须这么用啊,你别的方法都都没法用嘛,所以大家只要记住怎么用就可以了,那这里边select的时候我可以取一下啊,比方说ID对吧,然后我可以id.count有几个数拿出来对吧?啊,那另外我可以拿这个当前那个字段是叫temp还是叫temperature来着,叫temp对吧,我直接拿这个temp.AABG平均值对不对?哎,那另外另外我这里还可以取。大家回忆一下,之前我们那个窗口里边是不是有各种信息啊,我是不是还可以得到当前窗口到底是比方说什么时候开的,什么时候关的,这是不是从窗口信息可以拿出来,那你现在既然有这个窗口字段嘛,我当然可以取,对不对?W点,比方说and,这拿到的就是当前窗口的结束时间。
17:11
大家想是不是我就知道什么时候关的这个窗口了,这个看到看到这个数据就相当于的更更,呃明确一点对吧,我就知道里边到底是什么东西了啊哎,所以我把这个定义成一个result table这样的一个窗口聚合操作,对吧?啊啊,那与之对应的我们写一个这个CQ的实现吧,大家看一下这个CQ怎么写呢?啊,首先还是啊前面这里边把这个表转换出来之后,我们是不是还得在那个环境里面去做一个注册啊,Table env对吧?呃,这个是register。呃呃,不不是,我们是create temporary对吧,直接把这个表当前这个table啊,直接在环境里边注册出来,比方说我当前这个叫sensor。
18:04
然后里边,呃,比方说这个是把data table直接放进来,那接下来CQ里边是不是就直接可以查这个三四这张表了,已经有了嘛啊,所以接下来我们直接就是table env.CQ query写一行CQ进来,现在我们select,我是不是照着上面这个写就行了。是不是s select ID啊,对吧,最上面的最后一行就是我这里边最先写的嘛,呃,S select的ID,然后呃,接下来是这个count ID对吧,这个as CT啊,然后接下来是avg camp,我这个s avg temp,然后另外还有这个,诶这里大家看到还有一个这个窗口的结束时间,这个怎么给呢。那大家想是不是肯定得有对应的哎,处理它的那个函数啊,所以这个我们先放在这儿啊,先不着急,呃,然后接下来呢,我可以继续往后写啊,继续往后写,我这里边可以写一个from sens对吧?啊然后另外怎么样。
19:15
哎,大家想是不是我接下来得去做一个,得去做一个group啊,对吧,Group ID,然后另外窗口,窗口怎么定义。哎,就是前面我们在这个PPT里边,大家看到的是不是直接tumble,然后给两个这个参数就可以了,直接这么定义出来就完事了啊,所以是tumble里边的字段,当前的这个时间字段,那应该是RT对吧?我们之前定义的叫RT啊,然后另外还有一个十秒钟INTERVAL10SECOND,大家注意这个second是单数啊,这个不带S的,这里必须只是一个单位而已啊,十秒钟,然后有了这个之后,那关于我们当前的这个窗口,到底又应该怎么定义它的那个结束时间呢?这其实特别简单。
20:08
我直接把这个的定义啊,窗口的定义直接放在这儿,然后这里边只不过这个函数不叫temple了,这是创建窗口的那个函数,对吧,我现在要的是这个temple_and就可以把它拿出来。啊,所以大家就想到了,如果说我要它的窗口起始时间怎么办。是不是temple_start啊,对啊,所以你像如果是滑动窗口的那个起始时间呢,是不是ho_start ho hop啊,下划线start或者ho_and啊,这就可以把它取出来了,所以这个过程其实是一样的啊,直接做这样的一个,这这里大家注意一下,你要就是这个写完了之后至少要空一格对吧,然后要不然的话就是from前面空一格啊,要不然接起来的话,这个就会报错了。好,这就是我们对于这个CQ的一个书写的过程,那接下来我们还是把这个做一个测试吧。
21:00
大家看现在,呃,之前我们是这个data data table做了一个测试是吧?呃,那现在我把这个助调来给大家看一下result table,大家想我现在直接把这个做一个打印能行吗?我现在是直接to a stream,大家觉得可以吗?大家觉得这里边是做了聚合的对不对,我这里边count avg,呃,这这明显是做了聚合的嘛,那是不是大家觉得这里面应该是to retra stream啊,这个感觉才合理一点,对吧?好好,我们来看一下结果吧,啊上面我们这个没有没有把这个输出成表啊,得到一个result CQ table,好,这里边我们把这个result CQ table也做一个输出,Result c table。这里边写一个CQ,好,我直接运行试试看看它前面这个end,后面这个retract肯定没毛病,对吧,那前面这个end,我看看会不会成功。
22:06
大家看到我们现在已经成功的输出了结果啊,这里面我们看一下结果吧,首先啊,我我就先挑着这个result来看啊,Result大家看这里边输出的是不是就是当前的ID,然后是诶,这是count数量对吧?Count数量,然后是温度的平均值,后边还有一个窗口结束时间对不对?所以大家看到这个result啊,20秒这里边一个统计当前的这个341是有一个这个大家当然知道了,我们这341是不是第一个,如果是十秒一个窗口的话,大家觉得第一个窗口应该是多少到多少啊。我们现在是十秒一个啊,不是15秒了,十秒一个的话整十对吧,按照我们之前的那个推推断,那应该是190~200对不对,那当然只有他一个了啊,所以下面大家如果不信的话,也可以把这个时间说做一个转换对吧,看是不是这个啊呃,然后我们看看看是不是那个200对吧,结束时间应该200,然后下边大家看六和七啊346,然后下面这个,诶下面。
23:12
没看到37呢,Result。哦,这里边我们好像没有全局的给那个并行度的设置对吧,所以这里面大家看这个顺序是乱的啊,不过这也没关系,不影响我们结果正确嘛,只是看着稍微费劲一点,你看这个3461个对吧。然后15.4,它是30秒结束的窗口,同样SENS4,七。我们找这边啊,三七是不是也是一个,它是30秒结束的窗口啊,另外三四十一个是不是也是30秒结束的窗口,所以大家看是不是就是下一个窗口了。那应该是,是不是200~210啊,那大家想一下,在这个30秒结束的窗口里边,一应该有几个。200到二幺二二幺零三四十一应该有几个?
24:03
这是不是应该有两个207209嘛,所以接下来我们找一下30秒结束的窗口,三一,这个是20秒的啊,然后我们找这个三四十一。这个是40秒的,在这儿呢啊341是不是两个啊啊,稍微有点不容易找到两个,然后平均值是34.55,然后这里边是不是30秒这个窗口34.55,大家看一下是不是就是36.3和32.8求和,然后除以二得到的平均数啊啊,所以这就是我们得到的这个结果啊。然后CQ和result这里面两条流输出的结果都是一样的。那自然我们也就想到了,为什么可以不用to stream也可以呢?你看一下这里的这个CQ就知道了。C我们是to对吧,你看这里边是不是只有处啊,所有的是不是只有处,哎,那家想问一下这为啥这里边我们是只有处呢。
25:03
对,因为大家注意啊,我确实窗口内部每来一个数据之后都得聚合一次,改变这个窗口的里边的聚合状态,但大家想我是不是改变的只是窗口内部的聚合状态呀,我改变他最后查询的结果的那个动态表了吗?它的查询结果动态表是不是只有到窗口结束的那一时候才输出一个啊,哎,所以大家会发现最后没改呀,对吧,最后相当于只统计了一次嘛,所以这里边你看并不是说有聚合就一定要用retract对吧?哎,这里边只要他不改,关键在于改不改,如果他不改的话,我这里边直接判输出也是没问题的。好,这就是关于这个group window这一部分。
我来说两句