00:00
我们已经知道了在table API和flink CQ里边分组窗口group window到底怎么样使用,那接下来我们在代码里边做一个实践啊,同样我们还是在table API下边创建一个object啊,这个不用单独去创建了,因为之前我们有这个window and太慢的window test嘛,我们就直接在里边用好了,对吧?呃,这里边首先我们就基于之前已经定义好的环境和已经呃定义好的这个时间,时间特性是事件时间,然后呢,后面我我们还分配了这个时间说和water mark指定了时间字段,当前这个time stamp就是当前的这个时间字段,对吧?呃,这里边我稍微注意一点,大家要稍微注意一点小细节,就是说最好后边我们的这个,因为当前我们这个表里边的字段,你看到它就叫time stamp了对吧,这个其实不是特别好,因为time stamp本身是flink里边的,呃,就是CQ里边的一个关键字,对吧?啊,所以如果说这里边。
01:00
那我们想要给他做这个后面啊,想要在这个CQ里边去做更多的操作的话,那最好还是要把这个做一个别名,给一个别名啊,那这个大家知道别名的话,定义了这个row time之后,也可以继续在as对吧?呃,你看同样还可以在as,比方说我们给一个TS啊,这样就给它做了一个重新的定义啊,接下来我们在表里边注册的这一个stemma里边的这个字段名称就变成了TS啊,就不会那么奇怪了是吧?好,那接下来我们就来看一看怎么样去做这个操作啊,我们先把这个先注掉啊,这个打印输出就先不管了,接下来我们做这个窗口测试,首先是呃分组窗口group window,呃,首先我们来看这个table API的调用啊,那这里面当然我就希望得到一个就是转换之后的一个一个结果。
02:00
表对吧?Result table基于当前的sensor table去做一个操作,那么当前的这个sensor table首先做什么操作呢?呃,比方说我们想一个具体的实际应用吧,对吧?呃,就比方说现在我们输入这个传感器的数据啊,你像之前我们就是基于它可以做各种各样的这个开窗聚合统计的一些操作嘛,那现在我们就做一个最简单的操作,就像之前我们做的那个聚合一样,就是当前开一个十秒钟的窗口,然后统计这十秒钟之内呢?呃,按照ID去做分组,每一个ID的那个传感器到底有多少个温度值进来过,对吧?还是一个count统计一个count值啊,或者大家想要去统计,呃,对应的我们,呃,那个比方说平均温度值对吧?啊,这些东西都是可以去做统计的啊,我们这个就不给大家做详细的介绍了啊,那这里边我就可以去首先先把这个window定义出来,对吧,大家看这个三这个table啊,34TABLE。
03:00
直接就可以调window方法,然后调window方法的时候,它有两种实现,一种是里边传这个over window,另外一种是里边传一个group window对吧?啊,这两种是呃,重写了的这呃就是参数重载了这样的方法啊,然后得到的数据结构呢,一个是由table变成了一个group的window的table啊,那首先我们来看这个啊,然后得到的group的window table group window的table对吧,稍微有点别扭啊,Group的window table它里边能做的操作呢,大家看有一个group by对吧?接下来它就是先做了window之后,然后我再公办,因为我当前本身这个窗口也是分组的一个标准,因为本身CQ里边并没有我们之前说的这个,呃,数据流里边的那个窗口桶的那个概念,对吧?哎,它所有的这个只有分组的这个概念嘛,所以说当前我这个要想做不同的这个桶里边的这个。
04:00
聚合的话,还是用这个分组的方式把它做一个分配啊,所以这里边接下来才调group by,大家可以看到这里边能调的就一个group by,对吧,只能调这么一个方法,然后得到的呢,是一个window group table,稍微有点奇怪啊,之前这个叫group window的table啊,就是呃,相当于本质上我还是一个window的table,开窗的table对吧?然后现在经过group by之后呢,本质上这是一个分组的table了,对吧?呃,是window group的table,靠把靠这个window窗口,然后把它做了分组之后的这样一个table,然后接下来,接下来我们再得到的这个table,你就看到了,它可以去调select的方法,对吧?我们说select里边为什么直接提取可以呢?那你说的那个窗口函数在哪里呢?因为它里边可以直接指定调用一些列的聚合函数,对吧,直接把那个聚合出来,所以直接select是可以的,另外还有就是可以aggregate,可。
05:00
给flat aggregate对吧?这里边就是直接指定去做聚合,做这个窗口的聚合操作,相当于我们的那个窗口函数,那最终得到的就是一个,诶,这里边sle得到就是一个table了,直接已经提提取出来了嘛,字段都提取出来了,那你如果要是aggreg的话,这得到的呢,还是一个aggregated的table对吧,这是聚合之后的那个状态,然后这里边它再去调select方法,才能得到我们最后的那个table对吧?哎,这就是又是我们说的兜兜转转绕一圈,最后table API最终还是以table为准对吧?啊,绕一圈最后再回来啊,所以这个接下来的这个调用的这个方式大家就发现了,没什么好说的,必须照着这个流程走,一步一步就是这样对吧?啊,所以大家就是练习惯了就可以了啊,那首先我们想这个每十秒钟去做一个统计,那这个需求不就是一个滚动窗口嘛,对吧?啊,所以我们每每十秒。
06:00
好,统计一次,这是一个滚十秒的滚动时间窗口,所以接下来我们在这里面定义的时候,怎么去定义呢?诶接下来我直接传一个,诶大家看这个类对吧?Flink table API里边给我们实现的tbo这个类,它本身大家看它是一个final class对吧?啊这这里边它接下来得调方法呀,得调什么呢?哎,调一个over对吧,你看它接下来这个操作,这就是一步一步的啊,接下来调over,然后over呢,得到一个tumble with size,然后接下来呢,Tumble with size又必须去调一个,哎,你看调一个on方法对吧?这个on方法呢,这就是说指定那个时间字段吧,得到一个tumble with size on time啊,然后又是这样一个类啊,这样一个类里边调用的方法呢?啊,接下来就是必须得调S对吧?
07:00
所以大家看这一步一步也都是必须的,没没有什么好方法,你必须按照这个流程一步一步掉下来,最后呃,这个as得到了这个ble with size on,呃,Time with alis别名这个东西之后,它就是一个group window了,对吧,大家看这么这么一长串是这样的,所以接下来的这一串调用也没有什么好好讲的,对吧,必须是他先去调over,然后over完了之后,呃,这这里边你还没有对应的那个字段,对吧,完了再去调这个on on完了之后再去调S,必须是这么一层一层掉下来啊,这里边大家知道在scalela里边啊,你如果要是单一参数传递的话,其实是可以省略这个点,就是我们这里边,呃,调用这个调用方法的时候,其实可以省略这个点和括号,对吧,直接用空格分分割,看起来就好像是一句话一样,哎,所以我们经常就用这种方式给大家去做。
08:00
这个展示对吧?啊,这个看起来就非常的直观,就是tumbo over over这个,呃,多多长的这个时间作为这个窗口长度呢,十秒钟嘛,这里边我传一个十点second对吧,传一个这个,然后后边呢,哦,当前的时间字段TS对吧,我们做过别名的嘛,TS,然后另外S我定义一个W对吧?诶把它定义出来就完事了,这个其实非常简单的一个定义方式啊,那后边这个也是我们说必须得做group by对吧,没什么好想的啊,所以接下来就是当前的这个别名,这个window必须作为我们定义的这个,呃,窗口的这个就是作为分组的一个标准传进去,那另外呢,还应该指定一个真正的那个ID作为字段对吧?呃,就是ID字段作为当前的那个K,真正的K啊,所以这里边把这两个传进来就完了,如果大家要区分的话,比方说我们这个叫做TW,就是滚动窗口嘛,对吧,它。
09:00
Window把这个给一个这样的别名都是可以的,那最后再做一个select对吧?啊,我们想要的东西拿出来就完事了,比方说我想要ID,然后呢,呃,我想要一个当前的这个ID的count对吧?哎,调这个方法直接统计个手就可以了,另外我可能还还希望想要什么呢?呃,这里边比方说我想要当前这个temperature的,诶,我可以调它的这个average对吧?诶,我算一个这个平均温度,这也是完全可以的呀,完全没有问题对吧?呃,另外还可以输出什么呢?因为你直接这么输出的话,我不知道这当前这是哪个窗口啊,它还可以调用当前窗口的一些信息,因为窗口不是有别名吗?这个TW是完全可以放在这儿的,然后这里边我可以直接调它的一个end方法,也就是说这这这是基于这个窗口的啊,就是如果说是一个窗口的话,我直接点end就表示什么呢?当前。
10:00
窗口的结束时间对吧?那就是我们一般理解的那个关窗时间,假如说不考虑那个呃等待迟到数据的话,就是它的那个关窗时间,这样的话我一看就看到诶第几秒钟关的这个窗口,这里边来了一个数,哎,它是什么样对吧?哎,这这个最后的这个聚合结果是什么样的,这就非常的直观啊,这就是关于这一个group window的一个定义啊,当然下面我们也可以把它做一个CQ的实现,大家可能会觉得这个呃,Table API的这这种实现还挺奇怪的是吧?哎,这你要是简单来想的话,这不就是呃就就类似于我们直接做那个select这些对吧,然后你把那个呃,如果要把这个窗口定义好之后,然后我们直接调这个函数,Count函数对吧,Average函数直接调不就完事了吗?哎,这个其实是比较简单的,所以接下来,呃,我们如果要用CQ的话,大家还记得必须要先把这个注册一下,对吧?当前这张表先注册一下,所以create temporary type view注册。
11:00
的sensor,然后把sensor table传进来,好,然后接下来这里边我们就得到一个啊,Result c table还是基于不是于啊,基于这个table en nv对吧,我们要去直接调它的CQ query方法,里边写一个CQ对吧,写一个字符串进去了,那接下来我们就直接写了select啊,接下来我们select什么呢?这个select还有点多,我直接分开写吧,那首先是I对吧?啊,然后这里边我们还要这个。要把这个拿到对吧,那那接下来还要这个这个要拿到啊。另外啊,接下来我们还需要要一个那个window的end对吧?啊window end这个这个又怎么怎么调呢?难道直接是一个end的方法吗?这好像有点奇怪是吧?哎,我们先空在这儿,先不着急啊,先把这个空在空空这儿啊,然后后边我们就是from sensor诶再接下来啊,那就是group by对吧?分组的标准,分组的标标准就来了,一个是ID,另外一个是什么呢?不就是我们那个window的定义吗?啊window的定义当当时给大家说了这个定义的时候,是不是就直接用这个函数对吧?Tbo,这就是滚动窗口,Top就是滑动窗口对吧?呃,直接用这个定义就完事了,所以这里边我们直接给大家来一个tbo里边,诶,首先这是这个时间属性放在第一个位置对吧?后边是一个窗口,长度是一个interval,大家还记得之前我们说那个DDL里边表示我们那个interval的时候怎么表示吗?时间长度的时候怎么表示吗?Watermark的时候。
12:49
讲到auto这个延迟一秒的时候,我们当时用到过对吧?哎,这这里边它要有一个in inter关键字放在这儿,然后后面呢,跟上一个字符串引起来的数字,再加上当前时间的这个单位,比方说INTERVAL10秒,对吧?啊,这就是这个窗口的定义,那前面我们这个窗口的and怎么样去把它拿到呢?那同样还是直接把这个放在这儿,有一个方法就叫做tumble and,你直接这么调用,这就是我们当前定义的这个窗口的一个at,哎,那有些同学看到,哎,你这个没有给别名啊,没没给别名没关系,对吧,只要这里边给的这个字段和我们这个长度的定义是一样的,这就是同一个窗口,对吧?因为你这个调的这是不同的,不同的这个,呃,当当前这里边调的是这个不同的函数啊,你这个东西没有办法给别别人对吧?啊,所以直接这么做就没有问题了,好,那接下来我们还是把这个输出来看一看这个效果吧,打印输出。
13:49
转换成那首先我们把这个,哎,诶这里面就有一个问题了,大家想一下是啊还是呢。
14:08
哦,那那这里边大家可能会想到有可能,呃,应该是retract吧,觉得你这里面都做聚合了,对吧?哎,可能是retract的可能性比较大,呃,那如果要是说啊判的话行不行呢?我们先写出来啊,先试一试,测一测,里边这个我就不详细写了,直接给一个肉好了,然后print输出,这里边给一个,然后同样呃,Table,诶这里边我给一个,大家知道这俩其实效果应该是一样的,对吧?呃,得到的这个结果应该是一样的,我这里边给一个CQ,把它做一个输出,好,那接下来我们直接运行一下代码,现在我们是从哦从文件里面读取的,那应该直接运行可以看到结果啊,大家看到好像没有报错,直接运行结束啊,是成功的得到了这个运行的结果退出了啊,我们来具体看一下到底是怎么回事,我们看一下啊,呃,首先我们看这个啊,大家看这里边,因为两条流分别去做输出这个。
15:09
顺序就没准对吧,这中间是穿插的一个状态,我们先看这个result,这个table输出的这个结果啊,我们这里输出的结果就是每一条数据,诶大家看到这个CEN41这个来了之后,首先是什么呢?20秒诶大家看这就是我们打印这个窗口结束时间的一个一个好处了,我就知道当前它就是在这个09:43:20的时候关了一个窗口,那当然也就是说我们第一个这个199这个数据,它应该是,呃,大家其实能想到啊,它应该应该就是四十四十三分19秒的一个数据,对吧?啊,所以20秒的时候呢,只有他一个3411统计了一个,然后到30秒的时候,你看接下来就就是隔了十秒之后,下一个滚动窗口突然就输出了好几个数据,看346 341,三四十,还有347啊,尽管它放在后边了,但是其实我们知道它也是30秒的时候统计输出的结果,对吧?哎,你看这些都是输出了,然后呢,341有两个。
16:09
3463,呃,其他都有一个,我们知道只输出了,只输入了一个一条数据嘛,当然就只有一个了,那这里我们看到这个30秒的时候,那就应该是200~210对吧?诶我们看一下,果然206208,这不是两条数据吗?210我们说前壁后开属于下一个窗口了,哎,所以210就放到了下面这个40秒的这个输出里边,大家看最后我们在做这个40秒统计的时候,还有一个二对吧,3412放到了这儿,这个是没有问题的,那我们再来看一下,呃呃,就是大家看到这里边我直接to a判输出,大家发现这个没有问题对吧,就来一条输出一条看起来一点问题都没有,那接下来我们看一下这个输出,CQ那边输出输出是什么样子,他其实还是一样,20秒结束的时候,只有341有一条数据统计对吧,30秒的时候呢,三四六三四一三四十三四七,诶都输出了,只有341是两条数据,别的都是一条数。
17:09
最后还有就是哎,这个40秒的时候,341有两条数据结果是不是完全一样啊,而且我们会看到你这个tract前面不是表示它到底是插入还是更改吗?大家看到是不是全是处全是插入操作啊。啊,所以这里边就是我们totra stream肯定是没问题的,对吧?这里面肯定是没毛病的啊,就肯定是不会有错,但是我们会发现呢,对于当前的窗口定义是不是它其实就是每一个窗口到点的时候输出一次结果啊,针对每一个S啊,啊针对每一个S4ID对吧?输出一个结果它会改吗?它不会改对不对?那这里面什么情况才会改,就是我们前面说的那个,如果你要后面定义了,定义了一个,呃,允许处理迟到数据,在之前聚合结果上要做更新的时候,那才有可能会改,对吧?所以这里边你会发现它只输出一次,那即使做了聚合操作,做了这些统计,Count average之类的这些数据,那那最后得到的结果也只是一个就是A追加就可以搞定的,这样的一个理由对吧,啊,这个大家就理解会对这个更加的深刻一些啊,所以刚才其实大家,呃,就是大家认为的这个整个处理的这个过程。
18:27
程都是对的啊,就是你说是APA stream还是retra stream,其实都是正确的啊,只不过就是大家需要知道,在这里边只有追加操作,你用retra stream呢也可以,那就里边都是追加,都是true,这样就得到结果了。
我来说两句