00:00
再来再给大家讲一下自定义的聚合函数啊,聚合函数就是所谓的aggregate function了,这个函数大家并不陌生,之前我们在讲到data streamam API的时候啊,包括窗口的聚合,包括前面这个K之后直接做的聚合,那这里边呃,所有的这个sum max mean,或者说自定义的reduce,包括window API里边可以定义的这个aggregate,那那其实都是一个。聚合的操作啊,里边我们真正如果要是传一个函数类的话,里边我们传的那个都是一个聚合函数。呃,就相当于reduce function啊呃,包括已经弃用的forld function,还有aggregate function,这些本身就是data stream啊流上面的聚合函数,那我们这里定义的聚合函数呢,又稍微不一样一点,它是表上面的聚合函数对吧?啊,大家可以结合起来看啊,但是一定要区分开这个名称,这个类名是一样的啊,都叫aggregate function,所以聚合函数就需要继承这个aggregate function这个抽象类来进行实现啊,大家大家引入这个类的时候一定要注意,我们之前那个window里面不也有一个aggregate function吗?啊,千万不要引错了啊,这里边得是就是table API下边的aggregate function。
01:13
那有时候我们会把这个就是用户自定义聚合函数啊,有时候简写叫做这个,呃,UDH,对吧,就是user DeFine,呃,Aggregate functions,那么它在使用的过程当中其实还是非常普遍的,而且也比较简单,这个是需要大家重点掌握,那它的特点其实是什么?它就是可以把一个表里边的数据。哎,所有的数据聚合成一个标量值对吧?啊,所有的数据聚合在一起,所以如果对比前面的这个,呃,我们的这个标量函数和表函数而言,那这里边聚合函数它相当于呢,这个输出还是一个标量值,跟我们那个标量函数有点像,但是它的输入不一样。
02:01
就是之前我们的那个,呃,标量函数和表函数,它的输入其实都是当前一行对吧,一行数据里边我们提取某个字段或者某几个字段啊,然后给他做一堆这个转换操作得到,哎,可以是一个标量值,也可以是得到多行数据,得到一个表,这是前面这个scale function和table function的特点。而现在聚合函数呢,输入不再是一行数据了,而是整张表的数据,对吧?整张表的数据进来,我们调用这个聚合函数,得到一个聚合成的一个结果值,标量值。这就是所谓的aggregate function。啊,大家对它其实也并不陌生,我们看一下这个例子,一看就能看出来啊,比方说当前我们有这样一张表啊,这个表里面的字段呢,一个ID,一个name,一个price,那我们知道这肯定就是跟商品有关,有有价格嘛,对吧?这里面的商品是什么呢?其实就是各种各样的饮料,大家看有这个什么拿铁啊,当然还有还有这个牛奶,对吧,还有什么这个布雷味啊,摩卡,还有茶,呃,每一个饮品都有一个ID,对应着有一个价格。
03:10
然后我们现在要干什么呢?这非常简单,Select max,对吧?这张表里边最大价格的那个,呃,那个值对吧,Max price from,呃,这个比方说我管这个表叫注册成my table,就这么一行CQ,我要实现这样一个自定义的。Max这样一个函数,它是什么呢?它就是一个聚合函数。啊,所以大家看这个非常简单是吧?啊,在这个过程当中,其实过程可能还不是那么简单啊,就是在这里边我们得定义什么呢?这个aggregate function,其实整体来讲是比较。就看起来更接近于我们比较熟悉的那种聚合的操作啊,就是它里边需要定义一个累加器。Accumulator,之前我们讲到window API里边的那个aggregate function不也是它有状态嘛,对吧?那中间它的状态是什么呢?就是我们定义的那个accumulator吗?这里边类似啊,这个累加器,大家可以认为就是相当于我们在做这个表操作的时候的这个中间状态啊,那这个呃,Accumulator大家看啊,就是在这个。
04:17
生命周期里边,首先我得create。然后还得怎么样呢?还得有一个get value,就是说我最后又要返回这个结果的时候,调用哪个方法对吧?这是初始化,这是最后返回结果,然后中间计算,每来一个数的时候怎么计算呢?哎,当然是调一个accumulate这样的一个方法,所以这个过程其实和跟那个window API里边的那个I function真的是非常非常类似,对吧?啊,只不过当时我们还多了一个墨,而且我们说那个墨一般用不到嘛,只有在窗口session里边,Session window里边才能用到,那一般情况平常就是什么呢?那就是一个create accumulator,然后我们后面是有一个ADD的方法,现在是变成了accumulator,对吧,每来一个元素调一下这个方法。
05:03
最后我们有一个输出啊,Get之前是get result,现在是get value,只是变了一个名而已嘛,所以实现其实还是呃,非常类似的啊,就是中间核心是这个累加器状态accumulator。啊,这里边我们展开再说一下,就是必须要实现的方法有这么几个啊,就这一个是创建一个累加器,这是我们保存中间计算聚合结果的那个那个类型对吧?那那个状态啊,就类似于然后accumulate accumulate呢,就相当于之前我们window API里边讲过的那个,呃,就是aggregate function里边的那个ADD方法,每来一个叠加一次啊,大家回忆一下啊,之前我们讲过那个啊,它没有叫A,它叫accumulate啊,这就是呃,上层上层实现它家这个定义的不一样啊,所以现在就用这个了,然后最后还有一个是获取最后的值get value对吧,最后聚合的结果到底是什么,那要用这个方法来得到。
06:01
啊,那在这个过程当中还有一些就是还有一些可选的方法啊,比方说还有这个墨对吧,之前我们那个方式不也有墨吗?啊,这里边也有对吧?另外还有一个叫reset accumulator,就是说重置状态,把当前的状态那个呃,回到那个默认初始值对吧?或者说这个直接清空回到初始值啊这就是这个reset也是可选的,你可以不实现。那另外还有一个方法叫tract,就是说如果说啊,我们这里边这个就是定义了这个over窗口的聚合的话,说不准会用到一个,呃,撤回的一个方法,对吧,Retra的这样的一个操作。啊,这是这个,呃,给大家大概的说一下啊,我们具体实现的时候,其实只要实现这三个方法就可以了。啊,那接下来我们还是在代码里边做一个具体的操作,看一看,呃,到底应该怎么怎么去定义这些东西啊,新建一个object。
07:01
还是这个,我们就叫做聚合函数aggregate。Function test。好,然后接下来,呃,前面的这个内容,那还是照抄对吧,环境该定义定义出来读取文件,后面转map成样例类,先读成流,然后呢,再转换成表,接下来我们的操作都是基于转换过来的这张表要去做的啊,那前面还是把那个。这个影视转换先引入啊,下划线引入。好,然后接下来,哎,我们就得自定义,同样是吧,自定义一个。一个聚合函数,哎,那我们现在这个呃,就是sensor的这个例子里边我们聚合什么呢?那我们干脆实现一个average吧,对吧?前面我们不是要求取呃所有这个温度值的那个平均温度吗?现在我们就做一个这个呃类类似于这个average的操作啊,那这里边的这个average如果要想把它定义出来的话。
08:08
大家会想到就是这,我们先把这个写出来啊,Class,然后avg extend。Aggregate function。啊,里边应该是有类型的,对吧,我们先看一眼啊,这里边要引入,大家不要引错了,下面这两个,一个common flink API common,这是我们window里边定义的那个aggregate function,对吧,那就是。Data streamam API里边底层的这个通用的这个aggregate function,而我们现在要用的是flink table functions下边的aggregate function,然后我们看一眼这里边它的类型是什么啊。哎,之前我们的那个averageggg function是是input ACC,然后out对吧,In ACC out就是输入的类型,输出的类型和这个呃,累加的类型,那这里边大家看它少了一个类型,这里边它就是T和ACC。
09:02
所以当前我定义的时候是什么呢?哎,就是只有这个结果的类型,没有那个输入的类型了,这个大家也很好理解,为什么呢?啊,因为我要定义输入的话,按之前我们这个,呃,这个定义的这个过程,它输入参数都哪哪里来的呢,都是在自己里边这个方法里边定义出来的,对吧?那所以它并不是在外部就直接给我们包装好的,那当前的这个aggg的方式也不例外。所以现在我们这个输出既然是求均值嘛,我就直接求这个平均温度值,那就是一个double。Double类型,然后还有一个这个状态。大家可能会想到,诶,那你现在的这个状态,那就应该是二元组嘛,我既要求当前温度所有温度值的这个和总和sum值,又要有一个计数值,对吧,来一个count值啊,那这个我们先这么定义出来啊,大家看现在它报错有必须要override的方法。啊,那我们先看一下啊,必须要实现的,诶大家看必须要实现的是两个方法。
10:04
一个create accumulator,一个get value,所以这就是初始化创建这个状态,以及最后获取到这个值,这两个是已经定义好的。啊,然后接下来,所以说这个就是这两个我们可以override,那这个就是你直接按照这个类型写就可以了,对吧?啊,但你这个定义出来之后,只要返回对应的这个状态,它自动就给我们创建出来了,但是中间过程当中,我们如果要是来一个数据去转换,去计算的那个过程怎么办呢?啊,这又是这个,呃,就是算是一个历史遗留问题是吧?啊,还是跟之前一样,你必须去自己写一个它定义好的必须实现的方法啊,就这个不是直接可以override的是是你必须在后边定义出来,定义什么呢?就是前面我们说到那个accumulate。这个名字必须叫这个啊,因为下面大家看,如果感兴趣的同学可以追到源码里看一看,它源码里边就是定死了,他判断那个函数名称就得叫这个名儿啊,所以那没办法对吧,我们必须叫这名accumulate,然后里边的参数它也定死了,必须是前面一个是当前的。
11:14
呃,就是状态类型对吧,就是accumul,或者我直接就叫这个ACC吧,大家知道就是accumulator对吧?那这里边状态类型当然就是double,然后int就是这个类型,然后后边呢,是我们当前输入的每一个值,就是最新我们这个流式处理嘛,所以是来一个数立,呃来一个数据叠加一次,来一个数据处理一次,聚合一次,那所以我们最新的那个值,那就是temp温度值double。啊,就是我应用这个的话,大家就想到我只传一个那个字段,就是那个temperature那个字段就可以了,别的不用传对吧?啊,但是这里面还有一个很尴尬的问题,就是说对于这个方法而言,它外部定义的时候,它的参数必须是前面是类型,后边是呃,这个当前新输入的数据,而且调用的时候必须调它这个方法,而且这个方法。
12:09
没有返回值啊,这个就很尴尬,为什么呢?因为之前我们在那个window的API里边aggregation aggregate function,我们要做的操作很少,直接定义好就可以了,那这里边直接返回它那个at方法,返回的是什么呢?返回的是就是每来一个元素,我就要返回一个叠加之后当前状态的更改的那个值。啊,所以这样的话,我不需要单独再去对这个状态做操作了。那现在他没有返回值,所以那个状态还得我们自己去改啊,自己去操作。可是当前我如果是一个二元组的话,大家发现啊,呃,这个。就如果说这里传的是一个二元组类型的话,那我这个参数传进来,这不就是一个常量嘛,对吧?啊,所以本身这里边如果我去更改这个值的话,这其实是不能去做这个更改的。
13:03
那如果说我们想做这个这个状态的更新,状态的变化,那其实还需要在外边把它定义成一个类,对吧,相当于这里边我们传一个我们拗出来实例的那个引用,那么在这里对它做了更改,到时候就可以把状态就是返回去表达出来了。啊,所以这个过程就是还涉及到这个比较麻烦的事情啊,啊,那在这儿我们就先。就以后这些用法可能会调整,可能不会那么麻烦了啊,现在确实是,呃,需要注意的地方还挺多,这里边我们专门定义一个。定义一个,呃,这个聚合函数的。状态类啊,也就是accumulator对吧。用于保存聚合状态,那大家知道这个聚合状态其实我要的就是一个温度的sum值,以及当前的一个count值啊,所以这里边我定义一个class。
14:10
当前我这个就叫avg,我的这个平均温度的状态accumulator ACC。里边其实也简单啊,两个这个变量啊,必须要改嘛,对吧,两个变量一个sum sum是个double型。它的初始值当然是0.0对吧,然后再定义一个count,它是一个int类型。初始值就是零,哎,这样把它定义好,然后下边我们这里边的类型也就不一样了,对吧,这里边类型完全不一样了,所以我把这两个直接先删掉啊,这里边我们要把它改成当前的这个类的类型,自己定义的这个avg temp ACC类型。所以下边如果我们。呃,如果我们复写这两个方法的话啊,应该是它对应的类型就变成了我们这个类了,所以初始化创建的时候,你就不要直接给那个二元组了,对吧,直接new一个average。
15:06
碳ACC扭出来就完事了。啊,后边我们再再获取这个结果的时候,那怎么办呢?这个其实也简单,这不就是我们前面那个sum除以那个对应的那个count就完事了吗?对吧,第一个其实这里面有字段啊,Sum除以accumulator.count1除完事。啊,那对应的这里边我们这个操作accumul,这个accumulator啊,ACC状态也得改avg tab ACC,那这里边我不能返回值去更改这个状态,但是现在它既然是一个类,我这里边传进来的是一个类的实例了,那当然我直接更改里边的值不就完了吗?对吧,当时我改给的定义也就是一个Y一个变量嘛,所以现在我直接改。所以这里边就是acc.sum加等于呃,当前这个碳对吧,把当前最新的这个温度加上,然后另外还有一个是acc.count。
16:07
加其实就是加一嘛,对吧,加等于一直接把这个叠加上就完事了。啊,这就是这个处理的过程啊,啊,所以大家会看到这个,呃,现在的这个不是特别完善,所以说你要自定义做很多操作的时候,需要考虑的细节点比较多啊,我们想要实现这么一个一个这个平均温度的一个聚合的话,你类型还得做这样的一个调整,大家可以下来之后对比我们之前在呃,Data streamam API啊,Window操作里边定义的那个自定义的aggregate方式,看看他们的这个实现有什么差别啊,其实是比较类似,但是又不太一样。已经实现这个了,可以接下来可以在这个代码里边调用了,对吧?呃,但大家还记得这个聚合,聚合的这个方法应该在什么地方调用吗。啊,首先我们还是先。创建一个呃,聚合函数的。
17:03
实呃实力对吧。一个对象啊,我们先把它定义出来,Avg temp就叫这个名儿,New一个。Avg对吧,先把它创建出来,然后接下来这个是table API调用。同样我们这里边直接定义一个result table。Sensor table,诶,大家还记得那个在哪里,跟这个聚合有关吗?其实就是之前刚刚我们讲过的啊,在这个window操作里边,我们当时做这个窗口,呃,调用的时候,其实是有一步操作,我们先是做了一个。就是group窗口啊,我们这里边其实是window之后做了一个group by对吧。啊,所以group by之后得到的这个是一个叫做window group的table这样一个东西,然后在这个接口里面,大家看到它能调什么方法呢?哎,能调aggregate方法,能调flat aggregate方法对吧?哎,这里边aggregate大家想到既然是聚合嘛,哎,那这里边它的这个expression是不是就相当于是我们自定义的一个aggregate方法呀?呃,因为大家想之前我们那个window操作的时候,不也是点aggregate里边就传一个aggate function吗?我们现在其实也是要点aggregate就可以传一个这个aggre function对吧?
18:28
啊,当然这里边它本身这是做了开窗之后的,这是window group的table,然后呃,可以调这个方法,那大家可能会想到我不开窗直接分组,直接group by可以不可以呢?啊,当然其实也是可以的。我先把这个先写出来啊。当前要分组当然是基于这个ID字段了啊,那这个group by之后得到的是一个group table,不是window group table了啊,没有window group table,那么它这个接口里边同样我们也看到了,也是select aggregate flat,对吧?啊,Select这里边你就是选取字段,然后里边的表达式是系统,就是我们整个table API已经提供出来的那个可调用的聚合函数方法啊,那这里边如果要是自定义的话,当然就应该是aggregate了,对吧?啊,所以这里边其实是直接分组啊。
19:22
就可以。接下来分组之后直接调aggregate啊,那aggregate里边传的当然就是我们定义好的avg temp里边要传的字段temperature。啊,所以它的含义就是每个对于每一行哎,都要调这个聚合函数啊,它是把每一行的temperature提取出来,然后应用我们的这个aggregate function里边的accumulate方法,最后我get这个value,得到一个结果值,啊,就是我们当前这个最后聚合出来的结果,对吧?啊,那这里边聚合出来的结果我可以给它叫个名是吧。
20:04
这个S。Average temp,呃,给他叫个名,重命名,然后接下来select出来对吧,当前的ID,呃,当前的avg temp。输出啊,这就是我们整个的一个过程。呃,那下边我们可以把它做一个打印输出。Result。Table。To aend啊,大家知道在这个过程当中可能就不能to aend了,为什么呢?啊,我们没有开窗,所以是在之前基础上在不停的改这张表对吧?所以这个过程你如果突了判的话,那肯定就报这个结果不对了,所以这里边我们要突tract string。好,然后这里边呃,里边的类型,我们直接可以给一个肉啊。定义出来,或者是大家想给那个具体的类型,给个二元组也行,这不就是一个string,然后后面一个,呃,对应的那个double类型嘛,啊,所以这个其实怎么做都可以啊,后边print,这是result。
21:06
好,我们先来看看这个结果怎么样啊,先运行一下。好,这里边错了啊,大家看这个这个运行肯定没结果,因为我们下面还少了一步操作env execute对吧,这一步操作少了啊。呃,所以这个完了之后,我们先直接停掉吧,他输出肯定也不会有正常输出的啊,所以这里边我们要的是一个就叫AJ。Udf test job。好,重新来启动一下。好,大家现在看一下这个运行结果,这里边我们得到的运行结果就是聚合的结果,对吧?求平均温度的结果,哎,那这里边大家看到这个六七十都只有一次输出,为什么只有一个值嘛,只有一个值那当然就是自己了,而且前面都是处对吧?前面我们输入四条数据的时候,每一个数据都只有自己的这条数据,所以都是true,然后我们关键看这个三一后边它这个数据。
22:06
不停更新的时候,它发生了什么事情,大家看第二条数据,37.9来的时候,它输出什么了呢?诶,Retract stream输出了一条false,对吧,之前这个35.8作废了,现在处处变成什么了呢?啊,35.8和37.9的一个平均数,大概就是啊,36.85的一个样子,对吧?啊大大概是这样一个值,然后接下来又来了一个32.4,哎,这个温度来了一个比较小的值,那之前这个36点,呃,八五这个值也就作废了,False了。现在平均数三个数一平均,因为来了一个32点多嘛,拉低了35.367,对吧,大概是这样一个平均数,那后边再来一个39呢,哎,之前这个又作废了,又是false了,现在又拉高了一点,变成了36.275。这就是我们这个自定义聚合函数去算它当前的这个平均数的一个过程。
23:02
大家可以下来之后把这个好好测一测啊。
我来说两句