00:00
这就是对于这个表距和函数的一个定义啊,我们确实会发现这个逻辑还是比较绕的,比较复杂,这还只是最简单的TOP2,如果要是这个N值更大的话,这个逻辑肯定会特别复杂啊,那所以这个只是为了让我们更好的去理解表聚和函数怎么用,实际使用的时候,我们还是应该用之前link给我们提供了的啊,使用开窗函数去进行排序输出。好,那接下来我们就看一看在代码当中怎么样去使用这个表聚合函数了,那首先当然还是需要先去注册一下吧,我们当前注册的这个就叫做TOP2啊,那名称的话我们也就叫做TOP2就好了,然后接下来呢,那就是进行查询转换,诶我们说先要得到这样一个窗口统计count值的一个结果,URL count window table,然后接下来呢,诶,那就可以使用CQL查询,去使用我们定义的这个表具和函数输出套帕的信息,但是这里边啊,有一个问题就是目前FNKCQ里边并没有直接使用表聚合函数的方式啊,这个太复杂啊,所以说这个接口没有给我们直接开放出来,他也不太符合标准CQ的语法,所以呢,如果说这个时候我们想要使用表聚合函数的话,那就只有一种方式,就是调用tableable API了。
01:14
啊,那如果调用a table API我们就想到了,那就没有必要在表环境里边再去单独注册了,直接基于这个table的对象去调用不就完了吗?所以接下来就是使用。Table API。调用表聚合函数。进行查询转换啊,那这里边就是URL count window table去调对应的聚合方法,那其实我们知道要想聚合啊,之前我们在data stream API里边是先要去做分组嘛,按键分组K,那同样现在也是一样,我们现在要统计的所有的这个信息,Count按照什么去做排名呢?那肯定是每个窗口内的count值去进行排名啊,所以我们应该先做一个分组。现在做的是group by啊,那里边传一个expression了,对应的这个字段名要传进来,把这个Dollar符先引进来,我们现在按照什么去做分组呢?哎,这里就简单一点,我们知道现在的这个都是滚动窗口啊,那直接按照window and一个字段就好了,直接按照window and分组,然后接下来就可以做聚合,我们看到可以去aggregate。
02:20
如果直接ever的话,这里边其实想要调用的是一个普通的聚合函数,并不是表聚合函数,那表聚合函数呢?那就是可以输出多条结果,什么样的调用可以输出多少结果呢?下面这个flat aggreate啊,我们看到这个名称就跟之前的那个flat back相似啊,所以它也是我们输出的时候不是在那个get value里边直接返回一个值就完了,而是使用it value out.click调用一次就输出一行结果,类似于一个扁平化的过程。好,那么就调用flat advocate里边呢,那就是一个expression,这个调用方式之前我们也说过啊,在table API里面,它是使用一般啊,我们是使用call的这种方式去做一个调用。
03:04
里边呢,先传入当前要调用的函数名称啊,那后边呢,就是我们调用这个函数的时候需要的参数,那这里的这个参数其实就比较多了啊,我们这个参数看哪里呢?就是看这个accumulate嘛,每来一个数据,其实调的就是这里,它的参数是什么就都传在后边了,除了第一个这是当前的李佳器状态,后边都是传进来的参数。所以我们看有u ID count Windows start window,其实就是之前这个表里边我们说的所有字段,哎,那这就没什么好说的了,每一个都放进来吧。第一个是UID。第二个是。CT。然后后边还有。Window start。以及最后一个。Window and。啊,这就是我们调用这个过程啊,呃,当然了,就是最后这个调用出来的结果,我们其实就已经包装成了一个TOP2RESULT,诶,那对应的所有的字段呢,其实在这里边也都有相应的声明了,那接下来我们就可以去做一个。
04:12
Select,呃,这个select的话,其实我们应该是把所有的这些内容都要select出来啊,那非要写一下的话,那就单独的放在这吧,我们想要的就是当前的UID是哪个用户啊,然后比方说我们可以调整一下它的顺序啊,它的排名是几rank是多少。然后后边是它的count值啊CT。最后我们再跟上当前的窗口结束时间,只要跟一个窗口结束时间就好了啊,那起始时间滚动窗口的话,一下就看出来了啊,所以接下来我们可以直接挂一下,这个就叫做。这样的话,所有的处理逻辑我们就全部设计完了。不过在我们想要进行测试之前呢,还要解决另外一个问题啊,因为这里面啊,在table API和CQ里边确实是有很多很麻烦很琐碎的细节问题,比如说这里我们的数据类型,像这里我们把中间累加器和输出结果都包装成了样例类类型,哎,那这个就没有任何的问题了啊,里边所有的字段,哎,我们直接使用Java的数据类型也可以,使用skyla的数据类型也是可以的,这个是没有问题的,但是这里呢,我们还涉及到了一个window end的提取,那这个window and来自于哪里呢?哎,那在之前我们其实进行窗口操作的时候,利用窗口的表值函数TVF扩展出的一个新的字段,那这个字段的数据类型到底是什么呢?
05:40
按照我们之前data three API里边的经验,我们自然而然的认为它是一个长整型的时间戳,但是在CQ里边不一样,CQ里边我们扩展出的这两个字段啊,Windows start和window and,其实并不是长整型的,而是time step啊,就是跟我们的时间属性字段啊,前面我们讲到的这个TS转换之后的这个ET,它的类型是一样,都是time stand。所以在这里啊,我们把window start window end提取出来作为参数,如果放在这个表聚合函数里边的话,这里并不能把它指定成是长整形。
06:15
而要把它指定成time STEM类型啊,那直接用这个抓va CQ里边的time STEM就可以,这是标准CQ里边对应的时间戳类型,那这两个一旦改成time stamp的话,那我们就会发现啊,前面在进行这个状态的定义的时候,按Windows start和window and也要改成time step。最后的输出结果里边我们更新成了time STEM,那当然现在也得获取到time啊,这样的话完整的做一个更新,那初始化的时候呢,那就不要给长整形的最小值了,直接赋一个空值那就可以了。Windows start和window and全部复成的啊,这样的话,后边我们的这个类型上就没有问题了,另外还有呢,就是我们如果得到了Windows start window and这两个字段,然后接下来呢,直接在后边就基于window and又做一个分组,后边呢又要提取相关的字段,这个其实对于table API和link CQ底层它会出现一些混乱,因为本身Windows start window end啊,这两个字段就像时间属性字段一样,它是比较特殊的,它是有特定的含义的,如果我们把提取出来的普通字段还是就叫做这个名称的话,就会引起混乱,所以为了避免这样的混乱出现啊,最好前面我们提出来之后。
07:33
做一个纯命名啊,比如说这个我们就简单啊,叫做wsar啊,那后边这个window and我们就叫做W。所以接下来呢,我们这里边去group by的时候就是group by w,哎,那同样我们后面做提取的时候,提取出来的是war和W,这个就不会跟本身进行窗口操作时候的扩展出的两个关键字的这个字段啊进行冲突了,所以最终我们提取出的是W。
08:02
这样整个代码就没有问题了啊,所以接下来我们可以运行一下,看一看这样一个表具和函数进行套牌的操作,得到的结果跟之前是不是一样。好,我们可以看到这里已经输出了对应的结果,这里的输出结果呢,哎,我们看果然这个是需要tolo STEM的啊,啊,它是有这个不仅仅只有加I的。插入追加的操作,而且还有还有一个减D,减D是什么意思呢?哎,那就是对于这个表聚合函数的操作啊,它只涉及到两种,一种就是追加,另外一种是删除,所以这里面我们所做的这个TOP2榜单的更新,它是怎么变化的呢?并不是我们想象的那样啊,他可以直接删掉某一条,更新某一条就完了,它是一旦来了结果之后。整个把之前的表单全部删掉,然后重新追加新的内容。所以我们可以具体来分析一下这里得到的结果,哎,那就是第一条数据,哎,Mary数据输入,然后后面呢是Bob数据,这里我们需要注意啊,这个表聚合函数,它并不是初始的数据,一条一条来了之后直接就聚合了,它是先要做,按照我们的流程,先要做这样的一个开窗统计。
09:15
所以后边表句和函数收到的呢,它基于的这个动态表是哪张呢?其实是这张URL count window里面的结果。而这张表里面的count值到底是什么?我们其实看到啊,其实只有三条值啊,那就是Mary的数据是两条,然后Bob的数据有三条,爱丽丝的数据有一条,所以我们看到啊,首先来了的是Mary的数据。它的count值是二,所以我们输出的结果,这个标准的定义啊,那是首先是UID,然后后边跟着的就是它的rank,它的排名啊,所以它排第一,Countt值呢,Count值是二啊,这是对应的这个到九点钟的这个窗口,这个为什么是九点钟呢?因为我们一小时嘛,一小时那就第一个窗口是零点到一点,但是我们现在呢,还有一个东八区的时间啊,所以加上这个时区八个小时的偏移量之后,那就是八点到九点了,我们这里显示的是本地时区时间。
10:11
所以这里看到啊,当前这个窗口里边,Mary他排名第一,统计count值是二,然后接下来如果又来了一个新的,来了谁呢?哎,我们看接下来来的应该是爱丽丝的数据。诶,那怎么体现出来呢?爱丽丝它本身访问量是一,所以我们看到啊,他是直接先把之前榜单里边不是只有一个Mary吗,直接删掉了,减D删掉,然后接下来再把榜单里面的两条数据插入进去。其实我们看到Mary这条数据并没有变化,还是排名第一,Count值是二,后边追加了一个。Alice排名第二,Count值是一。那最后要增加的呢,之前窗口输出的还有一个Bob的数据,Bob的数据统计值抗是三,所以我们发现鲍B一来之后,哎,那之前的榜单两条数据都要变了,所以接下来直接两条删除,删掉MARY1,它的count值二啊,排名第一,那爱丽丝排名第二,Count值是一,两条删掉插入的是新的排名第一的Bob count值是三,排名第二的是Mary count值是二。
11:14
这就是关于使用表句和函数进行TOP2统计的一个过程,我们也看到这个过程确实是非常非常的麻烦,而且呢,在使用CQ自定义的函数的时候,会有很多细节需要去考量,所以在实际使用的时候呢,除非非常特殊的情况,一般我们其实是不会去做这样的自定义实现的啊,那一般对于topn的需求,我们直接使用flink已经提供了的开窗函数,然后使用roll number进行聚合来做一个topn的实现就可以了啊,这就是这一部分内容。
我来说两句