00:00
前面我们已经介绍了在实际应用过程当中一个非常经典的案例,那就是top n这样的一个聚合统计,呃,具体的实现过程呢,我们是使用普通的分组聚合,先把想要排序的字段聚合出来,一个count值先聚合出来,然后接下来呢,使用一个over窗口统计一个roll number,然后接下来做一个筛选,啊,那所以这种方式呢,我们把它叫做普通套盆,那主要就是对比我们在流处理里边比较经典的窗口操作而言的,除了普通套盆之外,另外还有一种套盆的操作,那就是窗口套。其实整体来讲啊,有了普通top n的基础窗口,Top n就非常容易理解了,就相当于是我们之前先进行统计的时候呢,是直接针对所有的数据做了一个分组统计嘛,呃,聚合出了一个抗值,那现在呢,我们就要针对每一个窗口去做一个统计,统计出对应的抗值,然后接下来的排序,我们提取套本也是只针对当前窗口内有效,哎,所以就相当于是把这个窗口聚合加进来了,现在应该是一个窗口聚合和over开窗聚合的一个结合。
01:15
啊,所以接下来我们可以直接在之前的这个代码的基础上啊,再去实现一个窗口套喷聚合的一个案例啊,所以我们为了看得更清楚一点,还是去重新创建一个SC的object,现在我们叫做window top n啊,我们叫top n window example。还是没方法,先写在这儿,哎,那整个的处理流程我们可以先把这边照搬过来,整体来说其实都是差不多的。先copy过来,然后接下来呢?啊,这里涉及到一个,如果说我们想要开窗的话,诶,那统计的这个数据可能时间戳就得有所要求了啊,因为如果说这个时间戳离得很近的话,都在一个窗口内,我们最后的输出可能就看不到一些效果啊,所以呢,这里的测试数据啊,我们就不要用DDL直接去读取这个外部的文本文件了,我们还是用之前这个窗口测试这里的这个数据吗?诶,当时我们不是使用了一些比较好的这些测试数据吗?我们就直接读取一个data stream读取数据流,然后接下来把它转换成一张表,而且既然是涉及到了窗口,比方说我们统计每个小时作为一个窗口,统计这个滚动之间窗口内访问量最大的两个用户,也就是说最活跃的前两个用户,哎,那如果是这样一个需求的话,我们就按照这个时间啊,先把这个测试数据都定义好,这里就是前半个小时哎,到第一个小时到第二个小时的所有的数据都有。
02:44
接下来我们把这个先copy过来。它就可以取代我们前面创建表的这个过程。好,把这个引入啊,那当然了,如果是这种处理流的方式的话,前面我们要把这个下划线还是引进来啊,啊,前面我们先创建出一个stream,一个data stream,注意还得去提取时间戳,生成water mark,这样的话,在后面我们就转换成表的时候,直接指定当前的时间属性字段就可以了。
03:10
然后接下来的查询转换,我们的这个需求呢,也是套N,只不过呢,现在加入了窗口的操作,是一个窗口套,我们是选取每小时内活跃度最大的前两个用户啊,所以首先接下来呢,我们要做一个窗口的聚合统计。就不是简单的一个分组统计了,我们就是要统计每一个一小时的滚动窗口内所有用户,每一个用户他访问量的这个抗值。所以我们发现其实这个也非常简单啊,我们要的还是UID以及count URL,我们把它重命名为CT,接下来from,哎,那就不是直接从even table里边去提取数据了,而是要去开窗,利用窗口表示函数,把它做一个扩展啊,那这种方式的话,我们还是啊,就完整的把这个重新做一个书写吧。
04:06
我们可以直接用这种换行的方式啊,看得更加的清晰一点,接下来我们可以直接select。现在我们要的是UID,这是当前用户的基本信息啊,另外呢,Count URL as CT这个都是需要的,除此之外我们应该会想到啊,当前我们是要做这个窗口的聚合统计,然后经过这个窗口表值函数扩展之后呢,表里面字段其实还有窗口的信息,那窗口的信息我们后边做这个套提取的时候重不重要呢?是不是也要传进去呢?当然是重要的,因为后边我们提取套盆并不是针对所有的这个数据啊,按照这个count值全部排序提取前两名,而是还要根据窗口去做一个划分,只提取当前窗口内的最大的前两名啊,所以接下来我们窗口的信息也非常的重要,那我们干脆就把这个Windows start和window and都拿出来吧。
05:04
Windows start。Window and,这就是我们想要提取的字段,然后接下来那就是from。后边我们跟着的应该是一个啊,窗口表值函数这样的一个扩展啊,所以是table。括号里边当前我们调用的就是滚动窗口,那就是一个函数弹。里边需要传入三个参数。首先是。需要去扩展的这个原始的表,我们现在扩展的这个表就是even table嘛。当然,如果说想要去使用这个e table的话,呃,现在我们并没有在这个环境里面注册啊,所以还是得先去注册一下create temp review。把这个event table的对象实例传进来,接下来我们就可以在C当中直接使用了,啊把它传进来,这是第一个参数,第二个参数呢,那是一个时间属性字段。Script,然后里边传入的,注意时间属性字段,现在我们提取的是ET啊,就是TS还是一个长整型的ET才是时间属性字段。
06:08
接下来还应该有一个当前窗口的长度,那是一个时间间隔了,In tval。直接给一个一小时one hour,这就是我们对于当前窗口的定义,然后啊,当前这个窗口已经扩展出来了,那后边还需要有一个group做一个分组。我们现在分组的字段啊,那应该要有UID,另外呢,还必须要有窗口的相关信息,哎,那所以就是Windows start和window and跟在后边就可以了,这就是我们先做了一个窗口的聚合操作,哎,所以这个如果要区分一下的话,我们可以把它叫做URL count。Window table,然后接下来。可以把它也注册一下URL window。这个叫做URL count window table,同样后边如果我们想要去选取的时候呢,当然就是from这张表去做提取了,后边做这个TOP2提取的时候,规则还是一样,我们想要提取出当前,其实就是当前所有的信息嘛,我们把当前所有基本的字段,以及聚合扩展出来的这个row number,所有的都要提取出来,所以这里我们干脆直接啊。
07:20
Select芯就完了,后边from啊,From这里是一个子查询,这个子查询呢,那就是要从我们前面聚合得到的这张表里边还要再扩展出一个number,这里的number当然还是基于CNT这个count值,要做一个order by做一个降序的排列,然后我们提取前两名,这个都没有问题,关键在于我们这张表里的所有数据呢,我们想想它长得应该什么样呢?诶,当前它所有的数据应该是UID啊,当前一个用户。然后它的一个count值,比方说爱丽丝有五次访问,后边还有窗口的信息,在哪个窗口内它是有的,那比方说我们这个,呃,零到一小时,第一个小时我们这个单位是小时了啊啊,然后后面可能第一个小时内还有B。
08:08
它统计出来是三次,也是零到一这个窗口内,那后面呢,呃,可能在第二个窗口内,A访问了两次一到二。所以最后啊,我们这个所有的count值不能全放在一起去做排序,这就相当于把不同窗口内的数据合在一起了嘛,这个没有意义,我们要根据不同的窗口去进行一个划分,进行一个分组,得到的才是每个窗口内的套盆啊,所以接下来我们就应该在前面追加一个。那要分组当然就是BY,我们现在要去分组的字段,那当然就是window的信息了,Window start和window and,那起始和结束时间当然就完整的定义了这个窗口,这就绝对唯一。所以这个过程我们看到啊,就是套用了普通套宽的这个模式,这个模板还是一样的啊,所以我们看里边的这个子查询,就是基于之前我们已经得到的这个URL count window table啊,在里边,然后又做了一个开窗函数的聚合,针对每一个数据,哎,我们按照窗口分组,然后呢,根据它的count值做一个排列降序,排列统计出它对应的那个行号,这个名次就有了,接下来呢,选取每个窗口里边的前两名提取出来就可以了。
09:30
所以这个过程啊,有了前面做基础,其实实现是非常非常容易的。哎,那同样后边我们得到的这个结果表呢,接下来就可以转换成流打印输出,这里还有另外一个问题就是诶,我们得到的这个结果表是tolock stream还是to data stream就可以了呢。啊,这里就涉及到到底我们最后啊做的这一个TOP2的聚合统计,有没有涉及到更新操作,哎,那其实跟之前我们考虑这个窗口聚合的时候一样啊,因为窗口聚合它针对每一个窗口的结果,比方说当前这个零到一啊,第一小时内啊,窗口的起始点和结束点是这样的,那针对这一个窗口呢,统计出来的结果,比方说A5B3。
10:18
C6。针对这样的结果,很显然他们就只会输出一次,所以URL count window table这个表里边的数据啊,都是追加进去,所以它是一个追加查询,然后接下来我们基于这一个得到的这个追加查询的表啊,再去做一个套牌聚合的时候,诶,那得到前两名是不是也是不再更改的呢?诶当然是不再更改的,因为同一个窗口的数据它都是一次性输出的了,一次性输出,当然它的套牌也就是固定的一次性提取出来。输出到我们最终的结果表里就行,所以它其实是追加上去啊,那这里我们就不需要to turn log stream,直接to data stream就可以了。
11:01
所以接下来我们可以直接运行一下,看看它的效果是什么样。现在我们的数据也比较少啊,只有上面的这几条数据,我们看一下哦,果然得到的就都是加I,都是追加,首先我们看到啊,诶从零到一一小时这前两条数据,这就是第一小时内的TOP2,我们看到排名第一的那是Alice,访问了三次,排名第二的是Bob啊,那访问了一次,所以我们看在第一小时内统计出来的数据,它就只count输出一次,然后TOP2当然也就只输出一次,没有更新。那同样第二小时内容统计出来,凯瑞有两次访问排名第一,Bob有一次访问排名第二,那跟前面第一小时的数据就会有所不同了,他也不会去覆盖更新之前的结果,而是在后边直接追加输出就可以了。这就是关于窗口套的具体实现。
我来说两句