00:00
我们给大家讲最后一部分udf函数,就是所谓的表聚合函数table aggregate functions,哎,这个听起来就更加的奇怪一点,呃,之前我们讲过这个表函数table function,也讲过聚合函数aggregate function,那现在看起来好像是把它俩结合在一起了,对吧?呃,Table aggregate function这又是什么东西呢?呃,简单来讲的话,之前我们不是说scale function标量函数,它它是一行对一个值,一对一对吧啊,那后面我们讲那个表函数呢,Table波方式呢,它是一对多一行输入啊,对应着扩展出来之后是多行数据,也就是扩展出来一张表,这是一对多的关系,然后我们后边的这个聚合函数呢,我们讲是多对一对吧,就是多行数据输入,然后最后经过聚合计算之后呢,输出的是一个值或者说一行数据,这是这个聚合函数的特点,那与之对应的表聚合函数是什么?
01:00
那就是多对多对吧,就是直接输入多行的数据,然后最后统计呢,统计出来也是多行,不是只统计来出来一个值,而是统计出来是多行输出数据,也就是结果也是一张表对吧?多对多的这个关系,那所以说呃,它的特点就是说把一个表里面的数据呢,可以聚合成为具有多行多列的这样一张结果表,整个表都是它的聚合节物,那这个就有点奇怪了,那我们看一下它的这个具体使用吧,具体它能用在什么样的场景里边呢?啊,最直观的一个应用,其实就是可以用它来做top n,大家想一想,如果说我要选取这个,呃,比方说我们这张表还是这张饮料的价格表,对吧,我要选取里边价格最高的两个饮料,要把他们整个的这个数据输出,这个我应该怎么样去做这件事呢?
02:00
当然有同学可能想到了,我可以去直接定义一个状态,然后这一个状态里边包含了这就是最高的那个,呃,当当当前就是我们那个,呃,本身是最高的最高价格的那个饮料,对吧?然后再包含这个第二高价格的那个饮料,然后呢,每来一条数据之后,我更新这个状态对吧?这是我们自然能够想到的,但是最后我们输出呢,哎,不是直接输出一个值就就够了,我要输出两行数据,那这种情况下怎么办呢?诶,这里边之前我们不是有aggregate吗?呃,点aggregate里边传一个aggregate function,现在呢,大家还记得我们之前那个看到group by之后能调的那个API里边是不是还有一个flat aggregate呀?啊,所以接接下来我们就是可以调一个flat aggregate,然后呢传一个表聚合函数,这里边我们传的你看就是比方说我们当前这个表聚。
03:00
函数就叫TOP2对吧,Top二传什么呢?传当前的这个price进来,然后最后我就哎把最后那个就是最高的和第二高的作为两行数据打散了,给他再再返回回来对吧?那这样就是相当于我们说的多对多的一个输入输出的一个状态了,那同样里边的计算的过程大家发现啊,基本上还差不多对吧?除了最后你不要只给我输出一个值,我要的是多行值之外,别的都一样,里边我也需要有一个状态accumulator,只不过当前的这个状态应该是什么呢?大家看这个635841个一个输入的时候呢,哎,我要保持的就是当前的最高的价格和当前第二高的价格都保存在这个accumulator,保存在这个状态里边就可以了啊,那你看这个就是六来了之后呢,诶,我就只保持一个六对吧,然后三来了之后呢,就是第一高第二高都有了,六三都保持。
04:00
进来,然后五来了之后呢,诶,发现那个第二高不是三了,对吧,我就把它替换成六和五,然后八来了后呢,八八比之前那个最高的六都都高啊,所以我就直接替换成八和六,四来了之后对他没影响,那还是八和六,所以最后呢,同样还应该要有一个输出结果的这样一个过程,对吧?哎,大家看到就是同样还是有这样的几个方法啊呃,里边之前我们说那个聚合函数里边,首先创建状态的时候有一个create accumulator,现在也一样,然后呢,呃,计算的过程当中必须实现一个accumulate,这里也一样,然后最后输出那个值的时候,之前我们那个是大家还记得吧,是get value对吧,它就是一个值直接get就完事了,现在就不一样了,现在因为我们输出的是多行值对吧,所以大家看这里边调用的是一个emit value,呃,是输出emit发射出去。
05:00
然后大家就想到这肯定就有点像我们之前那个,呃,就是表函数里边要发出数据的时候,就应该类似于一个用一个collector调它的collect的方法,一行一行发射就可以了,对吧,而不是直接呃,给我们这里边返回一个值啊,大家还记得之前我们做那个聚合函数的时候,那个get value直接就是返回了一个一个值嘛,就是做完计算之后,我们求那个平均数一除,得到了结果直接返回,那现在就不是一不是那样的一个操作方式了,对吧,类似于map和flat map最后的那个结果处理的方式不一样啊,就类似于这样的一个过程,所以接下来我们就给大家看一看在代码里边到底怎么来实现这一部分内容。我们还是在代码里边新建当前的这个目录,下面新建一个object。这个同样我们就叫做,呃,这个是表聚合函数,所以是table aggregate function test表均合函数,面函数先写出来,然后接下来里边的内容前面就完全一样对吧?这个我们都不用细想了啊,这里边肯定是一模一样的一个过程,那我就直接把这个直接copy到34TABLE创建出来对吧?数据读进来,然后把这个34TABLE创建出来,同样该引入的包引入,我们把那个影视转换也要记得引入,大家就养成好习惯,每次进来之后先把这个先先引入就完事了,然后接下来呢,就是要定义自定义的那个表聚合函数了,对吧?哎,自定义表聚合函数,然后我们想要实现一个什么需求呢?呃,大家发现我们说最好的这个方式就是实现top n嘛,那我们现在是sensor的这个数据,那我们就提取,诶TOP2的。
06:55
呃,两个温度值好了,对吧?来我们提取所有呃温度值中最高的两个温度啊,那当然了,我们现在这个需求也应该是基于每个sens,每个传感器去做的一个聚合,对吧?你要是把所有的传感器混在一起,这个温度值去最高,这个就呃,不是我们现在要的这个,这个没没有我们现在要的这个需求的含义了,对吧?我们要的就是对于每一个传感器去做这样的一个聚合,那接下来我们就直接定义class啊,但是大家想你你如果要是想直接去就是基于所有的这个数据去做聚合也可以,对吧?你就像我们这里边举的这个例子,它就是相当于基于所有的数据嘛,没有针对ID去做,去做group by,对吧?啊,我们现在是针对每一个S的话,就相当于还需要有一个group by了,好,那这里边我们定义的是一个。
07:55
名字叫做啊,Top to ten对吧?然后extend当前应该是一个tableable aggregate,这个就不会错了啊,因为我们之前的那个通用的flink,通用的那些函数里边肯定没这个东西,对吧?只有这么table API里边有这样一个函数啊,那里边我们看一下它需要定义哪些函数类型呢?还是输出的数据类型对吧?啊,就是result的这个数据类型,注意这个数据类型呢,就可以有多行输出对吧?可以输出多个,然后还有就是当前保持的这个accumul状态的一个类型啊,那这里面我们的输出本身应该是一个,呃,大家想到当前的这个输出的状态应该是有一个温,呃,就是当前最高的一个温度值,对吧?另外呢,我们可能还应该输出一个,呃,应该输出一就是呃,当前这个温度值到底排第几对吧,尽管我们只有两个。
08:55
你一目了然就知道,呃,前面我输出一个高的啊,第一的,后面输出一个第二的对吧?呃,但是我们这里面最好还是加一个这个rank啊,所以这里边我们输出的给大家写一下,输出一个二元组,就是当前的这个temp,以及就是最高或者第二高的temp,以及当前的rank,当前的这个排名情况,所以这里边应该是一个double和int构成的二元组,另外呢,还需要有一个当前的状态类型,状态类型之前大家也发现了,我最好是另外定义一个类去保存成这样的一个变量,方便后边去做更改,对吧?呃,因为你看这里边我们要定义的这一个就中间accumulate这个方法啊,它并没有返回值对吧?并不能你直接更改了之后,把这个状态返回就完了,直接更改这个状态,而是哎,我们这里边你得直接把这个状态去做操作,去做计算,那你怎么样能把这个当前的这个参数。
09:55
直接更改掉呢?那你传一个元组类型是不行的,你传一个类的对象,这个是可以的,相当于是一个引用对吧?传一个引用类型进来,里边的属性我们是可以可以更改的,所以这里边里边的这个实线是差不多的啊,所以这里边我们还是定义一个呃类,用来表示表距和函数的状态对吧,中间保存的状态,那同样这里边我们就定义一个top To Camp ACC这样一个状态里边的,呃,它需要的这个属性呢,那主要就是两个温度值了,对吧,一个是highest temp当前最高的温度值,一个double类型啊,那首先我们给一个就double的最小值吧,对吧,0VALUE给一个最小值,因为大家想后边我们是要做那个对比,做一个更新的嘛,所以一开始给一个最。
10:55
值,这就类似于我们给一个呃,或者像那个呃,给一个绝对零度一样,对吧,就是永远取不到的一个一个温度值,而且因为最后我们要的是啊,就当前要的是一个最大的最高的前两个温度值,所以说只要比这个高我们就会更新它啊,所以这个选取是没有问题的啊,然后我们再定义一个second,第二高second high east temp,同样它也是一个double类型,同样一开始初始值也把它定义成double类型的最小值一个负数,对吧?啊,就先放到这这里边我们要的这个类型就变成了呃,Top two temp ACC对吧?同样里边必须要实现的方法,诶大家看这里边就只有一个create accumulator了,哎,那大家会想到,那对应的我们不是说还有一个那个发射数据的那个方法,就是那个emit value吗?对吧,另外还有一个什么呢,还有一。
11:55
个accumulate对吧,这两个方法呢,名字就是叫这两个名字,这两个不能变啊,就是一个叫做大家看就是这里边accumulate,一个叫做emit value对吧,这两个不能变,但是呢,这两个方法在这里边就是底层的这个接口和抽象类里边啊,没有直接给我们声明定义出来,所以这里边没有办法直接重启,必须自己去声明,而且这里边的这个函数的名称还不能变,必须得是这样啊,所以这个就是大家知道这个用法就可以啊,那这里边我们看一下当前的这个create cuumulator的时候,我们当然就是说直接去还是直接去创建一个,因为要返回这样一个类型吧,直接new一个,创建一个top two,呃,Temp ACC就可以了,然后接下来呢,那就是自己去实现,呃,这里边就是实现计算呃,聚合结果的函数。
12:55
To accumulateate,对吧?啊,所以这里边def accumulate,好,把这个先定义出来,然后里边同样还是第一个是accumulator啊,我就直接简写了ACC对吧,大家知道它的类型是一个top to time ACC,然后另外呢,还有一个就是你当前聚合的时候,传入的参数是是是什么,我们还是每一行,呃,不需要传别的,我只要把当前的温度传进来就可以了,对吧?呃,就是像ID什么的,我最后你提取要输出的话,我可以在s select嘛,这个是没问题的,所以这里边我直接就把temp传进来,这是一个double类型,好,那同样这里边它不需要有任何的输出,对吧,类型不需要有任何的输出,要做的改变呢,直接在这个状态上做改变就完事了啊,那这里边我们就要需要做一些判断了,那是不是要判断当前的,呃,就是温度值是。
13:55
比状态里面保存的大对吧?我们判断当当前温度值是否比状态中的值大,那这个我们判断呢,就一层一层去做衣服判断了,首先判断的就是当前这个temp是否大于highest temp对吧?如果要比highest temp就最高的那个温度值都大的话,哎,那大家想是不是就是依次往后顺移位呀,对吧?哎,就之前的那个最高温度值变成第二高,然后第二高呢,就移出去了嘛,就没了对吧?然后当前我们这个是最高对吧?就是如果比最高温度高,那么就排在第一对吧,排在第一,呃,原来的第一顺道顺道第二位。
14:55
这就是我们这个要做的这个具体操作啊,所以是的,首先我们当然是要更新那个second对吧,因为如果说你先更新了highest的话,那后面highest就没保存了呀,所以先更新second second我们已经不要了嘛,直接用之前的highest来替代,而现在的这个highest呢,更新成当前的碳就完事了,做一个状态更新对吧,类似于状态更新的过程,Else if,如果说它比最高的温度小,但是比第二高的这个温度要高的话,也就是说在最高和第二高之间,对吧,那怎么办呢?哎,那我们说如果如果在最高和第二高之间,哎,那么那么直接替换就是最高的不么?
15:55
直接替换第二高温度就可以了,对吧?啊,那第二高就就没用了,所以我们直接ACC secondary,直接等于看这样的话我们的状态更新就结束,当然还有else else4如果要比第二高的还还低呢,还低,那当然就没用嘛,对吧,对我们的状态没影响,直接把它什么都不做就完事了。好,那最后我们还需要实现一个实现一个输出结果的方法,就是最终处理完表中所有数据的时候,所有数据时调用,哎,所以接下来这个名字呢,还不能变,必须是1IT value就叫这个对吧?啊,这中间这个大写为大写,这是底层哈扣给我们写死的啊,那这里边传入。
16:55
什么什什么参数呢?这这里大家也要注意一下,前面是当前的状态,因为我当前能拿到的就是状态,对吧,Top two temp ACC,然后后边呢,哎,注意要有一个collector啊,这就是我们说的最终我要想去实现的东西呢,就是用这个啊,就当前的这个collector去实现的,注意这里边选哪个呢?阿帕奇flink YouTube对吧?Flink里边给我们实现的这个collect类型跟我们之前在那个Fla map,或者说process啊,那个function里边,对吧?大家做这个数据输出的时候,不是可以输出多条吗?哎,都是用的这个collect去去做的输出,这里边也一样,还把这个作为参数放在这儿了,这就比我们那个cable function里边底层直接调这个看起来好像就更明白一点,对吧?它这个是底层给我们已经包好了,就在这个本身类里边有一个自己的这个,呃,私私有不是那个继承的啊,就是protected的一个属性,一是一个collector,然后呢。
17:55
有一个公有的方法,直接调他的那个clap的方法,那这里边呢,啊,不太不太一样,把这个作为参数给我们直接传进来了啊,你得自己定义好对吧,因为没有办法去直接重写方法嘛,那这个collect的本身,它的这个里边这个collect的类型,那就是我们输出的数据类型对吧?这里边你是这个二元组,一个temp,一个rank,这里边也是这个二元组类型,对吧?把这个写好,然后后边还需要就是这里边的这个输出呢,没有任何的没有任何的输出的结果类型unit,对吧?直接把这个写好,你看这个一写好之后,上面这个表函数,呃,表聚合函数本身就已经不报错了,对吧?该实现的都实现了,类型都是匹配的,好,那接下来这里边我们看一下怎么去做呢?呃,很简单,你要是输输出的时候,我直接包装成二元组,把我状态里边的东西拿出来输出不就完事了吗?Highest,这是排第一对吧,然后out.collect大家看这种。
18:55
奥点用collector去输出的时候,方便的就是你随时对吧,想输出什么输出什么就完事了,不需要做一个返回值的那个限定,哎,那这里边第二高温rank是二,对吧,直接输出这两条,这样就完成了我们的这个需求,然后接下来我们去看一看代码里边怎么样去调用啊调用的过程其实还是类似,首先我们是table API的调用方式,首先呃,那我应该要去创建一个这个表具和函数的实例,对吧,还是把它扭出来top to temp啊,然后这里边不需要有任何的参数,然后接下来这个调用的过程当中啊,得到这个result table还是基于sensor table,先去做一个group by,对吧,先去做分组,还是基于ID做分组,然后接下来怎么样呢?哎,之前是aggregate,现在是flat aggregate对吧,然后你看里边要传的就是一个就是一个table aggregate function了啊,当然也需要有一个表达式,要做一个agg。
19:55
指定我们输出之后那张表里边的字段名称啊,这里边还是一样啊,跟前面那个聚合函数非常的类似,字段是什么?Temperature嘛,温度值对吧?As我得到的是一个二元组,所以说这里边两个字段,当前的temp以及当前的rank对吧?把它包装好,最后再s select出来就完事了,当前的ID,当前的temp,当前的rank做一个输出对吧?啊,这就是当前我们做这个table API的一个调用方式啊,表聚合函数跟之前的那个聚合函数基本上是一样的,只不过就是这里面有多条对吧?对于一个ID,可能我同时聚合完了之后得到的是两条数据的输出。好,那接下来我们再看一下,呃,CQ里边怎么实现CQ里边的实现呢?现在这个直接用这个表聚合函数实现的方式还不是特别的,呃,就是呃容易实现啊,就这里边我们就不给大家做。
20:55
具体的呃,实现的过程了,但是大家会想到在CQ里边类似的功能,我可以用调用具体的那些函数来实现,对吧?因为大家看这个在CQ里边其实是有这个,呃,就是比方说我们这个rank函数对吧,大家还记得之前我们说那个聚合函数里边CQ是实现的,比方说RA啊,比方说role number啊这样的一些调用的,而如果是这个table API呢,没有这些函数,所以说我们才要用表聚合函数去做类似于top n这样的一个操作,对吧?啊,那CQL那边这里边我们就不去详细再做一个别的这个讲解了,就现在CQ里边不太好用啊,就没有办法直接把这个定义出来之后,直接在CQ里面去做一个转换操作,好,那我们把这个打印还是做一个打印输出table to upon stream,同样还是肉去调一下这个print方法把它打印,哎,这里边我们直接打印吧,只有它吧。
21:55
肉,这里边还是引入flink types点肉,最后不要忘记Env.execute,执行起来当前是一个table aggreate function test,这就是完整的流程。好,我们接下来来运行一下,测试一下,看这里面我们直接运行报错,诶,为什么呢?还是一个问题,因为我们做了表聚合,表聚合的这个过程呢,它也要更改,对不对?哎,应有当前每输入一条数据的时候,之前的top to就会做更改啊,哎,所以这里边直接to aend是不行的,我们要totract street,这样的话,哎,就可以直接运行了,我们再重新运行一遍。
22:42
接下来我们看一下这个得到的结果到底是怎么去做的啊,大家看到这里边我们说的结果是什么呢?来了一个三四十一的数据啊,35.8对吧?这里边呢,诶,排名第一的就是它排名第二的,这里边我没有做做筛选对吧?你看排名第二的输出的是一个很大的一个负数对吧,这还是用那个科科学表述科科学计数法的那个E这种方式去去表示的,然后这是排名第二对吧,因为没有没有排名第二的那个,呃,温度值嘛,是一个double类型的最小值,然后同样六七十都是这样的一个输出,然后sensor一来第二条数数据的时候,你看接下来,呃,大家看首先是一个false,对吧,我把之前的第一第二你看全部都false掉了,为什么呢?来了第二条数据是一个更大的数据,是这里边,诶不是啊,这里边是一个更小的数,就是比这个35.8小三12,但是呢,这里边你看到我们。
23:42
经这个表结果的时候,大家看到它不是只更新我们之前的,这就是假如说我判断那个第一个,呃,这个没变的话,我就不更新了,因为我们当时是干干什么事呢,每来一条数据,这里边大家想它底层原理啊,是都做了这个输出的对不对?你既然是都做了输出,那是不是就意味着我之前的输出都要撤回,要重新用现在最新的呀,对吧?呃,除非就是说这里边我做一个判断啊,你这里边可以做一个做一个优化,就是说我判断如果当前的这个highest并没有,呃改变之前的那个那个状态的话,那这里边我就不要输出这个high,对吧?啊这这个你做这样的调整这个判断也是可以的啊,啊,但是这种可能实现起来就会非常的麻烦,你还得去记住之前的那个,呃,就是最大最小值到底是什么,对吧,你不能说是这里我直接调用这个,呃,直接就输出了,这个还有点麻烦啊,所以这里边你看到我们。
24:42
们是直接两条false,把之前的第一第二全部撤回,然后诶更新我们当前最新的第一第二三十五点八三十二对吧?啊,然后后边再来了一个36.2的时候呢,撤回之前的两条,然后接下来36.2 35.8对吧?啊接下来你看即使即使是啊,我们的这种操作就是什么呢?即使是没有更新最大最就是第二对最高温度和第二高温度值,即使是没有改变,我这里边呢,也会有一次撤回和更新的操作,对吧,每次来了之后都会有这样的一个操作,因为我这里边每次做的计算结果都要有一次输出嘛,对吧,每来一条数据之后都要有一次输出,所以得到的结果就是这样的一个状态,大家可以下来之后拿这个再做一个测试啊,当然大家可以发现这种方式可能还是有点麻烦的,对吧,一个就是这里边你要做优化的话,呃,这个过程可能稍微有点。
25:42
麻烦,另外还有一个就是我这里边是一个,呃,就是这里边做的这个判断是要取这个TOP2,那如果说我这里TOP3呢,TOP5呢,TOP10呢,那这里大家可能就会发现这个就有点麻烦了,你如果定义这么多状态,然后挨个if去比较的话,这个看起来就很麻烦,对不对啊,其实更好的一种方式是什么呢?其实大家能想到,其实我可以把之前的这个温度是不是可以,哎,就是都给它保存下来对吧?哎,我就只保存十个温度值,然后接下来怎么样呢?来了一个新的温度值之后,我跟他去做一个排序对吧?插入排序可以直接插入到里边去就可以了,最后我按照顺序把这十个温度,比方说top ten对吧,TOP10直接做一个输出就可以了,比这种一个一个if的这种判断就会稍微的好一点啊,当然了就是说你如果要。
26:42
说这个插入排序里边的这个原理的话,哎,那可能也类似于是一个E对吧,不停的去做比较,或者你做一个二分查找这样的一个具体的实现,那这其实就是一个具体算法的优化了啊,那至于说我们后边这个每来一条数据之后就要输出一次,对吧,即使是没有调整也要输出一次,这个可能就稍微有点麻烦,这个就你就相当于我还需要在状态里边,类似于我还得去保存一个,呃,就是当前这个,呃,当当前这个是否要输出的一个状态,对吧?啊,这个就涉及到的东西就更麻烦更多了啊,所以具体在操作的过程当中,这个top喷N这个需求呢,可能我们往往不会直接用自定义的表聚和函数去做操作啊,那怎么去做操作呢?Top n需求可以直接用CQL里边已经实现了的那个函数去做,或者呢,可以用啊,就是我们的这个data streamam API,对吧,我们直接自定义这个data stream API。
27:42
改编的这个process process function直接可以把这个需求搞定,所以后边我们在项目当中给大家介绍另外的两种实现套的方法,好,这一部分我们就先讲到这里。
我来说两句