00:00
接下来我们新创建一个SC的object。还是udf test,现在我们要测试的是表聚合函数,所以是table,这个就简写吧,A j function。那方法先写出来,那同样整体的处理流程跟前面差不多,哎,我们可以直接把前面聚合函数的测试流程直接都copy过来。前面首先要创建表执行环境,然后读取一个连接器表,从文件当中读取数据,然后接下来呢,注册一个表聚合函数。后面我们就可以使用这个表句和函数进行查询转换了,得到的结果表to ch stream打印输出。接下来我们具体要实现的,那就是一个自定义的表聚合函数。那这里我们要实现的功能其实就是TOP2啊,所以这里我们直接class。就把它叫做TOP2吧,然后extend。
01:01
Table aggregate function。我们看到后边要跟上两个泛型参数,一个是当前输出的结果,另外还有一个就是呃,中间累加器的结果,那对于这两个结果类型呢?啊,其实这里边有一个考量啊,就是我们中间聚合的结果主要保存什么呢?前面我说了,其实就两个值,一个就是当前最大的那个数据,然后呢,还有一个就是当前第二大的那个数据。那在这里我们要统计的到底是哪个数值去进行比较呢?哎,那还是利用之前我们讲过的这个top n的例子吧,我们当时统计的就是每一个用户,诶在比方说啊,这个窗口套餐的话,那是每一个用户在某个时间段内啊,一小时之内所有访问URL的次数的一个count值统计,哎,我们可以把这个值得到,那对应的排序呢,当然就是按照这个count值去进行套喷的一个排序。如果。当前的最大值,我们就保存在中间聚合状态的第一个字段里面,如果是第二大的对抗值,我们就保存在第二个字段里面,所以这就是一个基本的想法,所以呃,前面我们处理的这个流程呢,也应该先得到这样的一个URL count window table啊,这个应该还是需要有的,我们先把这一步copy过来。
02:19
所以进行查询转换的,这里我们就先做一个这样的操作。首先进行,这里进行的是窗口聚合。得到。Count值,然后接下来我们才能按照这个count值进行排序嘛,所以前面我们相当于省了一步操作啊,把这一步操作就相当于先做完了,然后接下来看看怎么样利用表距和函数去把这个不同的count值进行保存,进行比较,然后统计输出最大的前两名。所以我们就知道了,最后输出的结果呢,还是跟之前top输出的那个一样啊,就是我们当前如果是窗口top n的话,应该要输出到底是哪个用户啊,他在哪个窗口范围内统计的count值到底是几次,然后它的排名情况是怎么样,我们可以看到这个其实是比较复杂的一个多条数据的一个组合了啊,这是一个多元组,所以最好的情况我们在这儿还是使用一个样例类来做一个描述就好了啊,所以接下来呢,我们需要定义两个样例类的类型,一个是最终要返回的啊,我们想要显示的这个信息的对应数据的样例类,另外还有一个呢,就是中间进行聚合操作的时候统计的那个样衣类类型。
03:35
所以我们先在上边做一个定义吧。定义输出结果和中间累加器的样例类。Kiss class。首先我们定义这个输出结果,输出结果我们直接把它叫做TOP2RESULT吧。诶,那这样一个类型的话,其实我们会发现啊,对应的这些在这个URL window table里边所有的字段其实都要输出,另外呢,还要追加一个最后的排名情况,一个rank啊,那所以这里边我们涉及到的东西啊,可能有UID。
04:11
这是一个string类型。窗口的信息啊,那这个有就是window start。这是一个长整形,还有一个window and。也是一个长整型,另外还有当前它的count值。CG这是一个长长形,最后还应该有一个排名情况rank啊,这个应该用一个int就可以了,一个排名情况嘛,第一还是第二,直接返回就可以了。所有的这些数据里边,我们会发现啊,抗值和排名情况,其实我们最核心的内容需要去进行更新,去进行替换的,所以呢,我们的中间状态这个累加器啊,最核心的要保存的应该就是这个值,那现在呢,如果我们只是TOP2的话,这个rank也不需要单独去保存了,本质上来讲,我们就是知道最大的到底是多少,然后排名第二大的到底是多少就可以了,我们从这个累加器啊,它里边的对应属性的名称上就可以区分出到底排名是几啊,那所以这个就比较简单了,我们就相当于只要保存。
05:13
Class我们保存的这个累加器TOP2ACCUMULATOR。诶,那么里边其实最关键的啊,当然这个应该是可以变化的一个bar类型,最关键的就是一个是最大,目前最大的一个count值max count,这是一个长整型,另外还有一个。第二大的是second max。同样也是一个长程性,主要保存这两个就可以,然后另外还涉及到一个问题,就是如果最后我们想要输出当前的u ID Windows start window and这些信息的话,呃,在这个过程当中,我们看到这个表聚合函数啊,包括之前的其他的udf里边,其实是捕获不到底层的运行是上下文的,诶,所以当前比方说啊,如果我们做过这个K败当前K的信息,呃,上下文的所有信息是拿不到的,如果说我们最后想要输出的话,那就只有把它当成状态先保存下来了啊,所以。
06:08
在这个中间的聚合状态里边,除了最重要的这两个之外,当前我们还得保存一下UID。以及窗口的信息window start。和window。啊,这些也都是袜类型,因为。一开始初始的时候没有值,是等数据来了之后我们才会把它保存进来的,哎,所以为了最后能显示更多的信息,我们这里边对于这个类型的定义肯定就会复杂一点啊,就包装的信息就会多一点。另外这里还需要区分的一个是,如果是我们当前啊,针对每一个窗口进行套牌的统计的话,那这里我们同样一个窗口对应的这个累加器当然是同一个了,所以这个window start window and只用一个值去保存就可以,但是呢,这里的user呢,那我们知道啊,Max count最大的访问次数可能对应的是一个用户,然后呢,第二大的访问次数对应的可能是另外一个用户了。
07:06
哎,所以这里边我们这个UID啊,保存的时候也得有两个啊,比方说我们这里就是UID1这个表是最大的。Count值对应的那个user,然后UID2就表示第二大的count值对应的user,哎,那这样的话,确实我们保存的信息就会特别特别的多啊,啊,但是这个没关系,最后我们输出的信息就会更加的明显一点,所以在这里呢,Table aggregate function,它有两个泛型参数,首先要的是最终输出结果的类型,当然就是TOP2RESULT,然后是中间累加器的状态类型TOP2ACCUMUL。好,然后接下来我们就可以实现里边必须要重写的那些方法了,哎,那我们直接让它自动补全一下的话,会发现啊,这里直接能补全的就只有一个初始化累加器的方法,Create accumulator啊,那首先啊,这个createccumulator非常简单,我们就是创建一个TOP2ACCUMULATOR嘛,那这里边的值呢,初始值我们都可以附一个,哎,比方说当前这个count值啊,我们都可以附一个最小值啊,比如说我这里直接给一个。
08:14
长整形的最小值明value啊,那第二大的值同样初始,初始值也是长整型的明最小值啊,那后面这个UID的话,一开始并没有任何的用户,我们直接给now也是可以的,哎,那后面还有这个Windows start window end啊,我们直接给长整型的最小值。诶,那这样的话就有了一个初始的定义,有了这个状态之后,后边我们的核心逻辑呢,那是。每来一行数据。需要。调用,哎,我们必须要有一个accumulate这样一个方法去进行聚合统计。这里尽管没有办法自动补全,那跟前面的那个聚合函数一样,我们必须实现这样一个accumulate方法,名字不能出错,而且里边的参数呢,对应的这个关系也不能有问题,必须是这样的一个调用方式。
09:13
啊,那这里的cuul,首先它的这个第一个参数就是所谓的中间累积状态啊,所以是ACC。类型当然就是TOP2ACCUMULATOR了,然后后边呢,对应的其实是这一行数据里边我们想要参与计算的提取出来的那些字段啊,那这里的这个字段我们到底想要哪些东西呢?诶,那就得看我们这里这个中间聚合状态啊,到底得把哪些信息要更新,其实我们看到啊,所有的信息都要更新,首先就是当前的这个用户名肯定要有,然后呢,Window的信息也要有,尽管只是有一次就可以了啊,但是这个必须是要带着的,然后后边还需要有一个当前的count值啊,所以这几个其实就是我们这张表里啊,URL count window table里边的四个统计出来的字段全部都要放起来啊,那所以这里边我们就直接给一个UID。
10:05
String类型,然后c count值,这是一个长整型,另外还有window start长整型和window and长整形。这四个值全部都是需要。然后接下来我们就是具体的处理逻辑了啊,那每来一个数据之后啊,那首先啊,这个基本的信息我们应该先塞进去啊,那这个同一个窗口对应的这个聚合状态里边,肯定Windows start window and是一样的啊,这个也没关系啊,我们重复写入肯定是没有问题的,所以那就是ACC点,首先我们把这个Windows start。赋值成当前的Windows acc.window and。赋值成window end,当然我们也可以做一个判断啊,如果它不等于这个长整形最小值的时候再去做赋值,赋值过一次之后,后面就不会变了,然后接下来那就是得判断我们当前的这个count值到底有没有超过之前已经保存着的最大值和第二大的值了,那依次判断,首先先跟最大值来做判断。
11:12
判断。当前count值。是否排名前两位?所以首先判断它是不是比最大的大,那我们就是用当前传进来的CT,要去判断一下是否比ACC的。Max count要大啊,因为我们知道初始值它是长整形的最小值嘛,啊,这是一个很小的一个负数啊,所以只要你来了一个count值,默认情况下肯定就会超过之前的最大值就可以更新了,哎,所以这个时候呢,那我们就要做一个更新,更新的时候注意。来了一个最大的值,哎,那之前我们榜单里边的前两名其实是都要改变的,新来的这条数据变成了第一名,之前的第一名那就瞬移到第二名啊,第二名再往后瞬移,当然就挤出这个榜单了嘛,所以我们当前是TOP2比较简单,如果是TOP5 top10 top100更多的话,都是这样一个顺延的过程啊,那所以接下来我们这里就是做一个名次的顺延。
12:14
名次向后。顺延。所以这个顺延的过程呢,我们应该先把最后的一个先挤出去,就把第二名先挤出去啊,所以就是acc.second max count,应该让它等于之前的acc.max count,那同样。UID2,那就要更新成之前的UID1,就之前排名第一的顺延到排名第二去。然后现在的排名第一应该给谁呢?诶,那当然就是acc.max count就应该更新成现在传进来的CT啊,那当前的ACC最大的这个用户名啊,对应的用户名UID1就应该等于当前传进来的UID,这就是我们名次顺延的一个过程。
13:05
那else,如果要是没有之前的最大值那么大的话,哎,那继续判断,还得判断是否比第二名大,诶,所以是l if CT是否大于acc.second ma。如果比第二名大的话,诶,那这个简单就是直接更新第二名就行了,第一名保持不变,所以要改的是acc.second max count等于CT acc.UID2更新成。当前数据的UID,这就是我们更新状态的过程啊,所以可以看到就是如果是TOP2的话,这个逻辑还稍微简单一点啊,就判断两个就行了,那如果要是TOP5TOP10的话,后面这个E可能会非常非常复杂,这个条件分支会很多,而且这个顺延的时候,呃,也会比较麻烦啊,每一个都得往后去顺啊,所以我们会发现这种方式其实实现top n也并不容易。这是我们的处理的核心逻辑,除此之外呢,最后还应该要有一个。
14:05
输出结果数据的,哎,这样一个方法。输出结果数据,那我们知道所谓的结果数据不就是这里的TOP2RESULT吗?啊,那就包装成其实我们输出的就两条,一个是rank等于一的排名第一的数据,另外一个是rank等于二的排名第二的数据,所以这里面必须要实现的一个,这个叫做emit value这样一个方法。尽管这里边啊没有办法自动补全,但是这个方法的名称不能出错,而且里边传入的参数也必须是对应的这个参数啊,必须一样,那就是第一个参数。是一个中间聚合状态,ACC top2umulator,然后后边跟着的呢,啊,现在就没有数据了,因为我们已经要输出了嘛,从状态里边直接拿出来输出就可以了,那靠什么输出呢?当前it value这个方法没有返回值,靠的是。调用out.collect啊,那所以这里的out就是一个collector,我们选择flink u下边的这个or类型啊,那里边它的泛型是什么?当然就是我们定义好的输出结果的类型,TOP2 result。
15:13
这样的话。它不需要有任何的返回结果,所以返回是unit类型,这里的输出呢?诶,那我们还是得做一个简单的判断,就是假如说啊,我们之前这个数据就特别特别的少,根本就没有更新足够多的这个榜单,有些榜单还是空着的,就比方说啊,只更新了排名第一第二,就根本没来数据还是空着的,那怎么办呢?那就不应该输出了嘛,你不要输出一个长整级的最小值,这没有意义啊,那所以我们判断一下啊,就是对应的这个UID是否为空,或者是当前的这一个count值是否是常用值的最小值。如果是的话,那就别输出了。所以首先我们要判断的是。判断。Count值是否为初始值?如果没有。
16:01
更新过的话,那就直接。跳过不输出。也就是只有更新之后有效的数据我们才做一个输出,那就从最大值开始判断起吧。呃,If,当前ACC的max count。如果说它要等于。长整形的最小值的话,那就直接跳过,那就是不等于的时候,我们才做操作long点明。如果它不等于的话,接下来我们要做的操作就是要输出一句当前的最大值到底是什么啊,就是out.C我们这里是输出一个TOP2RESULT里边做一个包装,哎,这个字段就比较多了啊。当前的UID是什么?然后是窗口的信息start and,最后两个是count值和转,我们就按照这个顺序啊,呃,那其实都在ACC里面,UID我现在是最大值嘛,就是UID1。然后窗口的信息Windows和window and。
17:02
后面当前的这个count值,那当然就是max count了。对应的这个rank,那不用说了,就是一排名第一,这就是我们要输出的这个result。同样道理,如果说第一名为那的话,那就直接跳过那第二名也不用判断了啊啊,那如果要是第一名已经输出了的话,那后边我们还得继续去判断有没有排名第二的数据,那就是ACC点。Second ma判断是否是长整形的最小值。这个逻辑就完全一样,如果是的话就直接跳过,如果不是的话,更新过的话,那就输出一个TOP2RESULT啊,那同样我们这里面拿出来的还是在ACC里面去找UID2,然后ACC window start和window and对应的窗口信息。最后还应该有acc.second ma好当前排名第二的数量,它的rank,它的排名是第二,这样的话就是完整的逻辑,我们就已经处理完了。
我来说两句