00:00
最后我们再来介绍一下flinkq当中最为复杂的一类udf,那就是表和函数cgg function。字面上就能看得出来,它其实是表函数和聚合函数两者的结合。那表函数是什么特点呢?表函数特点是一对多输出,可以得到多行数据的输出嘛,可以进行扩展,而聚合函数的特点是什么呢?聚合函数的特点是对多行输入数据进行聚合。所以表聚合函数的特点是可以有多行数据输入,然后进行聚合,聚合得到结果呢,又可以扩展出多行的数据输出,所以相当于它得到结果也是一张表,而且可以输入表当中的多行数据去进行聚合转换。那如果说跟之前我们讲过的其他类型进行对比啊,如果说function。
01:03
标量函数是一对多的关系,一对一的关系的话,诶,这是一个简单的map对应,然后后边表函数那是一对多的关系。就类似于一个。在后边,聚合函数呢,类似于一个多对一的关系,这是一个reduce,或者而现在表聚合函数。这是真正意义上的多对多的关系啊,那对应的这个表聚合函数其实在stream API里边是没有对应的API实现,那所以它也是最为复杂,最为可以说是比较有特色的一个table API CQ当中的一个udf。那我们看一下自定义表聚合函数,那需要做什么事情呢?呃,同样的也是需要去继承一个抽象类。Table aggregate function,那么table aggregate function和前面我们讲的aggregate function。
02:00
非常的类似,同样它也有两个泛型和ACC,表示最终输出的字段的类型,而ACC表示的是累加器中间聚合结果的类型。那在里边呢,必须实现三个方法,跟之前agg function一样,首先应该有一个createumulator,创建一个累加器,然后呢,有一个accumulate方法。这是核心方法,就是每来一条数据,我们就进行在之前的聚合结果基础上,结合这条数据进行一个聚合计算,更新聚合结果,聚合状态啊,那最后还有一个在之前的aggregate function里边是get value,最后得到最终计算结果。而在。表距和函数里边,我们知道它最后不是唯一的结果了,所以当前这里就肯定不是get value直接返回一个值了,而是emit value发射值,也就是说当前这个发射值,那很明显它就不是直接返回,所以它是没有输出类型的。
03:03
然后它有两个参数,第一个是accumulator啊,就是ACC类型的累加器,然后第二个呢,是一个。它的类型是collect啊,这就很明显了,所以我们当前的输出可以输出多个嘛,就是使用alt调用alt.collect的方法去进行多行数据的输出。而且这里需要注意的是,在表函数里边。我们这里的。Get value方法是不需要手动去写的,我们可以直接override,而在表聚合函数里边,It value方法没有对应的实现,没有对应的定义,无法override,只能手动去写,而且只能叫这个名字,所以这里使用还是比较麻烦的。表距和函数进行处理计算的过程其实也会更加的复杂啊,因为我们知道表距函数得到结果是一张表嘛,啊有多行数据,有时候这个就是我们这个多行数据有可能就要做更新,更新的时候呢,它有可能不是只就是完整的把这一个多行数据全更新了,不是更新一张表,而是这多行里边的某一行单独去做一个更新啊,或者是某几行单独去进行一个更新,那这个时候如果说我们把所有的。
04:22
表里边的内容全部做一个更新就不够高效,哎,那所以为了提高处理效率,Aggregate table aggregate function里边还提供了一个方法,叫做emit update方法。哎,那很明显tra是撤回的意思嘛,所以它就是说在结果表发生变化的时候呢,我们可以以撤回老数据,然后更新发送新数据的这种方式增量式的进行更新,相当于写一个changelo啊,那同时另外还有一个value和这个update withtra这两个方法同时出现的时候呢,哎,那优先调用的是待撤回的这样一个增量更新的方法。
05:05
那表决完之后我们就会发现了,它确实这个定义和使用也都会非常的复杂,那有没有具体的使用场景呢?最典型的一个就是前面我们提到过的top n啊,因为我们说top n最终的输出。前N个啊,数量这个最大的或者最小的前N行数据都要进行一个输出,每来一条数据之后。就要更新一下我们之前的top n的结果,那很明显这就是一个多对多的输出嘛,所以最后我们要维护的是这样的N行数据,所以这里边如果要对这个数据进行一个。一个聚合计算处理的话,我们只得得得到一个值,调用aggregate方法。实现一个agate方,一个聚合函数,得到一个值,一行数去进行输出显然是不够的,哎,那所以这里面我们得用表聚合函数table f方式啊,那就是多次调用al.c把这N行全部输出就可以了。
06:11
所以接下来我们可以在代码当中具体把这个top n做一个实现啊,那这里我们会发现,如果说要自己在代码里边把对应的top n一个一个输出的话,那每来一个之后,具体的这一个怎么更新的这个操作也得自己去定义啊啊,那所以这个通用的top n就很难实现,所以这里边我们就最简单的实现一个TOP2的插询就可以了,我们来做一个举例说明。接下来是新建一个class。同样还是udf test。现在是。Table aggregate方式。那么前面的整体逻辑我们还是先copy过来。这里需要注意的是,对于表具和函数而言,因为它太过复杂,现在C当中是没有这样直接调用的。呃,处理的这种方式的而。
07:08
Table API里边有对应的调用方式,所以后边我们进行查询转换的时候还得使用table API,这个是后话,我们等到调用处理的时候再去做具体的实现。现在关键在于我们先要去。实现一个自定义的。表聚合函数。哎,那上面当然也是要。注册自定义的函数了。那接下来我们首先看一下。Public。Static class。我们想要实现这样一个TOP2,我们就直接把它命名叫做TOP2吧。Table aggregate function就是这里table functions下边的。表聚合函数,同样我们看到它里边有泛型T和ACC,这跟之前的聚合函数是一样的,那我们当前聚合起来最终的结果应该是啊,就是输出第一名是谁,第二名是谁啊,那所以这里边。
08:11
如果定义的简单一点的话,最简单就应该是一个数值,然后一个排名,一个数值一个排名啊,那所以就是一个二元组了,我们把它定义成。二元组。二然后前面一个数值的话,我们用长整形,然后排名的话,我们用一个inte就可以了。当然这个我们要把抓把。下面flink给我们自提供内部提供的元组类型要引入,然后接下来还需要有一个。累加器的类型,哎,那这里面我们的累加器需要保存什么样的数据呢?哎,这里边如果说只是二元组,如果只是TOP2的话,这个比较简单,就只需要保存当前最大的值以及第二大的值就可以了,那所以这里边其实就是保存两个长整型的值,当然了,为了考虑到后续的扩展,那如果说我们想要这个,呃,定义一个TOP3怎么定义呢?诶,那可能这里边你还得扩展成三元组,所以我们单独的跟之前类似啊,也是单独的定义。
09:21
定义一个累加器类型。包含了。当前最大和第二大的数据。所以当前如果我们的数据都是长整型的话,Count的话,比如说像之前我们做的那个count统计啊,就当前每一个用户点击了多少次,或者说访问的那个页面啊,呃,访问了访问了多少次,它的那个个数是一个count的话,那我们这里就是一个长征性质,所以。A public static class,我们可以定义一个top two accumulator。
10:04
里边非常的简单,AUM。里边非常简单啊,我们定义的长整形,一个是当前的max。另外一个是。Second max。就是最大和第二大的值,那接下来这里当然就是直接给一个top to aumul就可以了,接下来就会简单一些,里边我们必须要实现的啊,现在就只有一个了,只有一个create accumulator,创建一个累加器啊,这个也比较简单,很明显我们就是直接。你有一个。Top aumul得一个top to AUM aumulate aumul里涉及到另外一个问,就是我们前面这里是没有给它赋初值的,哎,那所以这里边如果我们创上面这个本身啊,这个类里边没有做做初值的定义的话,在创建累加器的时候就要去做一个定义了。
11:10
这是另外一种方式,我们可以把它的maps定义成,那定义成什么比较好呢?为了方便比较,如果接下来有值的话,就应该直接更新这个max。那所以这里边我们干脆就把maps和second maps都定义成长整型的最小值,诶,这样的话就会最方便,接下来默认来了之后一比较,这个逻辑我们就不用单独判断了,直接来了数之后一比较,只要有数肯定比它大,那就直接更新啊。那所以当前我们把max和second map。全部定义成长整型的啊,那最后当然是。Top to accumulator就可以。这是创建加气的方法,那接下来呢,关键还要实现一个accumulate的方法。
12:01
定义一个。更新。累加气的方法。这个方法跟之前也是一样的,就是只能是public void cuumulate。必须这么写,没有其他的方式,好,那这里边第一个参数呢,必须是当前的累加器类型,我们就把它叫做cuulator吧。后边啊,那接下来当然是传进来的数据了,传进来的数据那我们当前就只传一个长整形的值就完了嘛,那所以这里边直接传一个。当前的value。定义好了,那接下来那就是做一个非常简单的逻辑判断,然后去更新当前的ul就可以了,所以当前当然就是判断value是否。大于我们当前的最大值呢,如果要是比最大值都大的话,那没什么好说的,就相当于当前值变成最大值啊,最就是当最大的值现在用当前值来进行一个更新,然后第二大的值呢,那就用之前的最大值来更新啊,所以当前我们要做一个。
13:14
Second max先做一个赋值啊,因为之前的第二大值就没用了嘛,把它顶掉相当于可以把它用max来做一个更新。然后接下来max更新成。Else if。如果说当前的值比max要小,最大的数要小,但是呢,比第二大的数要大,哎,那所以接下来l if value,它大于。Second max的话,那这个时候我们是需要更新,那不需要更新max,只要更新max就可以了,所以是accumulator second map等于value。那当然了,还有一个else,那就是它比second max比第二大的也小,那也小的话就不会更新我们的TOP2嘛,那这样的话就什么都不做就完了,所以我们的更新操作就是这样的一个if判断。
14:10
所以我们也会想到,如果想要扩展的话,那就是这个会扩展出更多的分值啊,那就是假如说我们当前是TOP3的话,那就应该保存max max和third max3个值,然后接下来我们得依次判断,如果大于最大怎么办呢?啊,那比最大的小,但是比第二大的大怎么办呢?比第二大的小,比第三大的大又怎么办呢?啊,其实就是这样一个叠加的,一层一层去做判断的,If else的一个一个扩展啊,那所以这个做完了之后,最后我们要实现的就是一个it value了,就是输出结果的一个过程。输出结果。当前的。TOP2。所以这里边同样我们并没有能够override的方法,但是我们必须按照这种标准去实现limit value叫这个名字,首先它的第一个参数也必须是当前的累加器accumul。
15:10
后边呢,后边必须是一个collector。这个是定义死的,必须这么写好,那里边这个collector里边呢,里就是。当前我们想要输出的数据类型了啊,必须是跟这里的输出类型一致,所以直接copy过来二元组。后边是一个out,引入阿帕奇link下边的,这样的话,当前的类型就是正确。那我们现在要想要获取到的东西,那很显然就应该是当前的。状态里边的maps和second maps把它out.C的输出就可以了,当然了,这里还有另外一个问题,就是假如我们根本就没有更新过它,都是最初的这个最小值,那怎么办呢?最小值的话,那就什么都不要输出了嘛。诶,所以这里边可以做一个非常简单的判断,就是if。
16:05
accumulator.max首先看最大。Max如果不等于长整形的最小值的话,我们才去做一个输出,要等于的话,什么都不要干啊,那所以。当前输出一个二元组类型。哦。当前的accumulator。点max。后边应该是他的排名,那这是第一,然后接下来啊,接下来我们也不用用在那个else里面去判断了。直接去做一个判断。Second max。哎,那这里边直接把second map2做一个输出就完事了。这就是我们整个判断处理的过程,这样的话,自定义的表聚合函数就完成了,接下来的事情就是在代码当中去做一个注册调用了,那当前如果要是注册的话,很显然。
17:05
也是非常的简单。直接把TOP2。注册进来就可以了。放下来,哎,然后接下来呢,哎,那就是调用,调用的这个过程稍微的有一点麻烦,首先我们应该有一个能够去做判断的长整型的值,哎,那我们自然就想到了,之前我们不是有那个窗口聚合之后,然后再统计当前TOP2的那个需求吗?诶,我们可以直接拿过来用,之前我们曾经做过。对应的这个窗口聚合,诶,那很显然我们现在可以直接统计一段时间内的前两名用户,在这个两步操作啊,我们内嵌的这个sub query里边,其实就可以直接拿到当前每一个窗口内所有用户对应的。访问页面的次数,点击的次数啊,它的活跃度到底是多少,然后接下来我们就可以基于把这个值传给TOP2函数,然后进行计算,拿到当前的前两名啊,那当然了,前两名应该是一个value,一个当前的排名情况,那最后我们把对应的数据输出就可以了,所以当前这一段可以做一个。
18:21
Copy,直接拿来用。首先把这个sub query去进行一个执行。我们把这个就直接叫做。窗口聚合。A j query。传进来做一个调用。那得到呢?当然是一个table了,我们把这个叫做a table。然后接下来为了在后边的使用过程当中处理比较,呃,容易的话我们还需要啊,这里已经注册过了,所以接下来我们就可以直接在CQ或者table API里面去调用了,前面我们提到因为CQ里边对表聚合函数支持的并不是特别的好,所以现在我们能够使用的方式呢,是直接调用。
19:10
Table API table,然后去直接做一个。Group b首先分组,我们这不是已经拿到了对应的这个每一个窗口内的所有的数据吗?那当前显然是每一个窗口内要做一个统计排序,所以应该是window当前的。Window end。啊,那当然了,这种调用方式将要被弃用了啊,直接传这个字段名称,这种方式要被弃用了,我们推荐的方式是给一个Dollar服。表示这是当前的一个expression,一个表达式,把这个。啊,那然后接下来在表聚合函数怎么样去调用呢?要调的这个table API的方法是flat aggre。所以我们会想到aggregate,这就是简单的聚合嘛,当然是传一个aggregate方式,那flat aggregate有点像那个map和flat map的关系啊,Flat aggregate很显然就是可以多次输出嘛,啊,扁平化的聚合啊,那所以这里边我们可以调用flat aggregate它里边呢,可以直接我们看到里边啊,要传的本质上也是一个table agg function table aggregate function,所以我们这里可以直接call。
20:27
然后里边去传入。传入当前的要调用的名称TOP2。然后后边还应该有传入对应的参数,那就是首先。我们这里边应该有一个CNT啊,CNT这是我们想要真正去只传这个一个CT,其实也就够了啊,就是把当前啊这个数量传进去,然后看一看。传给当前的accumulate的这个方法作为第二个参数传进来,接下来我们把按照数量做一个排序,把最大的数量,第二大的数量以及它的排名情况输出就可以了。当然了,这里我们是没有办法再看到对应的那个user了,如果想要看到的话,那应该就相当于我们这里输入的这个数据包含user的信息在包装一层,那最后输出的时候也就会多出user信息。
21:21
我们现在不需要看那么多,只是做一个简单的测试啊。所以有了它之后,呃,这个还得。后边我们做一个,因为还要提取对应的字段嘛,所以我们可以调一个as方法,做一个重命名,我们把它得到这两个字段叫做value,或者说这个count值啊,Value,然后另外再来一个rank。当前的排名。把它定义出来。那最后我们再做一个select,做一个提取。想要提取的字段啊,那当前我们首先应该有一个窗口吧,Window基于哪一个窗口?
22:01
然后后边啊,现在因为输出的这个数据已经没有优点对应信息了,只有这两个,那也就没什么好说的,只有把当前的value。和rank做一个提取输出,那这样的话我们就得到了一个。Or table?最后我们就可以把对应的转换成打输出了。那接下来我们可以运行一下,看一看得到结果是什么样的。我们可以看到当前表聚合函数输出的结果呢,哦,它这里边是有一个减D的一个当前肉的一个类型肉,看诶那为什么会出现减D呢。很明显减D代表的含义就是直接删除啊,所以我们会发现啊,在呃,前面这个十秒钟结束的窗口里边,尽管我们这里边没有输出user,看不清楚到底是哪一个用户,但是我们可以看到,首先啊,这里边来了一个加I21啊,就是统计出来第一的是。
23:13
有两次点击,然后接下来呢,还删掉了这个值,然后又加I6次点击,诶所以很明显就是开始的时候,诶,我们可能先输出到最终聚合的结果表里边的时候,诶,我们统计出来一个比方说Mary。Mary有两次点击,诶,发现它是当前这个最大的。统计出来次数最多的一个值啊,先把它放到了第一位啊,那我们后面其实也是一个增量聚合的过程吧,然后把它放到第一位之后发现,诶不对。它除了它之外,Bob他点击了六次这个更多啊,诶,那所以Bob统计出来之后,就把它更新掉了,所以是加I,然后6亿。那么对应的呢,Mary的这个点击次数二就变成了加I2,二就是这样的一个操作。
24:07
那后面如果再来对应的数据想要去进行更新的话。来的应该是谁呢?那就应该是爱丽丝,爱丽丝统计出来的话只有一,所以我们会发现爱丽丝其实是不会去更新我们表里的数据的啊,但是这里面呢,我们同样还是要把之前的全部删除掉,然后再获取一下当前表里边的最新的最当前最大值和第二大的值,诶,所以我们会发现啊,尽管没什么更改,还是来了一个爱ice丝一这样的一个统计啊,只有一次点击的访问,它并不会更新我们当前TOP2的榜单,但是我们输出的时候还是先做了一个减减D操作,然后加I,把对应的这两条数据做了一个输出。那后边这个就看的更加的明显了,就是首先来了一个,然后那么这是一一,然后之后呢,诶,它不是最大的了,那么我们把它要剪掉删除掉,然后更新成二一,这是最大的,那么之前的一呢,它变成了一二,排名第二。
25:12
这就是我们关于表聚合函数的使用和测试结果的解释。
我来说两句