00:00
前面我们介绍的分组聚合,这是C口当中非常经典的聚合操作,那接下来呢,我们要介绍的就是flink流处理当中的聚合特色了,那就是所谓的窗口聚合啊,我们知道本身流处理里边要面对的是无穷无尽的数据流啊,那所以对于这种无界流,如果说想要去进行统计计算的话,最好就是把它划分成有界的数据集,这就是我们所说的啊,窗口去进行一个分统操作,这就是窗口聚合的一个基本思路。之前我们在上一节当中呢,介绍了窗口的基本定义方式,这只相当于是窗口的分配器啊,那接下来我们要介绍的就是到底怎么样针对窗口收集到的所有数据去进行一个统计计算啊,这就是我们所说的类似于窗口函数的一个功能啊。那在C口当中呢,窗口的计算就是通过窗口聚合这样一种方式来进行实现,其实这种聚合方式呢,跟前面我们提到的这个分组聚合非常的类似,它也是按照窗口把数据进行分组的这样一个思路啊,就跟传统一开始啊,我们所说的那种分组窗口的思路是非常类似的,只不过现在呢,我们使用的是窗口表值函数,窗口表值函数已经针对当前原始的数据扩展出了新的列,哎,我们说扩展出了当前窗口的window start window and,还有window type3个对应的属性字段,所以接下来我们针对这张表里面的数据呢,就完全可以按照窗口的start和end作为一个分组的K,然后结合,比方说啊,我们按照user去做一个分组,哎,那把它们作为一个组合,K1分组,然后接下来去调用对应的那些聚合函数。
01:48
不就完事了吗?哎,所以我们发现啊,C口当中直接给我们提供的那些聚合函数到窗口聚合这里完全好使,只要我们使用窗口表值函数把这个表扩展出对应的窗口字段就可以了啊,所以本质上来讲,我们可以认为窗口聚合呢,还是一种分组聚合,只不过在分组的过程当中需要使用扩展出的窗口字段也作为分组的K。
02:14
所以接下来我们可以看一下这个在CQ当中啊,代码当中到底怎么去写这个形式的话,那就是select,提取我们想要的字段,比如说我们想要user,或者前面我们还提到了啊,想要选取当前窗口的结束时间window end作为一个信息啊,那像之前我们那个老版本的分组窗口的使用呢,这里要调一个函数,现在也不用了,因为我们直接扩展出来有字段嘛,Window and选取就可以了。另外呢,如果说针对数据想要做一个聚合统计的话,直接调用。对应的聚合函数,比方说count URL来把每一个用户在当前窗口范围内的所有访问的URL做一个频次的统计,所以我们知道这个统计出来不就是之前我们所说的那个URL view count吗?好像就是每个用户所访问的这个URL的个数啊,然后from table,注意接下来的这个table呢,这就是一个窗口表函数啊,一个表函数的写法,我们现在要获取到的是一个新的表,这个表是基于原先的even的table做了一个窗口的扩展得到的一个新的表啊,所以我们看到啊,直接这个table里边传入的就是这个窗口表值函数这个T啊,里边传入的参数当然就是对应的这个窗口的定义了,滚动窗口的定义啊,一个小时以TS作为时间属性字段,然后基于even table做了一个扩展。
03:41
然后后边的关键在于要写一个group by group by,当前我们是按照用户去做分组,另外还要加上窗口的信息,这里窗口的信息就是Windows start和window and啊,因为这里需要注意的是这个Windows start window and都不能少,为什么呢?因为现在我们不光有滚动和滑动窗口。
04:04
我们现在还有累积窗口,哎,如果是累积窗口的话,这个就有可能会出现它的Windows start是完全一样的,哎,它就是这个只有window and不停的在朝后推进,哎,所以在这个累积窗口的情况下,你如果只给一个window star,那肯定就不对了,所以我们也不排除啊,有些特殊情况,有可能这个window and也会出现相同的情况,哎,那所以这里我们就要把Windows start和window and都要放在这里作为分组K。所以现在有了这一部分知识,我们就可以直接在代码里边对于窗口的操作来做一个结合的测试了,啊,前面我们已经定义了基本的时间属性字段,那接下来我们就看看窗口到底应该怎么用了。所以我们要。测试窗口,哎,那别的像这个滚动滑动窗口都已经非常熟悉了,干脆我们现在就测一个新的吧,测试一个累积窗口。那我们要统计的东西呢?啊,其实跟之前想到的还是一样,我们就统计每一个用户在每一个窗口内所有访问URL的频次,啊,就是我们这里的URL count这三个字段啊,啊选取user window and和当前URL的一个count值,把它选取出来,只不过现在我们是累积窗口的话,诶,那我们得定义一个时间范围了。
05:20
比如说我们可以统计,哎,当前我们统计的这个时间周期是一个小时,一个小时统计一次当前所有的这个URL的访问次数,而这个累积步长呢,累计步长简单一点,半个小时累计一次,也就是说呢,我们第一次输出的应该是当前这一小时之内,前半个小时收集到的所有的数据的频次count值,然后第二次呢,A,那就扩展到一个小时了。它是一个累积增加的过程,所以接下来我们就可以在代码里边去使用窗口表值函数,对于even table去做一个扩展,然后针对这个累积窗口去做一个窗口聚合啊,那当然了,如果说我们在CQ里边直接写CQ的话,那之前的这个table对象实例,我们还要在表环境里边做一个注册啊,那所以这个table en先要去注册的时候,其实就是create temporary嘛,直接这样去注册就可以。
06:16
这里就叫做event table。后边需要传入event table的对象实例啊,然后接下来我们就写这句CQ了,那可以直接定义一个我最后的result table,经过查询转换之后得到的就是一个result table啊,这里调用的是一个table en的CQ query方法,好,那那这里如果说我们想要看的更清楚一点的话啊,可以直接用这个三引号,然后做一个换行这样的一个表述,这个看的会更加的清晰啊,在tla里边我们可以直接用这种方式,比如说这里我们直接select。要选取的字段选取什么呢?哎,我们当前需要user啊,当前我们定义的这个已经是重命名成了UID,那我们就把它叫做UID吧。
07:01
UID,然后另外还需要有window and。Window and,本来扩展之后就有这个字段嘛,所以就叫window and啊,那如果说我们还想重命名一下的话,可以叫做as and t,这个也是没问题的,最后还有一个字段,那就是count值了,Count的话,我们直接调用count这个聚合函数,针对URL做一个技术统计啊,那可以SCT重命名一下。然后接下来啊,那当然我们就可以直接from从哪张表里呢?哎,那这个表是使用窗口表值函数扩展出来的这张表啊,这个表的话,我们还是使用这个大写啊,看的清楚一点吧。诶,这样一个table,然后里边需要去调用的是cuumulate这样一个窗口函数,然后里边就是对于一个累积窗口的具体定义了啊,我们知道它里边有这样的几个参数啊,第一个参数是一个table。就是我们当前所定义出来的这个数据啊,Even table数据来源,然后除了even table之外,第二个参数呢,是一个script ts啊,就是当前这个时间属性字段包装在一个script里边。
08:15
Script里边传入TS。接下来啊,下边的参数当然就是累计窗口相关的那些信息了,我们要定义的累计窗口是,诶,整个统计都周期是一小时,然后呢,累积的步长是半个小时,哎,那所以先写注意啊,这里边跟滑动窗口一样,都是先写比较短的那个步长,最后再写长的那个窗口长度或者是周期啊。所以接下来我们应该是因。Interval,首先来一个30分钟的累积步长minute,然后下一个参数是INTERVAL1小时,那就直接1HOUR,哎,这样的话当前这个累积窗口就定义好了,那最后呢啊,聚合操作不要忘记还要有一个group by指定我们当前分组的key,那当前的分组的话,User啊呃,我们现在是把它重命名叫做了UID啊,按照用户要做一个分组,另外还有就是窗口的信息window start和。
09:19
Window and这两个都要作为分组的K写在这里,这就是我们完整的累积窗口进行聚合的CQ啊,那当然了,前面如果说我们希望让这个关键字啊风格一致一点,都把它改成大写也是没有问题的啊,包括这个as,还有这个count函数都可以改成大写。这样的话看起来会更加的清楚一些,包括from,这样的话,这个CQ就写完了,得到了一个结果表result table,那这样一个result table呢,如果说想要看到它里边的结果,我们想要把它在控制台做一个打印输出,那需要转换成U。哎,最简单的方式就是转换成流打印输出,这种转换的话,调用的当然就是table env的to,那我们到底是to data stream还是to change log stream呢?哎,这里我们会想到啊,既然做了这个聚合操作,那很有可能是做了这个结果的更新的,但事实上我们知道,针对一个窗口而言,它里边输出的结果呢?那是每一个窗口统计的结果只会输出一次。
10:29
接下来呢,不会更新之前的结果,而是在后边追加新的窗口的结果,即使我们这里是累积窗口也是一样,因为我们想啊,第一个小时之内,那第一个窗口输出的时候,它其实是前半个小时的统计结果,那它的start是零,哎,那结束and呢,应该是30分钟。而第二次输出的时候呢,输出的是一个小时之内的完整的数据统计结果,那他的start是零,结束点应该是60 60分钟的时候啊,所以这根本就是不同的窗口,当然也就不会有更新操作,而是全部都是追加操作,所以这是一个追加查询,并不是更新查询,后面做打印输出的时候呢,就可以直接to data stream就可以了,把result table。
11:17
传输进来,然后做一个print打印。那最后呢,啊,因。要执行起来,因为我们当前是基于这个data stream啊进行的转换处理,所以en electcu执行起来,接下来我们可以运行一下,看一看得到的测试结果是什么样。好,这里还有一个问题就是呃,我们当前的这个数据呢,可能没有半小时一小时那么多的数据啊,所以啊,这个输出的数据可能就都只集中在前一小时之内了,所以我们看到啊,呃,这里统计了Alice丝第一个输出的window and,我们看是。00:30,也就是说前半个小时的时候啊,输出了一次,那我们看这里统计的爱ice丝的点击就有三次,然后呢啊,后面凯瑞点击有一次,Bob点击有两次,后面我们看到啊,这个数据在一小时统计的时候其实没有发生变化,因为这里边最大的这个时间的数据啊,只是105秒啊,那很显然这个没超过半个小时吧,所以我们可以把这个数据做一个调整测试一下啊,那我们可以直接用文档当中已经设计好的这些数据,那我们可以看到1000乘以1000,这表示的是秒数,那再乘以60呢,这就是分钟数了,哎,所以25分钟的,55分钟的啊,然后后边我们看啊,3600乘以1000,这就表示一小时嘛,一小时再加,哎,这种表示就看的会更加的清楚啊,到底属于哪个时间段的窗口看的一清二楚,所以可以把这个数据做一个更改。
12:48
好,然后接下来啊,那我们直接运行看一看现在的效果是什么样,这个就应该能够体现出累积的效果了,因为我们看到前面啊,这个前几条数据啊,在没有加这个3600乘以1000的之前的这四条数据,应该都是第一个小时之内的数据。
13:07
然后呢,啊,这个25分钟之前的这三条数据,这是前半个小时的数据,所以我们看到啊,首先输出的。前半个小时的数据,爱丽丝有两条访问,我们看到就是爱丽丝第一秒的数据和25分钟的数据,Bob呢,呃,有一条数据,这是第一秒钟访问的数据,这是第一次窗口输出,在30分钟的时候输出的,然后在一小时的时候呢,又会输出第二次,这个时候就新增了一条爱丽丝的数据,所以我们看爱丽丝的访问次数是在之前的基础上又加了一,变成三了,啊,那Bob没变,还是一,这就是累积窗口的特点。然后到第二个小时呢,哎,所有的数据就相当于清除了,重新开始计算,所以我们看到下面有三条数据啊,第二个小时里边Bob有一条,Carry有两条,在统计这个前半个小时的时候呢,只有Bob的这条数据,那carry的两条数据我们看到啊,都是在30分钟以后了,所以我们统计出来就是在两小时这个结束的时候,这个窗口里边carry有两条数据,Bob有一条数据,所以这就是累积窗口进行窗口聚合的一个完整测试。
我来说两句