00:00
我们已经了解了link CQ当中的分组聚合和窗口聚合啊,那我们会说分组聚合呢,这是CQ当中的标准聚合方式,而窗口聚合可以说是flink进行流处理里边的一种特色的聚合方式。接下来呢,我们还要介绍另外一个比较特殊的聚合啊,那这也是在标准CQ当中就已经提供了的一种语法,那就是所谓的over聚合,或者叫开窗聚合,它所针对的情况呢,比较特殊,哎,他干的是这样一件事,我们针对这一个表里边的每一行数据。像之前的分组聚合,或者说窗口聚合呢,都是以每个K。作为一个分组选择的标准,然后呢,选取出对应的很多条数据啊,不管是按窗口的啊,还是不按窗口的,都是选取出很多条数据,然后统计出唯一的一个值,所以之前我们都可以看出是一个多对一的转换操作得到的结果表,这张聚合表里面的数据一定很少。
01:05
而现在呢?呃,现在这个开窗聚合就是另外一种场景了,它是针对每一行数据,每一行数据都去扩展。比方说我们按照这个范围,它之上之下的一段范围内的所有数据,把这些收集起来,然后做一个聚合,统计出一个结果,然后呢,下一行同样是这样,它也是按照上下的一段范围扩展出这样的一组数据,然后去进行聚合统计,得到一个结果。所以我们会发现啊,这样的一种方式就相当于是在每一行上打开了一个窗户一样啊,这也是为什么把这种聚合叫做开窗聚合啊。它是针对每一行数据都要做类似的一个操作啊,就是把这一行作为基准,然后上下开一个窗,收集一组数据来进行一个聚合,所以我们发现这样它聚合得到结果表呢,每一行就对应着一个结果,那其实看起来就有点像是一对一的一个关系了,最终聚合表里面的结果呢,根本行数不会变少啊,那所以呃,这也就是这个开窗聚合啊,可以说跟前面我们所介绍的这个分组聚合和窗口聚合都不一样的一个地方,那标准这构里边呢,它的语法主要就是使用了一个over这样一个关键字啊,那它的写法其实就是前面我们还是使用一个聚合函数系统内置的啊,可以是count,可以是some max me,这些都可以我们去做一个聚合,那聚合基于什么聚合呢?后面来一个over over后面定义的就是当前的一个。
02:40
所谓的窗口啊,就是基于每一行扩展出来的一个窗口啊,那。也正是因为它这里边的关键字是over,所以有时候啊开窗聚合也就叫做over聚合,然后我们来介绍一下接下来它所定义的这个over窗口啊,到底有哪些具体的语法规则啊,那首先我们看到啊,这里边可以去指定的部分有这么几个,一个就是首先来一个petition报ition,我们知道有分区的意思,那这个其实呢,它就是指定了当前分区的一个,建一个K啊,理论上来讲,我们可以看它有点类似于group by做的这个分组啊,把一个字段作为K传进来之后,那接下来我们这个窗口统计的时候呢,那就是每个分区分别去统计了,这不就相当于group by吗?那这部分呢,是可选的,如果不选的话,当然就是针对所有的数据都有效。
03:34
然后接下来这个就是必须要有的了,这个是order by order我们知道有排序顺序的意思,所以order by呢,那就是基于当前我们这个行啊,扩展出这个窗口里边的所有的数据要进行一个排序,所有数据必须排好序的,那这里边排序的标准呢,呃,可以是提取某一个字段来进行排序,所以它是order by提取出来的字段。
04:00
对于现在的fli CQ啊,这里的order by后面只能跟当前的时间属性字段啊,就是别的字段是不能进行排序的啊,不能放在这里的啊,所以现在还是有比较大的限制,然后最后呢,还有一个开窗范围,开窗范围就比较简单了,就是我们说的啊,你针对当前的一条数据,一行数据,到底要开多大范围的一个窗口,而且你的选取规则是什么呢?呃,你到底是按照之前多少行,之后多少行去选取,还是说呃,之前多长时间去进行一个选取呢?这就是两种不同的方式了,所以这个开窗范围的定义也有两种方式,一种就是定义一个范围间隔。另外是定义一个行间隔,好,那这两种方式就类似于窗口里边分类的时候有时间窗口和技术窗口啊,所以这也是差不多的啊,那它的基本的定义方式范围间隔的话,前面就是一个range。指的是一个时间范围,Range后边那就是从哪里到哪里,Between。
05:03
什么什么,然后and什么什么。这里面需要注意的就是,其实这个between指定的后面跟着的就是当前范围的上下界,那这里的这个between前面这里呢,当然就是时间范围嘛,之前前到多少啊,从当前行开始往前推移啊,推移到多长时间内的这个范围,那比方说这里边我们给一个时间间隔,那是INTERVAL1HOUR啊,那就是一小时了,后面跟一个prec,这就明确指定了当前行之前。一小时范围的数据,然后and and,当然一小时,这是上界嘛,And就是下界,下界这里注意之前我们说如果是批处理的话,标准CQ里边是允许当前行指定之前和之后的数据的。现在我们是流处理啊,流处理,而且这个order by的字段只能是时间属性,那我们可以去选取之前的时间到达的数据,之后时间到达的数据,这个是获取不到的,哎,所以它的下季啊,And后面跟着的最多只能到car,就只能到当前行之前截取这一段时间的数据。
06:17
那与之对应呢,我们还可以指定一个行间隔啊,那这个时候呢,它就不是range了,不是时间范围了,而是一个Rose,直接指定到底是哪几行,后面也是between and,比方说这里BETWEEN5P,这就不需要去来一个时间间隔了,直接给一个数,那5PRE and current row,那就是当前行之前的五条数据来注意啊,这个是也包含当前行的,所以就相当于是一共六条数据,要做一个窗口的聚合。具体在CQ里面写的话,我们可以看这样一个例子啊,比方说我们这里直接就写一个select user,然后还有时间戳TS,然后呢,哎,我们统计一下当前每一个用户。
07:01
我们之前不是想要去做这个统计吗?统计一个窗口内每一个用户他访问URL的次数,那我们现在呢,不要基于固定的时间窗口,哎,去统计每一个用户了,而是怎么样呢?针对每一条数据,就是来了一个用户的这个访问数据。那就针对这条数据,他之前的一小时所有的数据,然后按照每一个用户啊,到底访问了多少次来做一个统计,这就是所说的这个开窗聚合,它的一个场景啊比较特殊,所以我们看后边呢,跟的就是count URL,然后接下来是over。Over关键字后面跟着的就是我们定义的窗口,这个窗口呢,By user,按照用户做一个分组,然后order by tsts是我们之前已经指定好的时间属性字段,后面呢,那就是range between interval 1hour,一小时之前pre and current啊,就是一直到当前,好。
08:01
把这个统计出来的SCT做一个重命名,这就是开窗函数的一个用法。啊,那接下来我们可以在代码当中也做一个具体的测试,现在我们要测试的是。开窗聚合。啊,那这个测试的过程我们还是基于之前已经定义好的啊,创建出来的这个even table啊,然后呢,呃,我们就简单一点吧,去筛选当前的UUID,然后呢,呃,筛选当前的URL时间戳TS,然后呢,呃,我们把这个TS做一个平均时间戳的统计吧,就是在当前这个用户啊来了一条数据,那么在这个用户之前。连续几次访问,比方说三次访问所有的这个访问事件,我们计算一个它的平均访问时间戳,某种意义上可以看得出来这个用户这段时间到底活不活跃啊。所以接下来我们可以直接定义一个,我们可以把它叫做overs result table。
09:03
查询转换得到的结果叫做over table啊,那么当然是基于table en去执行一个CQ query了,里边同样还是写这样的一个字符串啊,构建一个字符串就是我们想要去实现的CQ,接下来我们就是select。我们现在提取的UID,这是一个字段,然后接下来呢,比方说URL也拿出来吧,另外还有TS,最后还需要计算一个平均时间处啊,所以我们来一个avg。TS啊,然后接下来就是over。后边就是我们对于这个窗口的一个具体的定义了啊,那这个窗口呢,里边其实还是要根据当前的用户user去做一个分组的,哎,所以我们还是得定义BY。UID啊,注意在这个over窗口里边啊,它定义这些字段的时候是不需要加逗号分割的啊,我们只要这个空格就可以了,然后接下来哎,那就是order by。
10:05
这个没什么好选的,只能是时间属性字段order by ts,然后接下来就是定义我们所要选择的开窗范围了,那现在我们是这个几行嘛,之前的几行数据,那我们干脆就是Rose啊,Rose后边between。三然后preding preding,哎,然后and到达的下界当然就是current。Current肉。这就是我们对于这一个窗口的完整定义啊,那当然了,后边我们统计出来的这个avgts啊,可以给它做一个重命名,比方说就叫做AV gts。当然这里边还有另外一个问题,就是我们现在的这个TS啊,其实已经不是长整型了啊,因为前面我们指定这个时间属性字段的时候呢,直接就是基于已有的这个time stamp.ro态,相当于把它的类型已经改成了time stamp类型,所以这个时候如果说我们想要计算它的这个平均数的话,可能就有点问题啊,所以如果说是这种方式的话,那干脆我们还是改一下吧。
11:12
使用上面这种方式来定义even table,哎,那这里边我们的TS就不再是时间属性字段,而是后边的ET是一个时间属性字段啊,那所以上面如果这个测试累计窗口的时候啊,我们就是稍微的改一下,把对应的时间属性字段要改成ET,那现在呢,那同样我们是average ATS去进行长整形数字的一个平均时间戳的统计,而后边order by呢,这里要order by ET,哎,这样的话就没有问题了。啊,当然了,这个还没有完啊,我们select这些字段,然后后面做了一个开窗的聚合统计,后边还得有。From,那from哪张表呢?当前我们已经注册好的这个even的table。把它写出来,这样就可以了,那当然最终得到的这个结果我们还是可以做一个转换成流的打印输出,哎,这里又有另外一个问题,就是到底是to changelo stream还是to data stream呢?这里边关键就在于我们进行持续查询的时候,到底有没有更新操作,哎,那我们想啊,这里边针对每一行数据都要开一个窗口,然后进行统计计算,那所以当然就是每来一个数据就后面追加一个吧。哎,所以最终得到这个结果表里边跟之前输入的数据的那个动态表里面的数据其实可以认为是一一对应的,每一条数据都会追加,那当然就只有追加查询,没有更新查询,所以这里我们可以直接To Get three,把over result table传进去,这里我们可以print一下,这个我们叫over的一个结果啊,啊,那当然了,如果想看清楚一点,我们可以把上面这个直接注掉啊,直接就看这个over测试的结果。
12:55
我们可以看一下是不是符合预期。
13:00
好,现在已经输出了对应的结果,我们可以看到,诶,这里的数据现在其实比较少啊,爱ice丝有三条数据,Bob和凯瑞各有两条数据,我们看到爱丽丝第一条数据来了之后呢,诶,那当然了,这个时间戳是1000嘛,那平均数也是1000,这个没有任何问题,鲍B第一条来了之后也是1000,没有问题,第二条爱丽丝的数据来了之后,当前它的时间戳是一五,哎,后边五个零,也就是这是第25分钟时候来的数据,那这个时候计算平均数呢?哎,当然只有两条数据,那就是把之前的1000和当前的25分钟这个时间戳两个一加,然后除以二计算出来之后是,哦,是750500,这样一个时间大概是12分钟多的时候一个时间。然后接下来如果爱丽丝又来了第三条数据的话,哎,那这个是55分钟时候的一条数据啊,那接下来我们看计算平均数的时候,就是把之前的三条数据全部叠加起来,除以三,哎,得到的后边可能除不尽啊,所以有一个333这样的一个尾数,这就是我们计算平均数的一个过程啊,当然了,如果说我们现在数据更多一点的话啊,当前我们指定的开窗范围是。
14:12
之前的三条数据and current row,哎,那是包含自己当前这条数据的话,那应该一共就有四条数据啊,所以往后去推移的话,我们就发现啊,可以去计算之前一共四条数据的平均数,那如果再来新的数据呢?哎,那计算的就当前这个窗口,就相当于往后推移了嘛,就从第二条数据开始,然后往后的四条数据,第三条数据开始,往后的四条数据。这就是开窗聚合的一个测试啊,那当然了,关于实际使用的过程当中,我会发现这个开窗聚合看起来有点麻烦,因为你一旦要用这个窗口,你就得over后边加这么长一串这个窗口的定义啊,那我们这里啊,假如说窗口都是同样的窗口。我要基于这样的窗口呢,计算一个TS的avg啊,做一个这个平均数,另外呢,我还要针对这个URL做一个count,做一个count统计,那难道后边我就再来逗号后面啊,再跟上一个字段后面就是count。
15:12
URL,然后再取over。啊,然后我再把这一个完整的窗口再去复制一份吗?当然可以这么去做啊,但是我们知道没必要,可以怎么样呢?可以在后边单独利用window关键字来直接做一个声明啊,那那么在前面就可以单独的去进行别名的使用了。啊,我们可以直接看一下文档里边对于这种方式的一个应用啊,哎,那这种方式就是在我们前面这个select from语句的后边最后追加一个window这样的一个声明啊,Window w。这个W叫什么呢?As,这后边就是我们当前开窗函数定义的这个窗口的规则,然后接下来呢,在前面我们over的时候就直接over w就可以了,如果要使用了多个不同的聚合函数,针对不同的字段去进行开窗聚合的话,哎,我们直接使用W,这样的话就会方便很多。
16:07
这就是关于开窗函数的使用。
我来说两句