00:00
我们现在已经知道table API里边的group window到底怎么用了,那下面就在代码里边实际做一个操作,刚才我们这个代码里边,诶,前面都已经定义好这个事件时间,而且呃,转换从流,这个转换成表,我们把这个时间字段也已经定义好了,哎,所以接下来这个就非常简单,我们不用做别的操作了啊呃,这里边直接做转换吧。呃,这里边给大家先注释一下,我们这个是第一个情况,对吧,我们先做一个,呃,比较简单的这个,呃,Group window的一个操作啊,我们做这个。Group window。聚合操作对吧。好,我们来看看这东西该怎么写啊,我还是定义一个result。当前的这个当然是个table类型了啊,这个其实不写,大家也都知道,这个都没有具体的类型了嘛,Sensor table基于之前输入的这一个数据,这个呃表我们接下来是要开窗口对吧,做窗口操作,这个操作其实是在这个table API里边给我们定义好的啊,大家看到这个window这个操作呢,它有两种调用方式,一种里边传一个group window对吧?呃,就是这里边是必须要传一个定义好的group window,然后得到的是什么呢?得到的是group window的table,得到一个这样的一个类型。
01:30
啊,那同样就还可以给传一个over window对吧?啊,大家看这个还可以是多个啊,就是over window得到的是一个over window的table啊这个后面我们再讲这个over window,现在我们就看这个分组啊group window,这里边要传的这个group window是个什么呢?啊,它其实就是我们说的,大家看这里边的这个。这个呃,就是这个定义啊,这是一个抽象类啊,它其实就是说什么呢?就是你可以基于这个行,呃,就是我们这里边是把这个每一行要去分组嘛,它基于什么呢?基于这个time,基于时间字段,或者是基于诶当前的这个行数的间隔啊,然后呢,最后得到之后啊,就是非常接近于just like group by,所以后边我们得到这个操作,相当于就是得基于这个group by之后分组的窗口去做聚合的。
02:26
啊,大家就是这样去理解它这个,呃,Group Windows就可以了,对吧?啊,这是这样的一个抽象类,那得到的这个数据类型呢,又叫做group window table啊,这个像我们那个。Data stream,大家还记得data stream是先做那个K对吧,得到那个k stream,然后k stream呢,再去点window开窗,得到一个window stream啊,当时我们之前给大家讲过是这个过程啊,Data stream。这又是复习回顾了啊K。我就简写了啊,得到一个kid stream。
03:01
然后PI,当然它可以直接聚合回去了啊,我们这里边做的操作是它又做了一个开窗操作。得到了一个window stream,对吧,那么window stream再做这个聚合,可以再得到我们原始的这个data stream,所以是这样的一个流程,而现在在这个table API里边呢,呃,它的定义稍微有一点不一样,它定义的是我们的这个基础是table,对吧,Table经过这个。Window操作直接得到一个group window,就是比方说我们这这个开的是分组窗口啊window对吧,直接用这个window操作得到的是一个group window table group window table对吧。啊,然后呢,后边我们要做什么操作呢?基于这个group的window table,还再做一次这个group by操作啊,就是之前我们的那个KY,应该是在他之前就先做KBY了,对吧,现在是反过来了,先得到一个group window的table,然后呢再去group by,再指定到底是谁是分组对吧。
04:09
啊,所以这里面得到这个数据类型是是什么呢?这个稍微又有点有点奇怪啊,大家可以看到这个啊的group window table里边就一个方法,就叫做group by。Group by里边得到的,得到的这个数据类型呢,叫做window group table,就是绕回来了是吧,刚才那个叫group window table是先开了一个窗,然后现在呢是window group table是分组了,对吧?这所以说既然是你那个window group table嘛,当前的这个group分组,当然就得大家看这个举例啊。当然就得基于当前的某个这个window对吧?啊,所以你前面可以是定义一个group window,然后as一个W,后面group by必须要加进来这个,然后你还可以加进来别的K对吧,真正我们那个ID根据它做分组。
05:02
啊,所以这里边是得到一个这么个东西啊,这么个东西,这还没完,因为我们说这相当于只是做了我们那个KBY嘛,相当于把那个K和WINDOW2个操作在data stream里边的这两步操作换过来做做这两步操作了,然后最后我们还有那个窗口函数呢,那窗口函数怎么办呢?哎,调到这个接口里边来看它的方法,哎,这里边方法就是select。啊,当然了,还有这个聚合对吧?Aggregate Fla aggregate这两个的话就是比较一般化的,你可以去自定义的聚合方法了,那我们这里边就是select对吧,Select没有体现出聚合,那这个聚合放在哪里体现,那就是CQ里边你要给一个聚合的函数进去对吧?啊,什么sum对吧,或者can't,做一个这个聚合就可以把它得到了。呃,所以这个整个的这个数据类型的转换过程啊,再给大家总结一下这是什么呢。基于table对吧,基于table首先现在不是做K,不是先做分组,它是先开窗,所以先做了一个这个window操作得到的是一个啊叫做。
06:07
Group window table group window table对吧?呃,叫做这个啊。然后呢,基于这个group window table,然后再去做分组,做这个group by。那么这个group by之后得到的这个类型叫做window group table,稍微有点绕window group table。然后得到的这个就相当于我们已经针对定义的那个,呃,Window啊,也做了一个分组了,大家看为什么要这么定义,就是因为我必须先把window定义出来,然后我分组的时候要用到那个window,对吧,就是因为这个问题,所以说我必须反过来,就是先开窗,再去分组,再group by。那得到这个之后呢,最后我用一个聚合对吧,当然这个我调的应该是AG那个select方法,Select方法里边有那个聚合的函数,这样的话就又可以得到了一个table,就绕了一圈又绕回来。
07:04
其实还是之前的三部,那只不过就是为什么大家看又得到了table对吧,不管是哪个方法,最后其实得到的都是,呃,当然有可能是一个aggregated table对吧?你如果是aggregated table的话,那可能后面还得再做转换啊,就比方说这个你后边再去,它里边还有select方法,再select出来就又是table了。啊,所以这边大家就会看到它的这个转换过程可能稍微有一点麻烦,但是整体的过程呢,跟之前还是一样的,为什么顺序调了跟之前不一样了呢?就是因为当前的这种转换是这个做分组的时候一定要把window信息传进去,对吧。之前我们在data stream API里边那个K分组就是分组,它是做那个呃,数据重分区的啊,做数据传输用的,而后边你那个窗口window呢,Window就就直接开window就好了,对吧,因为已经是就是分组数据已经分好的这个状态,Window呢只是一个分的操作而已而已啊,它不存在后边我们CQ里边要去统一做分组这个这个操作,哎,所以这个过程就是稍微有点。
08:12
有点不一样啊,但要把这一个弯要转回来。啊,那我们来看一看现在这个操作,诶现在这个已经点了啊,所以首先就直接是一个点window啊,那这里边window我们就可以开窗,比方说我开一个滚动窗口吧,呃,Tumble对吧,大家看有这个类对吧?Flink table API下面有这个类tumble啊那大家知道这直接就可以点window去调这个,呃,定义它里边的一些参数了啊,点window啊,那这里边我们不用这种方式,推荐大家还是用这种空格的方式,这种调用方式更直直白啊,整个的这个语义更明确,那就是tble over,哎,Over那个里边要传的是一个时间,大家刚才看到这个点over里边要传的是一个size的一个表达式或者一个字符串啊,所以你这里可以给那个字符串的十秒啊,那也可以给什么呢?我们比方说要记十秒的滚动窗口啊。
09:06
也可以直接给一个。表达式,大家看这表达式就是一个十,然后点我给一个second对吧,直接这样给,这就是一个表达式。所以这就相当于定义了一个十秒的滚动窗口。啊,然后后面还得必须得有on对吧?啊,必须得有这个time的这个字段啊,时间语义的字段,那比方说我当前就on ts,前面不是定义好这个叫TS吗?重命名了,然后后边还必须得有一个S,这个W对吧,或者我认为这个叫叫呃tumble window嘛,所以我叫TW啊这个给他一个名称就可以了。后边必须要做一个分组,分组的时候呢,我基于ID对吧,当前的这个哪个三,另外必须要把这个TW也传进去,窗口也传进去。最后就可以做一个select操作,Select这里边比方说我取当前的这个ID对吧,然后呢,我就是看这个十秒钟之内,呃,就像我们之前说的十秒钟之内这个窗口里边有多少个温度值对吧?计数记做一个技术,做一个count就可以了,哎,那这里边来一个count ID对吧?id.count调这个方式,这是一个表达式的写法啊,不是我们那个CQL里边函数的写法,直接写出来,这就可以统计了。
10:26
大家看这个类型也匹配上了,对吧,绕了一圈之后又回到table了,回来了。啊,那呃,当然就是说,如果说我们还想知道这个window相关的信息的话。Table API也提供了一些方法,就是我可以直接获取到当前,因为我这不是定义出来了吗?我可以获取到当前window的信息,比方说我看一看这个window到底是什么时候关闭的,对吧?我看看这是哪个窗口关了,然后输出了一个count呢?可以window点,大家看有and对吧,因为之前我们说window里边不是有那个起始点,有start点,也有中结束点and点,对吧?所以这里边你可以给那个start,也可以给and,哎,我这里边因为要看它什么时候关嘛,给and方便一点。
11:08
啊,这样的话就可以看到这个信息了。啊,然后下边我们再做这个,呃,可以做一个打印输出对吧。这里边我们就是打印输出啊,打印输出好,前面这个我们就。直接注掉放在那里啊,那个是STEM码表的架构啊,这里边我们把这个result table。转换成硫啊,那那这个大家知道是先要转换成to a pen stream,这里面大家注意啊,我可以to aend stream吗。呃,因为这里边我们做了这个,呃,聚合操作,诶大家可能觉得,哎呀,这个看起来好像不能to a pan stream对吧,因为聚合了嘛,因为这个聚合可能它里边有更新嘛,所以这里边我先做这个啊,To strip。然后里边的类型,呃,我就直接写肉吧。这个最简单,根本不用考虑它到底是什么东西啊啊,这里边不要等于啊,直接打印输出啊print。
12:05
当前这个就是我直接写一个AJ对吧?啊,当前的这个结果运行一下,看看这个效果怎么样吧。好,大家看这个代码已经运行结束了,诶我们现在得到了几条这个AJJ这个输出的结果啊,前面都是处诶大家看这个相当于是什么呢。都是说明没有出现更新对吧。诶给大家提个问题,那这里边为什么没有出现更新呢。我们这里面既然用到了这个聚合,聚合它怎么不更新呢。啊,这就是这个特点,我如果说我们是直接啊,你分组之后直接聚合的那种操作的话,没有窗口的话,诶,那这个比较简单,那就是说来一个之前结果上聚合一次,来一个聚合一次,对吧,所以他会在之前那个结果上不停的更改,不停的更新,而我们现在是什么呢?现在是开了窗口。而且我们这个滚动窗口对吧,每一个窗口内,你想它最后输出这个窗口,是不是只有在窗口关闭的时候才输出一个结果呀,而且这一个结果你想我们这里边又没有,呃,像之前那个传说中的那个更新结果的话,那是窗口处理迟到数据,现在又没有这个处理迟到数据,那就相当于只是每一个窗口就针对每一个34ID来输出一个结果。
13:26
这个结果再也不更新了,所以当然这里边就全是处啊,所以大家看这里面其实我们都没必要用tract啊,直接用aend都可以。啊,后面我们就是再给大家试的时候,呃,大家就看到你这个一般的这个窗口啊,直接给那个to a stream也是没问题的啊。那这里边就会得到这个,大家看到这个结果,呃,就是我们第一个。大家看这个关闭的是20秒啊,09:43:20,那这个20秒到底是什么时间点呢。啊,其实如果大家要是知道这个之前我们数据的话,大家肯定知道十秒一个窗口嘛,整十的,哎,作为这个就是十十秒的整倍数作为窗口的起始点,那当然也就是关闭点了,所以第一个关闭的窗口当然就是200那个时间戳对吧?啊所以这里边其实是那个43分20秒这个时间点,所以大家看它这个关闭的时候统计个数是几呢?哎,341只有341有统计个数是一个对吧?199这个数据有一个。
14:30
然后后边这个30,哎,那大家知道就是下一个十秒了,就是200~210,哎,大家看下面数据都在200~210,那它输出了几条呢?输出了四条数据啊,为什么?因为我当前group by这个ID嘛啊,所以四个ID当然是输出四条数据,这里边341哎有两条数据大家看。因为总共是三条数据,但是我现在只统计这个窗口啊,这十秒之内200~210,那是只有下面两条数据啊,所以输出一个二对吧,后面这个三四六三四七三四十只输出一条数据啊,这只统计了一条数据,这就是这个结果。
15:11
啊,所以大家看这个就是我们也可以直接用一个APA stream直接把它输出,前面我们讲到那个呃,更新模式的时候,大家也发现,诶,你实际应用的时候,那你像这个卡夫卡啊,对吧,或者说其他的一些情况,它根本就不支持我们的那个upsur mode,或者说是这个mode,那这个你是不是就呃应用受到了很多局限呢?啊,它是受到了局限,但是一般情况我们做的这个统计输出其实都是基于窗口的,对吧,很少有那种说我就是来一个数据聚合一次,来一个数据聚聚合一次,我永远要实时的更新它那个结果,那大部分我们要看的是你实时的输出一个当前窗口。对吧,刚刚要关闭的这个窗口的数据也就够了,所以对于这种需求而言,其实APA stream是没有什么没有任何问题的,对吧,直接用一个追加模式啊,Upon mode就可以把这种我们的需求搞定了啊。
16:07
啊,这就是这个,呃,在这个代码当中对于group window的一个具体实现。
我来说两句