00:00
我们已经了解了flink CQ当中各种各样的聚合查询方式啊,那有最为经典的CQ当中的聚合方式,分组聚合,也有link流处理当中的特色窗口聚合,那另外呢,还有比较特殊的开窗聚合,那有了这些不同的聚合方式,我们在实际应用的过程当中,当然就可以把它们结合起来,去处理各种各样不同的需求了。那这里呢,我们要介绍一个非常经典的稍微复杂一点的应用场景,那就是传说中的top n啊,其实之前我们也已经提到过啊,所谓的top n,字面意思就是最大的N个,也就是说我们要统计当前啊所有数据啊,进行一个聚合统计,统计出来的结果呢,可能它有一个数值,然后我们按照大小做一个排序,提取里边前几名。之前我们在讲到处理函数的时候,其实是可以使用状态编程啊和处理函数啊,底层我们做一个这个窗口统计的排序,最后输出套文,那现在如果说我们想要用上层的APICQ能不能直接实现呢?当然应该也是可以的啊,只不过套盆这个需求很显然比之前我们提到的那些聚合操作就都要复杂一点啊,因为我们想到啊,你不管是some maps me count啊,不管是什么操作,那其实整体来讲都是我们去收集数据嘛啊,比方说啊分组或者说窗口聚合,那开窗口,按照窗口进行数据的收集,收集起来的所有数据,哎,接下来我们就做一个聚合统计,得到对应的唯一的结果,这就是我们所说的啊,窗口聚合和分组聚合都是多对一的聚合关系,多条数据得到一个结果。
01:44
那开窗聚合稍微有点不一样,开窗聚合呢,是针对我们原始的动态表里面的每一条数据都做一个窗口的扩展,来扩出很多条数据来。然后接下来呢,再把这些数据做一个聚合,得到唯一的一个聚合结果,所以我们看到它有点像是一对多再对一啊,那这个最终结果看起来的话,好像变成了一个一对一的状态。
02:11
所以这两种形式呢,其实从聚合的角度来讲,都比较简单,本质上都是把很多条数据合到了一起,利用某种计算规则整合到了一起。哎,那所以现在如果说我们想要去做这个top n的话,那就会发现啊,这里首先我们应该针对原始的数据很多条,诶,那可能需要针对它做一个聚合,合成一个数值,得到一个值,得到一个值之后呢?哎,那可能我们这里要得到很多个不同的值,接下来这多个值又要做一个排序,提取前面的N个。哎,所以在这种场景下啊,最终我们输出的就不是只有一个聚合结果了,应该是N个,而且在得到这个top n得到最多的这N个之前,可能那还是很多很多个,我们最终是做了一个提取。
03:06
所以在这种场景下,这是一个标准的多对多的转换啊,那这种转换方式的话,前面我们提到的那些聚合方式都没有办法直接实现。它相当于是要把我们这里的一张表,然后呢,诶提取很多条数据,然后直接聚合成很多条结果,它并不是像这个over窗口啊,是一条扩展出多条,然后聚合出一条结果,我们现在直接就是多对多,直接要做这样的转换啊,这个之前我们介绍的所有的聚合方法都没办法单独实现,哎,那这个需要用什么样的方式去实现呢?啊,那就是多对多的这种转换,在CQ里面把它叫做表聚合函数,这种函数可以去实现。目前呢,呃,就是只有窗口TPF,我们所说的这个窗口表值函数,它有能力提供直接的套根聚合,也就是说呃,获取啊某种聚合方法,它直接就叫做套N这样的一个函数,直接一调,然后我们呃定义对应的这个窗口,定义对应的范围,然后接下来呢,直接就得到它的套餐结果了,我们理想的状态下肯定就是这样。
04:14
但是呢啊,这个确实是太过于复杂啊,所以目前窗口TPF也还没有实现套盘的这个需求。所以目前呢,在CQ当中不能直接调用这样的聚合函数,那可以怎么样呢?有一些稍微复杂一些的变通方法,就是结合我们之前用过的其他的一些聚合方式,放在一起可以实现一个套的需求啊,那这个呢又分成两类,首先我们先来看一下普通的套N。其实整体来说这个思路也比较简单,哎,它就是之前我们讲到over聚合,开窗聚合的时候,不是提到它里边有一个order by吗?诶,那我们自然就想到了order by,它自然就是要把这个数据做一个排序嘛,哎,之前我们说目前flink支持的order,它的排序字段只能是时间属性字段,诶那我们就想,如果说啊,把这个做一个扩展,如果要是做一些优化的话,我们可以按照我们想要排序的那个字段,哎,比方说啊,我们统计出来的count值,根据count值做一个从大到小的排序,是不是就可以实现我们能够拿到的这个套喷了呢?哎,所以这就是一个基本的思路啊,所以link对于套喷的需求是单独做了优化的,就是over聚合,在这种场景下可以指定一个排序字段。
05:31
按照我们要求的这个规则做一个排序,而且呢还可以指定到底是升序还是降序,As sc啊,三顶是升序DECSC的话啊,第三点当然就是降序了,所以一般如果我们要是选取top n,如果是DN大,前N大的话,那我们就应该是降序排列啊,当然了,Top n说是前N大啊,那我们也可以提取前小那最小的几个,那就是升序排。那这里呢,只是做了一个排序,排序之后接下来我们怎么样去做这个提取呢。
06:07
诶,所以这里边是排序之后的这个窗口啊,它是针对每一条数据去进行扩展统计计算的,诶那我们现在呢,就是要得到当前这条数据,在所有数据里边的一个row number,也就是排序之后啊,它到底排第几,所以我们看这其实roll number就是一个比较特殊的聚合函数,把它应用到当前的这个over窗口上,那么就可以得到每一个数据啊,按照我们这个排序字段的一个排名情况。然后这个呢,还得做一个提取啊,所以我们把它做一个属命名,接下来。把得到的聚合结果作为我们当前的子查询的结果表啊,然后接下来从这里边去提取row number小于等于N的所有的数据,这就是我们所说的top n的提取。
07:00
所以整体来看的话,我们这里边关键点就在于使用了一个基于开窗函数的row number这样一个窗口的聚合啊,那这个过程当中呢,我们需要跟之前讲到的内容做一个区分,就是之前我们说到order by后边只能跟时间属性,而且只能升序排列,只能按照时间去做一个升序排列,现在呢,那就可升序可降序,而且还可以单独指定排序字段了。注意,我们只有用这种方式去实现弗link才能正确的解析,因为弗link底层是针对topn的这个需求,单独对over窗口做了一个优化,诶,那所以只有我们写成这种形式的时候,完整的写成这种形式的时候,弗link才知道我们是要做一个套喷的提取,哎,所以在这种情况下是正确的,别的情况如果直接order by一个非时间属性字段的话,那是会报错的。所以接下来我们还是可以在代码当中来做一个具体的测试。现在我们要测试的是top n啊,那我们就直接创建一个SC的object,就叫做top n example。
08:09
那方法先写出来,首先当然还是标准的流程啊,我们需要先创建当前的表环境,然后哎,创建一张表读取数据源,比方说我们就把这个copy过来吧。直接copy啊,那当然了,这个我们只要创建出tablev就可以了啊,后面这种方式我们就不用再去重复的去定义了。首先先创建一个输入表,这里是从当前的文本文件click.txt里边读取数据,读出来之后呢,哎,我们定义了UIDURL,还有一个TS。那这里需要注意就是后边啊,如果说我们想要针对时间去做一些窗口操作的话,哎,那这个就还没完啊,那我们还应该把对应的。时间属性字段要在这个DDL里面要定义出来,哎,那这个我们就直接copy这里的啊,一个ets to time啊做一个转换,另外还有一个watermark具体的生成策略。
09:05
好,我们把这个copy过来,哎,注意这里ts in后边也需要有一个逗号,我们已经创建好了这张表event table从文件里边读取数据了,哎,那接下来呢,就可以直接基于这张表做一个top n的查询转换了啊,那这个top n需求呢,我们想一个具体的场景吧,比方说啊,当前我们都是电商网站上用户的访问事件,那所以呢,我们就可以针对每一个用户,对于当前网站的这个访问量,它的浏览次数我们做一个统计,然后做一个排序,选取访问量最大的前两名用户啊,这是一个TOP2的需求啊,像之前的话,我们知道可以选取这个,呃,最大访问量最活跃的那个用户啊,那相当于就是直接一个max嘛,啊相当于聚合出来之后选最大的那个就可以了,那现在呢,不一样,我们要选多个,哎,它这本质上就变成一个多对多了啊,所以我们选取活跃度。
10:05
最大的前两个用户啊,这里所谓的活跃度其实就是他访问URL的次数,所以本质上我们还是要做一个抗统计,那之前其实我们也做过啊,类似这样的一个抗统计呢,那是相当于要先做一个分组聚合的啊,那非常简单,就是select当前的u ID user先拿出来,然后呢,Count URL啊,啊后面就是from这个even table,然后接下来我们应该是要group by u ID,直接做这样一个分组聚合,所以首先第一步啊。我们进行。分组聚合统计。计算每个用户目前的访问量,我们是把这个访问量作为活跃度的一个表示的,其实这个我们也已经不用单独去实现了啊,之前我们已经做过这样的一个简单聚合啊,分组聚合,统计每个用户的访问频次,那得到的就是一个URL count table啊,那这里边执行的这个CQ select u ID count URL from table group by u ID,我们直接把这个copy过来。
11:13
哎,那当然了,把这个得到之后,接下来如果我们想要使用这个URL count table的话,还要做一个创建虚拟表的过程啊,相当于就是在表环境里边要做一个注册,在这里得到的是URL com table。当然对应的这个名字我们也不需要叫做temp啊,也可以把它叫做URL accountable。然后接下来那其实就是针对这个得到的URL count table这个里面的字段,我看就是一个用户,然后还有一个count数值嘛,而且这个表里面的数据我们知道它是会不停的变化的,它是一个更新查询,然后接下来基于这一张动态表里的数据,我们要去提取里边的前两名。所以第二步是要提取。
12:00
Count值。最大的前两个用户。啊,那这个过程呢,因为我们用到了count值,那之前这个抗URL,我们最好是还要做一个重命名SCT吧,啊所以接下来我们要做的这个操作呢,写这个CQ CQ query。我们可以啊,这个可能稍微复杂一点啊,一行写不下,我们就还是换行做一个书写吧,Select最终我们要的当然就是UID,另外还有CT当前的count值,诶,注意另外还应该有一个排名的名次,哎,所以这个名次呢,其实就是我们的那个road number吗?还记得当前排序的那个行号吗?诶,所以我们就把这个叫做row_nu number提取这三个字段。然后接下来的流程,诶,当然就是标准的top n的这个过程啊,我们看到啊,Top n里边from里边是一个子查询,这个子查询里边定义一个窗口,聚合一个over开窗函数,然后呢,得到这个容number,做了聚合之后的这个结果,然后外边再使用这个number小于等于N作为一个筛选条件啊这样的话就是完整的一个处理流程,所以我们先把这个框架先搭起来,外边是这样的一个from。
13:14
然后呢,后边还需要有一个where row number小于等于二,我们现在是TOP2,当然就是小于等于二了啊,那如果我们想要看的清楚一点的话,也还是啊,可以把关键字都做一个大写啊,Select,然后后边from下面的where。全部都大写,然后接下来里边这里就是一个子查询了,所以还是。Select。我们知道这个子查询里边要做的事情,最重要的其实就是聚合出一个number这样的一个字段啊,呃,那关键就是定义这样一个over开窗函数啊,所以在这里我们其实前面所有的字段都是需要的,完整的都得提出来啊,那干脆啊,我们就直接select芯。然后后边接下来就是一个number。
14:02
这个函数本身就叫做row number_number。哎,然后接下来当然是over,关键字跟着一个开窗函数。那对于这里我们的开窗函数呢,偶尔窗口呢,其实我们是要把所有的数据啊,就之前我们得到的所有的用户的对应的抗数据都要放在一起排序,哎,所以这个我们根本没有必要去定义分区嘛,没有potential by。然后接下来当然就是order by了,Order by,我们现在当然是BYCT了,基于这个count字段做一个排序,注意还是降序排列,哎,那所以后面我们跟一个DEC降序排列,后面还应该有一个当前选取的范围,那范围也不需要有了,我们就是所有的数据嘛,所有数据都要,那当然不需要定义范围啊,全拿过来就可以,所以这就是我们非常简单的一个开窗函数的定义,所有数据按照CT降序排列,聚合出当前的number啊,那当然了,我们这里真正提取的时候是把它叫做这个字段啊,后面还需要有一个as重命名。
15:07
得到它,这是select,然后呢,那当然还得from了,From哪张表去提取对应的这些信息呢?就是前面我们已经得到的URL comfortable啊,所以我们看啊,就是前边先做一个普通的分组聚合,得到对应的这个访问量,就是我们后面要排序的这个字段先得到,然后后边呢,就非常简单的利用一个开窗函数over做一个排列。聚合出一个当前排序的排名,接下来哎,利用一个where条件筛选得到TOP2就可以了,这就是完整的一个处理过程。好,那么最终得到的这个结果,我们可以把它叫做,因为我们现在是top n嘛,我们就直接叫做TOP2RESULT。Table这是一张表,那如果说我们想看到里边的结果的话,应该把它转换成U打印输出,所以还是调用table en v two,哎,注意这里边到底是to date stream还是to long stream呢?那要看我们整个转换处理的过程当中有没有更新查询好,那所以这里边我们看到前边我们在做这样一个转换的时候啊,URL comt table本身这里边它就是有一个更新查询的,那关键是在于我们最后得到这个top result里边有没有更新呢?
16:27
哦,那就是之前这个URL countt c啊,它里边可能就是比方说一个用户a Alice,他的count值有五次,然后一个b Bob。它的抗值有三次啊,可能还有carry Mary,它的抗值多1.6次,那如果针对这样一张表,一个数据啊,首先如果只有A和B的数据的话,那我们知道后边得到的这个TOP2结果里边肯定就是A5,它排第一。B3它排第二就是这样一个结果,那如果它的输入又多来了一个C6这样的一个count值呢?现在我们结果表显然就要发生变化了,而且不是往后追加,不是C排第三,C排到第一了,所以接下来我们的结果表里边应该是C6。
17:15
排第一,然后呢,A5之前是第一,现在降了排第二了,而B3这条数据呢,直接踢出去了。所以我们发现啊,最后的TOP2结果表里边是有更新查询的,所以我们必须要做这个to change stream这样一个转换。那TOP2RESULT table放在这里,然后print打印啊,那当然了,我们这里还需要执行起来,Env,我们可以运行一下,看一看结果到底是什么样子。好,现在已经输出了对应的结果,因为我们现在这个是读取的clicks这个文本文件啊,数据相对来讲是比较少的,我们就大概的分析一下吧,首先第一条数据,Mary的数据来了,哎,那这个没问题啊,他的count值是一,然后啊,对应的这个名次是一,没毛病,然后接下来呢,鲍B的数据来了,Bob数据来了,我们看又是加I啊,又是插入一条数据啊,之前没有他嘛,他现在抗的值也是一,现在他的排名是第二啊,那所以Mary的名次不变,Bob排名是第二,追加进来,现在我们最终TOP2的这个结果表里边就有两条数据了啊,这就是我们说的啊,这张表里边首先MARY11先插入进来,然后Bob。
18:31
一二又插入进来。然后其实我们能想到啊,TOP2的结果表里边始终最多只维持这两行数据,接下来啊,它一定是会有更新操作的啊,所以我们看接下来如果爱丽丝又来了一条数据呢?啊,那爱丽丝我们知道按照这个啊,相同数量的话,爱丽丝排第三,排第三就不会插入进来了,诶,什么都不做,然后又来了一条Mary的数据。啊,又来了一条Mar的数据,那我们看到啊,当前这个Mar数据,它本身就已经count值变成了二,哎,所以尽管当前的名次没有发生变化,但是这个count值变了呀,所以我们看到接下来会有一个更新操作。
19:12
减U,把之前的MARY11这条数据删掉,然后更新成MARY21这条数据。啊,那同样道理,如果是Bob又来了一条数据的话,那它也是名次不变,但是呢,呃,当前它的数量也要改成二,所以是减u bob12加UBB22。那这个如果看的还不是很明显的话,后边我们还可以再追加一条数据,数据再多一点。又来了一条报的数据,比方说这个是六秒钟来的,那现在如果我们再来运行一下的话,会是什么效果呢?啊,其实会想到啊,最后一条报B,数据来了之后呢,那就会引发整个排名的变化,我们看到前面截止到这里,诶还跟之前都是完全一样,最后一条数据报B,这条数据来了之后,这里发生了很大的变化,一条数据的输入就代表我们后边最终的套牌结果表里边的四条数据输出更新日志,四条更新日志的输出,我们看到到底是做了什么操作呢?哎,那首先我们想到啊,在他来到来之前,里边两条数据,那就是Mary。
20:20
Count值是二,排名是一,Bob。Count值是二,排名是二。那么Bob这条数据来了之后呢,我们知道现在Bob count变三了,它变三了之后,这个排名就要往上升了。所以我们看到啊,首先我删掉了这里的第一条数据,MARY21要删掉了,他不排第一了。取而代之的第一是BOB31。然后同样呢,哎,那之前的BOB22这条数据当然也就不要了,Bob排第一,那把Mary是挤到第二了,所以接下来第二条数据要更换成我们看是加优MARY22。
21:03
Mar count值不变,但是排名降低了一倍,所以我们其实看到就是整个这个排行榜里边,只要发生一点变化,我们当前都是一个更新操作,哎,所以都会输出对应的更新日志。这就是普通的套,我们是使用了一个简单的分组聚合以及一个开窗聚合实现了这个功能。
我来说两句