00:00
接下来我们再给大家讲最后一种udf的类型,那就是表聚合函数啊,这个听一听名称就好像更加复杂,它其实是什么呢?就是之前有点像我们之前讲的这个table function和aggregate function的一个合体啊,之前我们讲的这个table function的特点是。输入是一行对吧,所以它是一行里边的字段作为参数啊,你可以提取任意一个提取出来作为输入,然后呢,得到的是一系列的行,也就是说得到的是一张表,是一对多的这样的转换。而后边我们这个aggre aggregate function呢,是输入的是一张表里边的很多行数据,然后最后输出的是一个标量值啊,或者可以认为就是一个一行数据对吧,所以是多对一的一个输出,一个是一对多,一个是多对一啊,那当然如果要是把它俩结合在一起表聚合函数那是什么?那就是多对多对吧啊,有时候把它叫做这个u d taggs对吧,User table functions啊那。
01:09
用户自定义表聚合函数,它的特点就是把一个表里面的数据,诶,最后做聚合之后呢,还有多行的输出,相当于输出的又是一张结果表,就不是一个具体输出的数了,而是一张表啊,啊,所以呃,这里边它的实现其实跟那个aggregate function是非常非常类似的,整体的定义也差不多,它的差别就是之前我们那边是get value只得到一个,只得到一个具体的值,具体的数,而现在是怎么样呢?那就得输出不同的行了,对吧,就像之前的那个呃,Table方式一样,你要不停的那个输出。那个那个collect啊,直接收集到这个新的数据,把它输出出去,所以这里边有一个比较常见的应用,那应该是应用在哪里呢?啊,就是像我们这个需求,假如来了需求是top n。
02:03
比方说我现在要选取还是一张表啊,饮料这张表里边我要选取当前,呃,这个价格price啊,刚才我们取的是最大值嘛,现在我要取TOP2的价格,这怎么办呢?诶,这个就很麻烦了,对吧,大家想到之前你那个最大值我直接取出来max直接把它聚合得到一个值就也就完事了,但现在我要它的这个TOP2,那这个TOP2的话,呃,你来一个直接去聚合这个,这个感觉好像好难做啊是吧,但其实也是可以做的。大家会想到它其实主要就是要调整我们保持的状态和,诶你当前这个来了一个新数据之后,保持的状态到底是怎么样去更新,主要就是改变这个机制而已。所以它具体的实现也是中间核心的有一个a accumulator,对吧,主要是有这样的一个,呃,这个累加器作为当前任务的一个状态,那我们当前这个状态里面要保存什么呢?既然是TOP2的price嘛,我这里面保存的应该就是。
03:11
当前最高的那个价格和当前第二高的价格对吧,其实就是保存这样的一个状态,然后我最后输出的就是来了一个数据的时候,同样还应该有一个accumulateate对吧,这个方法,那这个就应该怎么样呢?是来了一个之后,就要跟之前保存的第一高和第二高都做一个对比,对吧?啊,如果他比那俩都高。那当前的这个最新的这个price,它就是最高,后边就依次往后挪吧,啊,如果说它比第一个小,那我就挨个往后比对吧,再比比第二个是不是比比第二个高,如果比第二个高,那就代替第二个,后边的依次往后挪对吧?啊,如果比所有的都低啊,那就相当于什么都不做。对吧,相当于我们这里边就可以那个不输出数据,或者说你输出一个一样的数据,这个都是没问题的啊,所以这里面大家看到啊,也是三个方法,必须要实现,一个叫做create accumulator,创建一个状态累加器,然后呢啊,还有一个来一个数据做的这个计算累加accumul对吧?啊这个也是自定义的啊,必须去自己写就而且是他那个写死的名字,必须叫这个名。
04:20
另外还有一个方法,这个不叫get value了,这个叫it value。因为在这里边你并不知道它到底要输出多少啊,所以我们这里边可能就是,呃,就是有可能要输出多行嘛,你这里边的这个top,那不是得输出两行嘛,对吧,至少是两行数据啊,所以在这个过程当中,其实就是用跟之前aggregate function同样的机制,实现了输出多行数据,实现了多对多的啊到表的一个转换,这就是所谓的表聚合函数。那实际应用的时候也是要注意啊,之前我们那个叫agg调点aggregate去做这个操作,对吧,大家还记得前面我们这个聚合函数这个max,其实呃,我我我们可以直接在CQ里边直接这么去写,如果要是table API里边的话,那就是点,那这里边我们如果要是这个表聚合函数的话,这个CQ里边就没法写了,我们只能用table API这里边就是直接点flat aggregate。
05:18
大家知道flat,那不就是Fla map不就是一对多,能把它打散吗?那现在这个也类似,对吧?Aggregate是多对一,那现在flat aggregate就是多对多。啊,所以说这四种这个UD udf函数,大家回过头来一总结的话,那就是从简单到难,从简单到复杂,一开始的标量函数FUNCTION1对一对吧,呃,一行,得到一个具体的标量值,统计出来的结果,计算出来的结果。后面的table function呢啊,那就是多对一,呃,一对多对吧,一行得到一个扩展出来的一个表,一个结果,那用的时候那就得let join了啊,做这个侧向连接了啊,那接下来我们又是聚合函数,聚合函数是多对一,就是一个表里边的每一行数据挨个做聚合,哎,输入进来之后得到最后一个结果。
06:11
那最后的表聚合函数更加复杂一点,多对多对吧,可以输出top n的这样的一个结果。好,那接下来我们还是在代码里边做一个实现啊。呃,后面这个其实就是必须要实现的方法,跟刚才非常类似啊,整个过程也非常的类似,只不过就是这个输出数据的时候,由之前的get value变成了一个it value啊,这这就这个就是大家在具体使用的时候知道怎么回事就可以了。好,那我们写在代码里边还是新建一个,因为跟之前逻辑类似啊,我们可以直接照抄。Object,呃,这里边我们管它叫做table,我就叫AJ吧,简写啊a j function test。然后下边main函数,哎,我就直接从这个aggregate function里边。
07:05
我原封不动把这些都抄过来吧先对吧,因为这个过程其实要改的东西很少啊。它的用法也非常类似,注意这个引入进来之后,还是不要忘记把上边的这个影视转换先改掉啊,然后下边这个也改了啊,啊,然后我们前面的这个是执行环境表的,执行环境后边读取数据,读成一条流,Map成样例类,然后分配时间戳和watermark,呃,把这个流转换成表,然后接下来诶这些我们就要。我们要定义的是表聚合函数的实例了,对吧。哎,我们这里边就以这个top n给大家来做一个例子,这里边要定义的就就直接给大家定义一个TOP2吧,对吧,比方说我这里边啊,就是TOP2TEMP这样的一个一个函数啊,那所以后边我要去new的也是一个TOP2TEMP。
08:00
里边呃,这个就不需要再去传参了,因为大家知道你这里边如果要是top n的话,我这里边TOP2跟TOP3后边处理那个逻辑肯定不一样,对吧?TOP2的话我就比前两个就完,就存两个状态就完了,你要TOP3的话,那得存三个数啊,而且那个比较的逻辑肯定会更复杂啊,所以这个过程我们就举一个最简单的例子,就是TOP2啊。呃,然后在后边我们这个这个这个调用先先删掉啊,然后后边我们做这个。自定义的时候实现一下啊自定义。一个表聚合函数。实现呃TOP2功能,好,这个来给大家看一看,这个东西怎么写啊啊首先我们这个实现一个top part temp对吧?呃,这里边大家发现了,之前我们做这个agggate function的时候,不是里边我们做聚合的时候accumulate这个要自己定义它,它没有返回值吗?呃,所以这里边必须把状态得定义成一个自定义的类,对吧?状态类,所以这里边我也是一样啊,自定义状态类,先定义这个啊,自定义状态类好啊,那这里边我们这个定义一个,呃,这个就叫做TOP2,呃,Temp。
09:25
ACC对吧?Accumulator啊,这个类里边我要保存的,哎,我们就简单一点,就保存两个值,最高当前最高温度和当前第二高温度啊,那就是一个bar highest temper,它是一个double练习,哎,这个初始值给什么呢?啊,有同学说那给0.0嘛,但是温度这不好使啊,对吧,你给一个这个0.0,这个听起来不太靠谱,所以我可以直接给一个double的mean value对吧?啊,这个只要是你把它一更新,肯定会比这个这个大啊。好,然后我们给一个这个。
10:02
Second highest。碳。第二,高温。同样也是一个double类型。呃,这个double点最小值也是初始值给一个最小对吧,这是我们当前的这个初始状态啊,先定义,这是这这个状态类先定义出来,然后接下来,呃,这里边我们实现这个TOP2功能的话,它需要去extend一个table aggregate function啊,这个就没有混淆的了,对吧,只有这个table API里边给我们提供了这么一个东西。然后里边它跟A的方式一样,也要传两个参数,前面这个是输出结果的行的那个类型啊,我们这里边可以定义成元组啊,这里边给大家定义还是一个。Double,诶,这里边注意啊,不仅仅是一个double,因为你还得告诉我,这到底是当前的这个第一高温还是第二高温,对吧,所以后面我们再给一个rank,类似于这样啊。
11:07
类似于一个这个输出的是。这个。呃,Temp温度值和它的rank这样的一个操作。然后后边呃,再给一个类型,这是当前的这个状态的类型,Top two temp ACC,好把这些都定义好,它必须要复习的方法,诶大家看这里边是就只有一个创建的方法了,连我们那个get value都没了,对吧?所以那个it value也是得自己去实现的,按照那个标准去实现的啊,那这里边我们就把这个状态初始状态先写出来,New,一个top to ACC,把这个定义出来,然后后边就只能自己写了,对吧,这个是。呃,初始化状态。然后接下来我们这个是。每来一个数据。
12:03
一个数据后聚合计算的操作,这个就是我们核心了啊啊,那同样DF定义一个a accumulator a accumulate,对吧,必须得是accumulate啊,不能拼错。然后里边前边还是必须是先有一个状态top two temp ACC对吧,后边是当前输入的一个,我们还是因为是温度取前两名嘛,我这里边就还是把当前的温度字段啊取进来就可以了,调用的时候也是把温度传进来啊,那这里边不能有。不能有返回值,哎,所以说只能是直接在这上面改对吧?哎,所以接下来我的操作是什么呢?就是判断当前啊,将当前温度值跟状态中的。
13:02
最高温和第二高温。比较就是如果。大。的话就替换对吧,那具体替换逻辑那就得各种if else啊,我们先判断跟最高温先判断啊,所以这里边如果temp大于acc.highest temp的话。哎,那接下来。就是如果比最高温还高,哎,那就。排第一。呃,其他温度依次后移啊,这里边其实就只有一个嘛,所以就是原来的第一顺道第二对吧,依次后移。啊,这里面我们做的就是ACC点啊,既然要后后移嘛,我们肯定是先把那个就是second,原先的那个second就不要了,对吧,所以就直接second先直接赋值等于acc.highest然后呢,acc.highest等于当前的碳。
14:13
替换完事,那else if如果说比最高温小,但是它比第二高温要大的话。多了一个问号啊,比第二高温要大的话,那是不是就直接替换第二高温就可以了,对吧?啊,所以这里边就直接ACC second。这个直接等于碳就完事。啊,所以大家看这个TOP2的话,这个相对来讲逻辑还简单一些,但是可想而知,你如果要TOP3TOP5TOP10对吧?啊,这个就非常复杂了啊啊,我们给大家只实现一个最简单的啊,这里边儿这个accumulate,另外还要注意一个,呃。实现实现一个。输出数据的方法。
15:03
呃,这个写入结果表中啊,所以这个方法不是,不是之前我们熟悉的get value也不是,呃,在在之前那个window API里边的那个get result啊,这里边是it value必须叫这个名。发出一个值对吧,那同样它定义的参数也是定义死的,必须是这样的参数,首先是一个状态,当前的状态啊,Top two temp ACC后面还有一个参数,注意这个参数啊,这个大家相对来讲比较熟悉一点啊,它其实是个什么呢?其实是我们当前输出数据的这个collector那个类型,所以这就有点像我们之前那个window API里边,或者Fla map API里边,我们输出数据的时候那个al.collect那个用法了,对吧?它本身是个collector。这个是就是底层要求的啊,必须是这个类型啊collector,然后这里边呃,我们给一个我们输出什么呢?呃,输出要的肯定是这个对应的double int这个类型对吧,跟这个要匹配上。
16:10
呃,这里边double int啊。好,然后这样的话,我们就可以给它做一个输出了啊,大家看这样的话类型就匹配上了,对吧?啊,所以这个过程其实还是,呃,这里面我看一下这个collect collector应该是选对类了吧,刚才没有太注意啊。Cor放在了。Java u这个这个这个我们还是用这个flink的啊。引入的时候大家注意一下。Port class,我们用这个flink u collector对吧?啊,拿这个来使用,然后接下来这个输出就非常简单,假如说我们不考虑这个,呃,就是说没有更新,我不输出的那种情况,我就是只要现在要输出,我就直接把当前状态的最高温和第二高温输出就完事儿啊,所以其实就是两行输出嘛,那不就是alt.CLA输出两次嘛,对吧,当前是ACC highest temp排名第一,Out点。
17:14
ACC的second highest排名第二,这样输出就可以了。然后这个状态大家可以看一看啊,就实现起来可能稍微复杂一点,那后边这个呃,调用的时候怎么调用呢?先是在前面把这个实例先创建出来,对吧,然后后边调用的时候,前面我们也已经看到了。那个flat,呃,就是flat aggregate对吧,调这个方法,那调它之前呢,先要你这里边直接这个flat aggregate,其实基于这个也是可以的,但是我们其实还是想要分组,按照ID来区分嘛,所以先调。这里边给的是ID,然后后边flat map flat flat aggregate对吧,不是flat map flat aggregate,然后这里边我们就调用前面定义的top To Camp里边传的是当前的。
18:04
Temperature。后边我们可以定义得到的字段,As什么呢?温度temp和rank对吧,然后最后再做一个select转换。因为大家知道Fla aggre这个得到的其实并不是一个table啊,这是一个select aggregate table,最后一定要用它在做一个select,大家看只有一个方法select,用这个把它select出来得到的才是一个table啊,所以这里边我们select ID。呃,当前TS对吧,然后temp。Rank,诶,这样就得到最后的结果了。好,我们现在运行一下,看看这个效果怎么样啊。诶,这里边报错,我们看一下这个报错的原因是什么啊。哦,这里边我们的那个匹配不对啊,这里边这个呃,Input只有ID time和哦,这里边大家看到我们这里边只有这个聚合之后的是这个对吧,我们已经按照ID去做了这个group by了,所以后边就没有这个TS这个字段了啊,所以把这个删掉,重新。
19:12
我们只看这个当前聚合之后的结果。看一下这个状态啊。好,大家看现在是正常运行输出了,我们看到现在的结果是什么呢?哎,这就是第一个数据341输入的时候,哎,大家看最高温35.8,哎当前的最高温对吧,最低温这是很大的一个负数,我们的那个double的最小值嘛,呃,不是最低温啊,第二高温,第二高温还是那个最小值呢。啊,然后呃六七十输入进来的时候都是这样对吧,当前值是最大值,然后第二高温是呃这个之前的呃定义好的double类型的最小值,然后341来了一个第二个温度,大家还记得那个是多少吗?这个341来了一个37.9的时候,诶,我们发现当前35.8不是最高温了,对吧,所以大家看两条数据false了,之前的这个一和二全false了,我们现在是什么,现在37.9是一,35.8是二。
20:12
哦,然后后边后边大家发现这个我又来了一条数据,32.4的时候,其实没改变它的这个,呃,数据的这个结果啊,但是呢,这里边我们因为是每来一条数据之后。这里边都要做一次输出,大家看到了吧,Out collect对吧,每次来了之后都要调用这个方法,所以都输出了,没有判断它是不是更新,所以这里边呢,他每次都要做一次更新的话,那就相当于还是把之前的先制成false,然后又新写了两条数据,尽管这两条数据一模一样啊,这是这样的一个状态。然后最后又来了一个39的时候,大家发现诶又更新了对吧,比之前的这个大,所以我又把它更新成39是排第一,37.9排第二。这就是这个表距和函数实现top n的一个过程,大家可以下来之后好好的练习一下。
我来说两句