00:00
了解了flink当中聚合查询的各种方式,那我们接下来就可以灵活的使用不同的查询以及窗口的聚合方式,然后在实际的项目应用当中实现各种不同的需求了。那这里呢,我们接下来就要讲一个项目当中常见的应用实例,这就是所谓的top n。泡喷的话,从字面上来看,那就是。最顶上的N个对应的数据啊,那字面来看的话,这其实就是选取最大的N个嘛,啊,当然了,这只是一个范称,我们不仅可以选取最大的N个也行,也可以选取最小的N个,那我们可以想象一下这个应该怎么样去实现呢?呃,简单来讲的话,它应该要有一个排序,我们可能需要去选取当前这张表里边的某一个字段,呃,或许是已经现成的某一个字段,或许呢,可能还需要单独的先做一个统计,比方说我们之前的这个count当前的数量。
01:06
把这个数量先统计出来C,那现在我们就可以对当前的这一个问的次数C做一个排序,从大到小的排序,排好了之后,诶,那当然我们就可以截取它的前多少个,就得到了当前的这个top n了,比方说我们要选取TOP10,诶,那就把当前从上到下的前十行数据提取出来,这就是我们想要。所以很容易想到,这里就涉及到了一个排序和筛选的过程。那这个过程其实跟之前我们讲到的其他聚合是不同的啊,因为我们想到了,就是之前我们都已经。呃,提到对于fliq而言,聚合其实非常的简单,不管是常规的这个分组聚合,还是基于窗口的聚合,还是基于开窗函数啊,Over窗口的聚合其实都是调用对应的一个聚合函数就可以了。
02:04
这里的count sum mapx,不管是什么样啊,包括取平均值avg,我们都可以直接调用它调用的模式其实就是要针对一组数据。很多行数据它里边的某一个字段这一组值,然后进行一个聚合计算。最终得到的是唯一的一个结果,哎,所以它其实本质上是一个多对一的关系。当然了,区别就在于分组聚合的话,诶,那是。我们相当于按照某一个K。去做了一个划分,当前K所有的数据分到一组,然后把这些统计出一个值来啊,那如果是窗口聚合的话啊,那就是我们有K,另外呢,还有窗口,我们知道一般情况都是当前的K以及窗口的那个特征字段,这样的话就相当于。
03:01
把同一个K和同一个窗口内的所有数据,然后划分成一组,聚合出也是单独的一个值。那另外比较特殊的一个是开窗聚合,之前我们也提到了开窗聚合不同,它有点像是一个多对多的关系,为什么呢?因为它是针对每一行都可以扩展出一个聚合的值啊,这有点像是一对一或者说多对多的关系,但是本质上我们调用这个聚合函数的时候呢,还是把这一行它的上下扩展出了这样的一组数据。然后经过聚合之后得到了一个值啊,所以本质上这个聚合函数的调用还是多对一的关系。而现在的top n呢?完全不一样了,Top n我们得到的就不仅仅是一行数据,你如果是TOP10的话,那不得有十行数据吗?啊,那TOP2也得有两行数据啊啊,当然如果要是TOP1的话,就只有一行数据,那我们想到TOP1,那不就是max吗?啊,或者命这样的函数就可以搞定的吗?所以对于常规的套喷来讲,它是真正意义上的一个多对多的聚合转换。
04:16
就不能直接用一个值来表示我们聚合的结果了。所以这种函数,这就很显然不可能直接用一个掉一个top n这样一个聚合函数就得到了对应的结果,那得到对应结果其实相当于它还是多行数据啊,多行数据这相当于是一个新的表,所以我们怎么样用它,这还是另外的一个问题。啊,所以我们看到这有点像是什么呢?就是调用这样一个聚合函数之后,得到的应该还是一个新的表,跟我们之前讲的那个表函数有一点类似,但是呢,它还不一样,因为它还做了聚合,所以这种函数在fli SQ里边。
05:00
把它叫做表聚合函数啊,就跟表函数有点类似,但是呢,它还是一个聚合函数,跟表函数又不完全相同。所以这样一个表聚合函数要做一个抽象实现的话,其实是比较困难的。我们知道对于分组聚,就是分组聚合或者是传统的老版本的窗口聚合而言,它根本就没有办法直接实现这样的功能,只有窗口T这样涉及到表的这样的操作,才有可能直接提供这样一个聚,只不过现在也还没有实现一点,13版本里边现在还没有提供这样的直接调用的功能。那我们怎么样来实现呢?啊,没关系,因为这样一个top n在实际应用场景里边其实是非常的普遍,也非常重要的,所以flink c其实是给我们提供了一个变通的。可以说是临时的一个使用当前的一些CQ语法实现top n的方式啊,那所以接下来我们就来介绍一下怎么样直接在当前版本的flink当中实现top需求。
06:12
Link CQ当中呢,其实还分了两种不同类型的top n啊,那首先就是一般化的普通的top n啊,什么叫做普通套呢?简单来讲就是没有涉及到窗口操作啊,那我们这里边只涉及到一些简单的聚合啊,就是相当于现成的这个数据已经来了,我们现在就要对当前数据里边的某一列做一个套喷的统计啊,那这样的需求怎么实现呢?在Q当中,它是通过。Over聚合、开窗聚合以及一个条件筛选一个where条件来实现。啊,那具体是怎么做的呢?我们可以看一下这里的基本的语法。具体来说的话,它其实就是使用了一个特殊的聚合函数row number。顾名思义,Number行号。
07:07
所以它的特点是能够把当前诶我们看应用在了一个开窗函数上面over,然后相当于是能把当前的这个开窗函数的这一组数据里边先做一个排序,Order做一个排序,排序之后呢。给每一行追加一个当前的行号。所以这是一个非常特殊的均衡函数,相当于就是排序之后得到当前这个数据,它到底排列的那个位置到底在哪里啊?那么这样的话就相当于可以追加一个字段了。有了这个字段,后边我们再加一个where条件。我们把这个。聚合出来的number啊,给一个重命名,就叫做下划线number nu,然后呢,在外边加一个where,条件要求小于等于N,这样的话,我们就相当于筛选出了最当前这个值最大的或者最小的前N条数据。
08:14
啊,那完整来看这个CQ的话,那其实就是select后边们想提取的字段,那from边一个。然后外面是we这样这个条件,当然了我们还可以and加上其他的一些筛选条件,那这里面用到了一个这个number是哪里来的呢?那是通过内部的这个子查询。做开窗聚合得到,那这个开窗聚合的过程呢,就是row number over,后边这里做一个定义,这里边可以有BY,可以按照某一个K进行分区,相当于就是分组了。后边这个是必须要有的order by,因为我们说做top n最关键的就是要做一个排序,然后做一个提取前N个嘛,哎,所以我们会发现什什么样的聚合里边能够用到排序功能呢?我们现在提到的。
09:14
好像只有over窗口了,只有开装句啊,所以这里边这个字段是不能少的,Order by后边加上的当然就是排序字段了,然后我们这里还有ASD,这表示你到底是按照这个字段升序排列还是降序排列,那我自然就想到,如果要是想要top n统计的是当前最大的NN个数据的话,很显然我们就应该从大到小降去排列。这里就应该是DEC。那所以当前如果。使用这个over窗口,开窗函数针对每一行数据。当前的每一行数据都要统计一个,它在所有数据里边的行号追加在后边,那我们自然就想到了当前这个欧窗口就应该是统计。
10:06
所有数据就不应该再有其他的一些按照时间或者按照行数去做筛选的这样的限定了,所以后边相当于就没有当前的或者是Rose between哪里到哪里这样的一个限制范围,只只有order by,哎,那所以就是相当于选选取的就是到目前为止的之前的所有数据。在所有数据里边做一个排序,然后追加一个行号字段字段。诶,那当前排列之后,从大到小排列之后,最后where做一个提取,直接把前N行拿出来,就是我们想要的top n。这就是这样的一个基本的实现思路,这里边我们需要注意的几点是,呃,这里边有一个order by order by,后边呢,这里直接跟上的就是排序字段,如果我们记得比较清晰的话,一定还记得就是之前我们讲到开窗函数的时候。
11:10
当时曾经说过,这里就只能order by后面能给当前的时间属性字段,而且后面其实根本就不涉及到升序还是降序的问题啊,它只能是按照时间从前到后去做排列,相当于只有时间的升序。那现在为什么又可以有不同的字段,或者说做这个升序降序排列呢?呃,这其实是因为当前flink CQ里边的O窗口还不是特别的完善。它只是针对top n这样一个场景给我们做了对应的优化,诶,所以说当前的这种语法结构它是固定的,我们必须这样去写LIC才能识别出来,我们当前是想要实现一个top n的功能啊,那所以他就认为这样解释可可行的,可以的,如果说我们不按照这个标准去写的话,那后面假如不加对应的这个,就是不调用number去做这样的一个提取的话,那flink CQ是没有办法直接做这样的一个开的聚合。
12:15
这个我们一定要注意。如果说我们想要以其他的方式实现top喷这样的需求的话,目前版本的LIC也是不支持的啊,那呃,当前这相当于就是一个实现top n的固定化的格式,我们必须用这种方式实现。Fliq的优化器才会正常的把它解析出来,才能正常的处理,另外呢,当前的table API也是不支持number这样的函数的聚合函数的啊,所以说目前我们在这个table API和Q里边仅有这样一种通用的套实现,而且不能用table API,只能直接写C。另外还有一个小细节点,就是我们需要把这个where number小于等于N这样的一个筛选条件呢,必须要写在查询的外边,必须在外边再包一层select from啊,这个也很好理解,因为我们知道行号row number,它的生成是在。
13:20
内部子查询做了聚合之后才能得到的,你如果直接跟在这个内部的子查询里边的from后边,然后就where number小于等于N的话,这个显然是不合理的,所以不可能做在内部,做放在内部作为查询的条件,只能放在外边的where。接下来我们可以在代码里边做一个具体的实现啊,当然了,我们可以看到这里有一个简单的案例,那就是比如说我们想要统计,想要做top n排序的这个字段,如果说已经有了的话,哎,那就直接order by在这里用就可以了,如果说没有的话。
14:02
比方说我们现在想要统计的是什么呢?呃,是统计每一个用户,每一个用户都有很多访问点击的事件嘛,点击了不同的URL,我们就统计每一个用户,比方说Alice。他访问的URL有很多,我们要筛选出最长的两个URL,那如果是这样一个比较奇怪的需求的话,那我们可以怎么样呢?尽管没有这个当前访问URL长度这样一个字段,但是我们知道可以简单的调用一个函数,把这个URL做一个转换就可以了啊,就是叉LS直接。提取出来当前每一个URL的字符,字符长度,哎,那然后接下来做一个排序,取前两个可以,这是一个比较简单的场景。那我们现在呢,再来思考一个,嗯,比较有实际意义的一个应用场景,那就是。考虑当前从所有用户的这个浏览数据里边,我们之前不是说可以统计每一个用户他到底访问了多少次,浏览了多少次吗?然后我们先做一个聚合,把当前每个用户访问浏览的次数先统计出来啊,就是之前我们的这个count,先把它count出来。
15:19
然后按照这个再做一个从大到小的排序,提取出当前所有用户当中,总浏览浏览量访问量最高的那两个用户到底是谁?啊,这样一个需求,我们看看当前应该怎么样去实现,这是一个top n的具体的实现,所以接下来我们可以新建一个Java类。我们就把它叫做。Top n example。首先我们还是把基本的代码框架先搭起来,Exception我们先列在这里,然后前面的内容读取数据源,然后创,我们需要创建当前的这个表环境啊,然后读取数据源,这个过程呢,我们就不再重复了,直接copy之前的内容就可以了。
16:11
比如说我们现在还是用这种方式啊,基于当前的,然后去创建一个table,然后下呢,基于它,我们再直接创建一个表,读取文件,得到当前的table这样一个对象,呃,那当然了,我们现在没有table这样一个是直接把它注册在环境里面了,执行这个CQ把它注册,然后接下来我们就来看。怎么样去实现当前这样一个。普通。泡盆需求。那我们简单来讲就是。选取。当前所有用户中。浏览量最大的两个。
17:01
因为我们数据比较少,文件里面数据比较少,所以就只选取两个就可以了啊,那当然了,现在我们就是写一个QQ query。现在我们要选取啊,那当前我们就直接用一个select,我们现在当前这个叫username,其实我们知道这主要是为了回避CQ里边的user关键字,那如果说我们就想把它定义成user,其实也是没问题的啊,那就比如说我们这里要定义的话可以用。这样一种写法加一个反引号,这样的话,我们当前这个user字段就可以在后面使用了啊。那所以如果前面使用了这样一个定义方式的话,后面我们可以直接select user。这样的话,我们可以简单一点。啊,另外我们还可以,诶,注意这里面我们想要的是浏览量最大,那所以统计出来的应该是那个CT它的访问量。最后还应该有一个,它到底排第几,有一个number啊,那所以我们是三个字段。
18:06
这样的三个字,接下来那自然就是from。From后边应该是一个子查询,然后再接下来那就是where。当前的number。小鱼。等于二,这就是我们当前的基本的一个C框架里边的话,这个子查询,那就需要。Select。需要去开窗函数了啊,我们这里边可以直接select芯啊,就把其他的也都拿出来,然后加一个number。调用当前的函数。后面over。开窗函数。呃,我们的over窗口的话,其实是在。获取到一个新的字段,就是当前排序之后,按照C排序之后,到底排第几对应的那个行号啊,所以接下来我们as重新命名一下,就叫做当前的number,这是我们当前的一个基本的定义。那另外还需要from from哪张表呢?呃,需要注意的是我们不能直接从click table里面去选取,因为click table里面并没有。
19:24
我们想要排序的那个字段呀,啊,因为当前如果说我们要首先是里边看有没有那个position by,我们现在其实就没有position by了,所有的都要。或许我们会觉得,诶,这里面应该是by user啊,按照用户去做分组,然后才能统计他的那个访问的次数嘛,但是我们会发现不是在这里要按照用户分组去统计它的访问次数。我们这里面from的时候就应该from的,这张表里边已经有当前每一个user它统计出来的CT了。
20:01
哎,所以我们其实是针对已经做了聚合统计,Count统计之后的那张表来去做这样的一个number的排序。啊,那所以这个本质我们当前其实就是针对所有数据去做的排序,并不进行进一步的分组啊,那当然了,这里边就直接order by就可以了,因为本台position by就是可选嘛,啊那所以当前我们就是另外现在是从大到小降序排列,所以是den。当前的开窗函数就定义好了,那后边这里边的from呢?自然就想到了这里,其实还是一个紫查询。哦,那。我们还是直接ST,那就是user。以及当前需要调用抗弹数。Count URL CT,那当然了,就是。当前的这张表。
21:01
Click。Table。Group在这里。把user作为分组的key传进来,我们想要的是得到的只有user和对应它的访问次数count这样的一张表,在这张表里边去对于CNT做一个排序,选取number,然后呃,对应的我们就得到了TOP2。这是我们基本的一个实现思路啊。那当然了,这一部分我们已经得到了之后,可以把它写成一个我们可以叫top n。Or table?然后接下来。可以直接把它转换成。Data stream,然后进行一个输出,这里面我们需要注意的是当前可以直接to date stream吗?其实是不行的,因为仔细想的话,我们就会发现,当前首先没有涉及到窗口,所以所有的数据一条一条的来,首先在这里。
22:10
我们做这样的一个聚合统计的时候,得到的这张结果表里边的数它是会更改的,但是我们可能想到这一个结果更改没关系啊,我只要当前这个更改了,我把更改之后的这个值传到后边去统计它的行号不就行了吗?但是我们想到只要前面的这一个值更改了。前面的这个值一更改,它就有可能会影响我们后边的。排序的行号啊,这个排第一,后边A2排第二。那假如说现在呃,A直接更新变成四的话,很显然那A的行号就要变成一,B的行号就变成二。所以我们后边的这张表,最终得到的TOP2的这张表也是会更新的,有更新操作,那当然就不能直接涂了。
23:08
他必须要to changelos才可以。我们接下来把得到的这张结果表放在这里,然后做一个打印输出。最后我们把DVEXE执行起来,因为我们还涉及到这个流式转换成流再去做处理,接下来我们看一下运行下。观察一下得到的结果是什么样子。我们这里可以看到输出了一大堆的结果,我们从头来做一个分析。首先前面非常简单,加I加I2条插入,诶,那这里其实就是第一条marry数据来了之后,当然最后的结果表里边就只有一条数据,Mary,它的访问次数是一啊,那当然他排第一了,没有问题。
24:03
然后Bob这条数据来了之后啊,那我们知道他跟Mary次数一样的话,他肯定是跟在后边的,所以他排第二,同样它也是插入我们当前最后的TOP2,这张表里就有了两行数据,而且我们知道。最后的结果,TOP2这张表里只有两行数据,所以接下来做的操作就不再有加I的对应的数据了啊,所有的操作全部都是更新,没有直接后边的插入了。我们看接下来如果要是第三条数据,爱丽丝数据进来了之后啊,爱丽丝这条数据进来之后,其实后边这张表不会做任何的更改,因为爱ice丝也是一,那相当于排第三了嘛,我们提取TOP2,那相当于就把它删掉了,没有这条数据。他只是前边我们做的那一个。子查询的结果表里边应该有爱丽这条数据,最后结果表里边相当于就没有了。然后接下来的更改是再输入一条Bob访问数据的时候,我们可以看到Bob如果再做一次访问的时候,很明显当前Bob的数据就要改成二。那么同样。
25:13
它的访问次数变成了二,当前它就排第一了,所以我们看到其实这一条Bob的数据来了之后,接下来应该涉及到我们结果表里边的几条数据的更改呢?我们可以看到。从这里来划分。上面的两个减优加优操作,其实都是跟。第二条,Bob的访问数据相关的,也就是说我们输入了这样一条数据之后,就应该能够一下子结果表里tolo stream的话,我们的更新日志就应该有这样的四条。为什么是这样呢?啊,因为之前我们上面是Mary,然后它的访问次数是一。排序是一,Bob访问次数是一,排序是二,现在Bob又来了一条数据,那Bob变成二了,后边更新的话,我们不是直接把这个更新完了就可以了啊,原先它的排序是二。
26:14
那现在它的排序应该要调整成一,所以我们会发现更新之后的效果应该是。BOB2排第一,MARY1排第二,是这样子,所以这里就涉及到了两条数据的,删掉之前的数据,然后再增加一条新的数据,好,那所以我们看到删掉的是之前的第一行数据,MARY11增加的是什么呢?增加的是新的第一行。BOB21,删掉的是BOB12之前的第二条数据,增加的是新的第二条数据MARY12,所以整张表是变成了这样一个样。那同样,接下来如果Bob再来一条数据的话,那这条数据就不涉及到更多的更改了。它只涉及到。
27:07
Bob本来就排第一了,现在变成三次访问还是排第一啊啊,那所以它只涉及到当前Bob访问次数的一个更改,改成三就可以了,所以是减减u bob21加u bob31。那后面Mary又来一条数据,同样他也不会撼动Bob的第一位置,同样也是只要把自己对应的数据加加减减就可以。所以我们会发现,对于top n这样一个需求来讲,如果只是当前某一行数据的更改的话。就是它的这个次数做了一个更改排序没有更更新的话,那就相当于只是有一个减U加U,那如果要是涉及到了排序的变化的话,那有可能就涉及到多条减优加优的更新操作。这就是普通的top n具体的实现。
我来说两句