00:00
他已经知道了。在。Table API里边怎么样去定义group window和over window啊,那如果说我们要想直接写CQ呢,所以我们再来看一下另外一种用法,就是在CQ里边直接去定义group window和over window,其实这个也特别简单啊,就是CQ里边呢,Flink CQ给我们提供了特殊的一些方法啊,叫什么呢?就叫做tumble,然后后边传参,传参的时候传一个time attribute啊,这个就是我们的那个时间字段嘛,然后后面呢,再给这个再给一个internal字段,这就是我们那个窗口长度嘛。滚动窗口,定义一个窗口长度就完事了,所以就像我们前面在做那个table API调用的时候,大家还记得在这个table API里边,我们用到的是。直接tumble over什么on什么,对吧,相当于给了两个参数,一个时间字段,一个当前窗口的大小size,那这里边呢?呃,这就更加的直白,就不要用那样的定义了,直接一个相当于CQ里边的一个函数tmbo,就叫tbo,然后里边传两个参数就完事了,定义出来了。
01:09
啊,那同样呢,还有滚,呃,这个滑动窗口,滑动窗口稍微特殊一点,它的这个定义叫hop啊,啊,这个为什么不叫s slide,那那应该就是我们这里边有一些历史原因对吧,就是可能阿里一开始它自己定义就定义成这样了,后面啊,或者说就是我们一开始这个table API去做这个定义的时候啊,本身就就是用了这样的一个方法的定义,所以就没有没有用大家统一的那个名称啊,这里边叫做hop hop,它里边的这个字段传什么呢?多传了一个interval对吧?啊所以这个也很好理解,你还是要指定当前的时间字段,然后呢,里边传一个当前的,呃,这个窗口的大小,传一个滑动步长不就完了吗?所以多了一个时间的间隔。这里大家需要注意的是,这个时间间隔呢,是前面这个是不长,后边这个是我们的窗口大小啊,所以大家不要搞混了啊,就是前面这个小,正常来讲啊,应该是前面这个小,后面这个大。
02:10
啊,就是如果大家要是搞混的话,就会发现,诶,那个窗口输出的那个结果很奇怪啊,就是好像跟我们预计的那个聚合不一样啊,所以这个大家稍微注意一下,然后另外还有就是session了啊,这都一样啊,里边传这个时间字段,另外还有个interval interval不就是那个间隔嘛,对吧,With的那个参数嘛,这个都是非常类似的啊。呃,另外还有就是呃,有了这个group window,大家再看看这个over window over window的话啊,就关于我们这个group window,具体在这个。实际CQ里边的写法啊,这个其实就后面我们在代码实现里面给大家看一眼,大家一看就知道啊,它操作就非常简单,就是说我直接在后边把这个window定义出来就可以了啊,就比方说我我要怎么定义呢?呃就是类似于就是说直接在后边,呃比方说slap什么呃字段对吧,然后呃,Count什么字段,然后呢,对应的还要把那个window定义出来,定义window的时候就是用这个对吧,就是tumble或者ho或者session直接把它定义,然后后边呢,Group by的时候再把这个window呃定义出来就完事了,对吧?啊当然select的时候,你可能需要这个window相关的操作,呃相关的那个字段,比方说我们一开始的这个例子里边有这个and对吧,你要是不需要的话,这个那个定义的时候不要也可以。
03:30
啊,那关键就是在后面那个group by的时候,要把这个窗口定义出来啊,然后可能还想给他一个别名也是可以的,对吧,但是一般情况下可能就不给别名了啊,因为呃,你给了别名的话,前面其实也没有办法直接调它的方法啊,所以就是说一般就是在后边直接把这个按照这个group by后边跟上这个窗口定义的这个字段就可以了。然后另外还有一种是over window over window的使用,大家可以看一下这个select的这个例子啊,啊,那我们这里边写CQ的时候,跟实际在就是具体那个写写C口的过程当中啊,这种方式是一样的啊这就是什么呢?前面就是我们要统计的这个字段啊,然后比方说我count amount统计一个数量,然后呢,Over对吧,定义这个。
04:18
呃,开窗函数over后面这个窗口,那这个over怎么样具体定义呢。这就是我们刚才说的那几个字段,那几个呃要求的这个属性啊,一个是position by position by,比方说user,对吧,按按照ID来做这个聚合,然后呢,呃,就是我们这order by对吧,按照当前的这个处理时间的字段去做一个排序,然后后边呢,定义范围啊,主要就是定义这个分区的这个K,然后按定义这个排序的K,然后最后定义这个范围,范围的话,那就是有这个行数的限制,也有可能有这个时间的限制,对吧,比方说你是这个时间的话,行数就是呃,Rose between。
05:01
如果是时间的话,那就是range between对吧,这个都是大家应该比较清楚的东西啊,啊,那这里面比方说我定义的是roses between two proceeding and current row啊,就像我们刚才定义的那个一样啊,就是当前行之前的。两行对吧,这样的三条数据聚合在一起,得到一个结果。啊,那我们这里边做这个窗口聚合定义的时候呢,就是你前面我们这里定义的所有聚合操作都必须基于同一个窗口,也就是这里边我可以。这个呃,定定义这个count amount,对吧,也有可能我还有别的字段也要做一个聚合,比方说做一个这个呃,这个温度值的一个avg,对吧,就做一个这个那个average,做一个平均值的计算,这个都是可以的,那我都必须要基于同一个over窗口,它有同样的分区,呃,排序和范围。啊,这样的话就可以把它定义出来了啊,目前是只支持在当前行之前的,之后的现在还不支持啊,以后也有可能会加进来,大家可以想到你要加进来这个处理的话,那就相当于这个又变成类似于批处理,又要多等了嘛,对吧?啊就是你如果要是你当前定义了这个之后多少行聚合的话,那当前数据来了之后,你得一直等着,等到后面对应的那个多少行来了之后,我才能把它做一个输出啊,类似于这样的一个机制,现在还没有实现啊。
06:30
好,那我们在代码里面给大家做一个。具体实现吧。呃,那接下来写一个第三步啊,或者我们前面这样写,前面这一部分是table API对吧。Table API。实现。啊,那下面这个就应该是1.1啊。然后下面这个是1.2。后边第二部分。我们这个是呃,CQ实现啊。
07:05
2.1,哎,这个还是group window。Group Windows,哎,我们看一下,呃,同样在这里定义一个result table啊,我定义这个叫做result啊,就叫就叫c table吧,像我们之前那种定义啊,同样还是一个table。基于之前的sensor。开窗对吧?呃,首先是,呃呃不是啊,我我们这个现在是要写C口,当然就不是基于他直接开窗了,我们首先应该是。就是要注册一个表对吧?啊,要把之前的那个sensor table注册到当前环境里边,大家记得这一步啊,我这个就直接env啊,注册表的时候,之前我们讲过啊,有这个register table,现在推荐用的方法是create temporary啊,接下来把这个sensor我就叫sensor吧。
08:01
Sens注册进来。然后接下来做操作的时候,那就是table env,直接调CQ query去写CQ了,对吧?哎,我这里直接换行来写啊,大家看一下这个写法,Select,呃,当前是ID对吧?然后我们对应的是要有一个count嘛,对吧。然后这个直接就是count ID本身CQ自带的函数啊,这也是这个当前CQ都支持的啊,这个简单的函数肯定是支持的,然后我定义什么呢?呃,还需要这个,呃,假如说你这里需要这个average的话啊,那我就直接这个还是啊avg对吧,直接把它算出来就可以了,这里边给大家讲一个什么,假如说我还需要当前那个window的,那大家看前面我们这个没有做那个temperature的avg对吧?我们要的是那个window的and,那这里边我怎么取这个window and呢?哎,这里边我要。
09:00
用这种方式啊,就是我们定义那个呃,滚动窗口的时候,不是用了那个tumbo这个类这个状态嘛,对吧?啊,那这个tbo这个函数啊,这个方法那里边我们要传的是时间字段TS,然后还有一个窗口长度是个interval,那么这里边interval定义出来,比方说这里边十秒钟。哎,注意啊,这里是十要用引号引起来,后边second对吧,有一个这个间隔的这个定义哈,然后这里边如果说我们要把它的那个window and拿出来的话,直接用这个啊,有一个方法就叫temple and。啊,那同样就是说你如果要取它的那个起始点的话,他start对吧?啊可以这么去取,或者那个你如果要是那个滑动窗口,大家可能记得滑动窗口怎么定义的ho对吧,你如果要滑动窗口的话,这里边就是ho这么去定义。那ho的话,里边这个就得多定一个参数了啊啊,那这里边就得interval,大家知道第二个参数是滑动步长,比方说我滑动五秒对吧,我或者我滑动这个三秒,滑动这个四秒这个都可以对吧,随便定义一个啊second。
10:10
然后后边这个滑动的。划划窗的长度,十秒钟,这样把它定义出来啊,然后那select当然是from了,From sensor,然后下边,哎,Grew by对吧?Grew by这个不能少啊,BY什么呢?一个是ID,前面我们的主要的这个K,另外一个是啊,就是窗口对吧,就是前面这个,你如果要不想看它的这个窗口信息的话,这个可以不要。但是下边这里边必须得有窗口的定义go by啊,那这里边我要的就不是and了,对吧,就是这个ho的定义啊,那那这里边我如果要的话,就把这个再加进来,这个都是可以的啊,就按这种方式把它定义就完事了。啊,然后下边我们来做一个这个输出的显示啊,当前这个叫result table,我们可以把它to,大家知道这个转换之后它都都不做更新吧,所以说直接用这个pad这个是没毛病的啊,这里边直接AJJCQ对吧。
11:11
好,我们来运行一下,看看结果怎么样。现在这个已经运行完了,大家看一下这个输出的内容,哎,这个输出东西就很多了啊,因为我们现在是滑窗嘛,对吧,四秒钟滑动一个窗口,哎,那大家推测一下这个我们这里边哎,这里边其实你看到这个,因为我们输出了这个结束时间嘛。二二对吧,二二结束第一个窗口啊,那自然我们就能想到,肯定是因为这个时间戳二,呃,如果要是这个二二的话,呃,这个结束的话,他这个就是我我们前面的那个结果应该是能够被我们的呃这个四秒钟这个距离整除对吧?啊,应该是这样的一个一个状态啊啊那这里边诶我们这里边是几秒钟不是四秒钟是吧?因为二二好像不能被四整除啊。
12:00
看一下这个定义的,诶是四秒钟,那这里边我们第一个关闭的窗口是,哦,这里面关键是我们得去找到第一,就是我们这里边算出来的第一个窗口可能未必是。就是当前真正关闭的第一个窗口。哎,这这这是为什么呢?就是说呃,我们当时算的时候是按照这个呃滚动,大家还记得那个窗口的起始点对吧,那个算出来的话,就必须得是按照当前的那个长度啊,就是定义一个那个整除的一个数来去做计算的,那这里边比如说我们有了这个滑动不长的话,它最后得到的那个结果其实应该是什么呢?应该是当前滑动不长的一个整数倍。啊,比如说这里边呃,就是在在这里边我们这个呃做这样的一个输出啊,其实应该是什么,每一个数据。应该是,呃,在当前的这个窗口里边,那应该是第一个,第一个数据是九九对吧,大家记得这个数据九九,所以如果能被四整除的话,我们这里边算出来,大家知道最后一个算出来是那个。
13:13
呃,就是。呃,就是应该是那个200对吧,200这个应该是能够算到一个数据,然后后边我们这里边,诶这个稍微有一点奇怪,对吧,因为二零如果是。200作为这个起始点的话,呃,就是200是第一个关闭窗口的话,那我们呃,应该起始点它应该是196对吧,应该是16秒有一个有一个窗口,但这里面它第一个关的是二二,也就证明我们第一个窗口相当于是。998对吧,就是198到。呃,到这个,呃二。呃,这个二二哦,呃这个这个呃不重要啊,这个肯定不是这个200对吧,因为我刚才是理解成这个200了,所以这个二二的话,呃,大家如果按照之前我们的那个应该是199,相当于是。
14:11
呃,就是应该是哪哪个数对应过来应该是19秒那个数,所以这里边的二二其实对应的应该是。应该是那个202那个数据对吧,应该是202啊,那这里边如果是202的话,它的窗口应该还是198~202的一个窗口。呃,这里边我们先看一下这个数据对不对啊,最后的这个结果对不对,那这里边是341,我们看这里边count是一对吧,然后得到这个时间,它是这个二二关闭的一个窗口,所以198~202这个时间段内啊,当然就是只有一个数据了啊,只有这个1991个数据对吧。然后下一个。哦,我们这个还是还是这个想错了啊呃,刚才我们说了半天,其实还把它当成这个滚动窗口来定义了,不是啊,我们现在这个窗口是滑动窗口,我们当前这个滑动窗口是十秒钟的窗口,所以最后一个就是它的结束时间是二二的话。
15:17
我们当前是这个,呃,就是呃应该叫什么,是202结束的话,它的起始时间应该是192对吧。啊,所以大家如果要是一除的话,这个192应该是一个一个能被四整除的秒数对不对,因为它是相当于是12秒到22秒的这个这个窗口嘛,啊,所以我们说是窗口的起始点应该能被这个间隔啊四秒钟整除啊,所以这个就绕回来了啊,刚才一开始我们还是想错了,所以这里边其实是第一个窗口是12秒到22秒啊,所以当然呃,就是12秒应该是什么时候,应该就是192对吧,192~202,那当然还是只有他一个。这个是没问题的。
16:01
然后是滑动窗口,下一个我这里写出来啊,192~202第一个窗口。然后下一个窗口,那就应该是滑四秒,应该是196~206对吧,所以大家看它变成26秒关闭了嘛,那196~206有有多少数据呢?诶这里大家注意,这个我们第一个窗口里边346也有一条数据,因为你到202嘛,所以这个201也是算的,但是347没有数据,我们这个左臂右开347,这个202不算对吧?啊,所以接下来这个数据是属于下一个窗口196~206,我们统计的时候3EN46,诶看到了三六还是一条,对吧?因为这个也属于196~206嘛,341呢也是一条,因为199也属于它,那另外除了这个3EN4133EN4这个346之外,这个SENS40和3EN47,他们也都属于196~206对吧?啊,所以这些数据都输出了一。
17:01
那后边我们再下一个窗口,那就得是。200~210对吧,所以大家看到下面这个30这个窗口还在输出10,呃,一六七十这个都在输出,但是呢。这个76这三个ID3它们就都是一,那我们这里一的这个ID对应的这个数据就是多少个呢,就变成了三了,为什么呢?它后面你这不是200~210吗?所以这个207208209都统计进来了,对吧?啊这个就是非常符合我们的预期,那同样你再往后面去去移的时候,到下一个窗口啊,大家知道34,那这这应该是204~214,那当然这些还都是。都是有的都符合对吧,再下一个窗口,那可能那是208~218,它就只有两个数据了。啊,所以大家下来之后可以多测一测啊,这滑动窗口就是会因为涉及到这个滑动嘛,稍微的会会比较绕一些啊呃,大家只要是测试的时候找到它这个点就可以了。
18:07
哦,另外还有一个,呃,那个over窗口的一个实现,我们这里也简单的给大家敲一下啊,代码里边写一下啊,这里边还有一个over window。啊,CQ的实现对吧,啊,这个CQ的实现,其实这个大家一看就能就能看到,这个非常简单,这个我们叫all CQ table吧。同样,这是一个table类型。这个就不写都可以啊。我们还是啊,Table env,直接点CQ query。把这个对应的。字符串CQ写进去,同样select,当前是ID啊,当前我们要什么来着,要那个TS对吧,大家记得要这个TS,然后还要它的count和temperature的,呃,这个平均值avg对吧?所以这个其实非常简单啊,后边就是count ID。
19:04
诶注意这里有over对吧,Over后面我们这里给一个别名了,给一个比方说就要W吧,然后还要avg temperature。Over。W啊,然后接下来from sensor已经注册过了吗?所以from sensor对吧?啊,那接下来是怎么样呢?得有一个得有一个开窗函数对吧?呃,这里边我们得在后面定义啊,定义一个window w as。诶,这里边我们把它定义出来啊,这里边它本身需要呃,三个这个属性的定义,一个是分区字段,一个是排序字段,还有一个范围啊分区partition by啊,当前我们是by ID啊。然后呢,Order by order by空格啊。我们当前是BYTS对吧,时间字段,然后最后呢,啊还有就是。
20:01
呃,当前行之前两行范围内做聚合,哎,那我们这个Rose between。退,哎,我们知道是to proceeding对吧,Ing。And current role,诶,这就是这个完整的定义啊,但是大家如果要是写的更规范一些,看的更明确一点的话,这个呃,方法这个关键字都写成那个大写对吧?这个by order by,这个road Rose between,这都写成大写,看的会更清晰一些。好,我们来执行一下这个,看看结果。呃,这个是all。对吧,我们这里边叫order啊。这里再来执行一下。看一下这里边运行结果已经出来,诶大家看到这个跟我们之前,呃,就是直接写table API的那个输出是不是一模一样啊啊对吧,这个完全没有问题啊,一开始的时候只有一条数据,后面的话变成了两条,求一个平均数,对吧,然后后面变成了三条,最多三条啊,每来一条数据更新一次,就是这样的一个过程。
21:11
大家下来之后可以把这个代码好好的实现一下。
我来说两句