00:00
接来我们再来讲另外一个非常重要的u udf函数,另外一个类型,这个类型呢,就叫做聚合函数aggregate functions啊,其实这个概念大家并不陌生,对吧?聚合函数嘛,我们之前做这个sum,呃,做这个aggregate,做做这个reduce,这些不都是在做聚合吗?都是要把很多数据,然后最后相当于我们做规约,做整合,对吧?最后统计出,统计出来一个结果,这不就叫聚合函数吗?所以现在我们在table API里边调用的这个自定义的聚合函数也是一样的含义,哎,这里边有时候会简写,把这个叫做呃,UD,对吧,就是user DeFined aggregate functions,那么它主要就是把一个表里边的所有数据,多行数据聚合成一个标量值。所以如果说前面的这个scale function啊,标量函数是一对一,就是一行数据来了之后得到一个结果,而后面的这个表函数table function是一个一对多一行数据,要把它哎直接打散了铺开,变成多行数据,变成一张表,那现在这个聚合函数呢,那就是多对一对吧,就是一张表里边的多行数据,最后我聚合成一个结果啊,那我们这里边要定义的这个聚合函数呢,同样也是要继承一个抽象类啊,实现它里边的这个方法,那这里边我们要继承抽象类就叫做A的function式啊,这里大家要区分一下,之前我们讲到window API的时候,大家记得在这个弗link底层啊,Common functions里边也有一个aggregate方式,对吧?啊,因为我们说这个window API里边可以直接调那个aggre方法吗?增量聚合函数里边不是有这个reduce function和aggregate function都属于增量聚合函数吗?哎,这里面的这个aggregategate方式。
01:53
是不是跟那个一样呢?注意不是一回事,之前那个是flink common,就是底层通用的那个聚合函数,而这里边的这个聚合函数呢,它是一个就是table API里边给我们单独定义出来的这样的一个聚合函数,所以大家引包的时候千万不要引错了啊,这里边还是有一些区分的啊,那这里我们给大家举一个例子,大家可以看一下啊,主要要做什么样的操作呢?这里面有一张表,这里边有ID,然后有内,这里边大家看到这个什么。
02:24
啊,Lucky啊,马铁是吧,还有milk,还有什么布雷味摩卡,这些就都是咖啡或者茶,或者牛奶一些饮料啊后面呢,这是它的单价price,所以接下来我们可以做一个什么操作呢?我想写这么一条,CQ select next price,也就是说我要找出这张表里边price单价最高的,哎,那个数据,那个饮料,那所以说对于我们当前这个需求而言,其实就是你输入多行数据,一张表的数据都输进来,最终输出一个结果就完事了,对吧?啊,直接输出一个值就搞定了,所以这就是一个典型的聚合函数可以搞定的事情,那这个聚合函数里边我们要定义哪些东西呢?呃,大家看到啊,我们实现的这个agggate方式里边要有这么几样东西,首先我需要有一个accumulator,因为大家想一想,我做这个聚合啊,就就大家类比我们之前讲的那个。
03:25
呃,Window里边的增量聚合,为什么它能实现增量聚合呢?是不是就是因为它内部自己保持了一个状态呀,对吧?你保持了一个状态之后,那就可以不需要把之前所有的数据都存下来了,我就来一个数据,在之前那个状态基础上做个聚合就完事了嘛,啊就像我们做sum或者说做这个count一样,那就来一个加一对吧,来一个那个计数器,类似于就是保持一个计数器就加一,或者你如果做求和sum计算的话,那就来一个数,诶我这个在之前那个和基础上叠加就完事了,所以这里边呢,我们有一个状态,这个状态叫什么呢?就叫做accumulator,一个累加器,对吧,一个一个类似于这样的一个状态,ACC啊,这是它的类型,叫做ACC对吧?啊,那一开始呢,我当然就得有一个创建初始状态的方法,这个状态这个方法叫做create accumulator啊这个方法里面我们就可以定义这个创态状态的初始值。大家还记得我们定。
04:25
第那个k state的时候,是可以在本身呃,在它那个呃获取状态句柄的时候啊,声明状态的时候,后面直接赋出值的是吧?啊这里边呢,这直接就是这个抽象类里边啊,你必须实现这个方法,把它显示的声明出来,我初始状态是什么啊,Create accul,然后呢,呃在在这个中间,那就是每来一条数据之后,我这个状态到底怎么去叠加,到底怎么去聚合呢?调这个方法accumulate计算对吧?大家看这里边的参数,就是你要知道当前我的这个呃状态accumulator是什么,作为一个参数传进来,然后呢,还有一些就是用户自定义的一些参数了啊,那个就是我们当前传进来的数据是什么是吧?每来一条就聚合一次,因为我们当前还应该是一个,呃,一个。
05:16
底层还是一个增量聚合的过程嘛,我只要定义出当前我要的那个字段,要那个参数就完事了,然后最后还有一个方法啊,叫做get value get value意思就是说你最后聚合完了之后啊,我到底要得到一个什么结果呢?啊,这里边我就是把状态传进来,最后你返回一个输出结果就完事了。对吧,比方说我这里面输入的,你看就是当前的这个63584对吧,就是每一个的这个价格输进来,然后aumul这个状态,这里呢,我就保持一个当前的最大值,一开始是六对吧,八进来之后就变成八了,保持这个最大值,然后最后输出什么呢?你不是要最大吗?那我就最后输出当前的这个状态,直接把这个状态输出完事了。
06:04
所以你当前这个select这一条CQ语句得到的就是我这个结果,调用get value得到的这个结果啊,这就是这个所谓聚合函数的一个处理流程,那这里边有几个要点要给大家说一下,就是这三个方法是必须要实现的啊,就是一个是创,创建当前的这个accumulator这个累加器状态,另外就是accumulate,就是计算对吧?啊,到底你怎么样去做操作,做计算,最后还有一个就是get value最后的结果怎么样去返回,那这里面呃,它的整体工作原理呢,就是首先先把累加器创建出来对吧?状态先有,然后接下来呢,呃,每一行调用都都调用这个accumul这个方法来更新这个累加器的结果,更新状态,那最后呢,处理完所有行之后,调用get value返回最终的结果对吧?啊,这就是一个完整的处理流程。
07:00
那接下来我们就在代码里边给大家来做一个具体的实现了,呃,还是在之前的这个c API test下边啊,去新建一个。新建一个object,呃,当前我们就呃这个这个还是在这个udf下面啊,这个还属于一个udf函数,直接去新建一个object,然后当前这个我们就叫做aggregate。Function test聚合函数的一个测试,然后接下来同样还是内函数内方法,先先写在这儿,然后前面的内容,那我们想就基本上都差不多对吧?哎,所以这个我就不详细写了,环境读进来,然后做一些基本的配置,绊环境基于这个流失处理环境创建出来后边读取数据,从文件直接读取数据,然后我们直接转换成样例类类型,而且是事件时间语义,所以我们直接分配watermark和呃呃,直接分配定义这个时间戳呃生成watermark对吧,后面呢,直接从这个流里边把它读取出来,转换成我们想要的这个sensor table1张表的这个类型。好,那接下来我们直接把这个引入啊,里边需要引入的这些包,我们还是把这个对应的影视转换做一个引入下划线,下面这个也做一个这个。
08:30
入啊,下划线的引入,好,那然后接下来那就是我们要做的这个操作跟之前也是非常类似的,是不是要就是定义一个聚合函数,然后呢?呃,在我们这里边创建一个它的实类table API里边就可以去调用了,对吧?啊那首先我们还是在下边去自定义,自定义一个聚合函数,哎,那么这样一个聚合函数我们到底要实现一个什么什么需求呢?哎,这里边这个非常简单的一个需求就是我们这样去,呃,当前我们是这个SEN4嘛,对吧,所有的这个温度值我们就直接去求每个S4啊,每个这个传感器它所有温度的平均值就完事了,对吧?但但是大家知道这个底层有这个聚合函数啊,直接ABG嘛,对吧,直接算就完了,我们这里边只是给大家举一个例子,看看怎么样去做这样的一个操作啊,啊,那我们就。
09:30
求每个传感器,传感器的平均温度值啊,所以呃,接下来我们给大家把这个类先做一个实现class啊,这个叫avg temp对吧?呃,自定义的这样一个聚合函数,然后extend,它需要去继承的啊,或者说实现的对吧,我们这个aggre方,然后这个aggre方大家在引入的时候要注意,这里边你选择的时候就要注意了啊,首先不是set,大家知道k set是底层的实现了CQ标准的,呃,那那个包对吧,就是阿帕奇的那个,呃,那个组件啊,那那我们这里边是CQ底层去调用的,这里肯定不是,然后呢,下面有两个选项,一个是一个接口link API common下边的functions,下边的这个aggregate function,这就是之前我们呃。
10:30
就是大家讲的这个window API里边我们要调这个GG增量和函数,对吧?就是这个ggate方式,那我们现在用的是什么呢?不是这个用的下面这个大家看这个take flink table functions下边的aggregate方式啊,这个大家要要小心一点啊,很容易引错包,然后里边呢?呃,大家看到这个function式里边需要有泛型,这个泛型什么意思呢?TCT大家比较熟悉,这就是当前我处理的这个table里边的数据类型啊,对吧?本身标准的这个类型啊,就是当前我们要诶注意这个其实就是最终我们得到那个结果类型,大家要注意不是输入数据的类型啊,因为现在我们做的是这个表的转换对吧?呃,你现在它其实本身我们定义table的时候,它每一行你作为肉直接去处理就可以了,他并不关心里边具体的这个数据类型,而我们这里边呢,关注的是得到的这个输出的数据类型要放在这里。
11:29
然后另外还有一个acccc,大家知道这就是accumulator,也就是我们说的中间聚合状态的那样的一个呃类型了,对吧?啊,所以这里边我们定义一下,既然是要求这个平均值嘛,平均温度嘛,啊,那当前我当然这个结果就是一个double类型了,对吧,平均温度,然后关于我们中间要保持的这个状态,大家可以想一下,这个状态我到底要怎么保持呢?诶我每来一个之后,我当前应该是怎么去保持状态才能最后算这个均数呢?有些同学说,诶你这个没法保持状态呀,对吧?你之前的那个平均数,下一个来了之后,你你不能直接把这个平均数跟他一加除以二啊,因为之前你那个个数不一样,这显然应该是有一个不同的权重的嘛。
12:15
哎,所以这里边我们要保存的其实不是一个简简单单的就是一个之前平均数这么一个概念,而是要保存什么呢?哎,我们应该是保存这个,这个状态是要保存两个值,其实是大家想想我保存什么,我就保存当前所有温度的和总和,以及当前所有温度的个数,保存这两个不就可以了吗?哎,这两个的话,每来一个新的值,你往上叠加是非常容易的,和的话就直接加上去,那如果是那个个数的话,直接加一就完了嘛,所以这里面大家想到我可以保存一个二元组,对吧,那就是这里面我们保存状态,保存状态是要保存什么呢?就是当前温度的这个我们写成这样啊,对吧,温度的和以及当前温度的个数啊,Temperature的对吧,把这些。
13:15
饱合在一起啊,那所以这里边我定一个二元组,前面这个sum的话,既然是温度求和,那还是double,然后后边这个count的话,哎,那我给个int,或者是给一个这个长整形浪都是可以的,对吧,直接把它定义出来,然后后边诶这个这个大家要注意啊,就是这里边我们里边大家看到这里面报错,为什么呢?当前的这一个抽象类,它是有必须要去重写这个RI的这个方法的,好大家看一下这里边这个抽象类里边必须实现的方法我们看到啊,诶这里边是不是有这个get value啊,之前给大家说过对吧,它是一个抽象方法,所以说这个东西你你你如果继承了之后必须得实现,对吧?啊get get value,然后另外还有什么呢?这里边有一些人他已经实现了的具体的方法了啊,这些我们就并并不关注了,然后啊,这里边你看到只有一个get value对吧?哎,所以这里边我可以诶大家看。
14:15
除了这个get value之外呢,另外还有一个user DeFine aggre aggregate function,因为当前我们的这个aggregate function啊,大家看到它本身还去继承了一个就是user DeFined aggregate function对吧,这个抽象类,这个抽象类里边呢,又有一个抽象方法叫create accumulator啊,就是这也是一个抽象方法,你必须是实现的,对吧?啊,所以这两个方法都是我们这里边可以去override直接实现的,一个是创建当前的呃,这个accumulator啊,那那大家知道创建的时候你怎么怎么创建呢?当然就是一开始都是零了,对吧,我一开始这个0.0和是0.0,然后初始的这个个数是零,这不就完了吗?然后我get value的时候,诶,这里边怎么样去get value呢?我直接用这个accumulator里边的下划线一,这是当前的和去除以当前的这个下划线二,去除以这个个数,得到一个double类型返回。
15:15
这不就完了吗?哎,这个非常简单对吧?哎,但关键的是这里边我还得去实现一个,大家注意啊,还要实现一个具体的处理计算函数,也就是必须大家注意啊,这个就跟我们前面那个EY,呃,EY6一样啊,呃一呃,EVEVL那个函数一样,就像之前标量函数和表函数里面那个一样,这里边的这一个函数名字呢,必须叫做accumulate,但是呢,它又不能直接overrightde,直接重写,没有在这个底层的继承的这个类里边去去声明,所以这里边它是在底层调用的时候,这个相当于hard code定死的这个函数名称啊,这里边还是有一点呃,不太舒服的地方对吧,但是没办法,我们就这么实现吧,DeFine,一个accumulate。
16:11
必须这么实现,然后它里边的参数呢,这也是定义好的,必须是前面是一个accumulator,就是我们当前的那个,呃,这个就是我们的这个聚合的起来的这个状态,对吧?先把这个状态放在这儿,那大家知道它的类型,这里边你这么定义的话,就还是这个二元组了,然后后边呢,呃,接下来就是你要聚合的一个新的值,那我们这里边你干脆就把那个温度值传进来就完事了嘛,对吧,我就把这个tempb传进来就完事了,那这里边这个应该就是一个double类型啊,那这里边里边的这个实现呢,大家看到你其实就是把这个状态的更新,更新完了就完事了,对不对啊,不需要有任何的输出,所以它的输出其实是一个必须是一个的输出,它的它所有的这个操作都在更新这个状态,那这里面就有一个问题了,哎,大家可能会想到你要更新这个状态的话,我这里面状态是一个二元组啊,对吧。
17:12
来了之后,那那这个二元组本身这应该本身这一个参数它是不可变的对吧,我们默认这里面它应该是一个这个value啊,诶所以这里面边你假如说我们想要把它包装成一个可变的类型的话,那最好怎么样呢?最好在外边再去定义一个类,就是我们这里边定义定义一个类,专门用于表示啊聚合的状态,也就是我们所谓的那个accumulator,对吧?啊,那所以这里边我定义一个plus啊,当然这个avg平均温度的啊,Imul ACC我定义一个这样一个东西,那这里边它主要要保持什么呢?这里边是画对吧?呃,我我定义这个变量啊,它的这个属性主要就两个,哎,或者说你你直接定义成那个,呃,就是case class。
18:12
相当于也可以对吧?啊,就是它里边有这样一个属性就可以了,然后我们这里边就是要它能改变这个值嘛,所以说本身是一个0.0的一个double类型,对吧?啊,然后我们再定义一个countt,这是一个零,那我们当前定义这个状态的时候呢,你就不要类型定义成这个二元组了,这里边我们直接定义成这个当前这个类的类型,对吧?哎,然后后边这里边它的这个accumulator也应该是avg探ACC这个类型,然后接下来你就不能直接去下划线一了,而应该用这个当前下划线sum去除以下划线,呃,不,不是下划线啊,就是当前的这个sum属性除以当前的抗属性,对吧?哎,直接移除得到这个结果就完事了,然后同样我最终create的,呃,就是一开始create的时候也要去创建的,就是一个当前。
19:12
的这个,呃,Avg timeb ACC对吧?这里边创建的时候呢,直接把它拗出来,你有一个实例不就完了吗?对吧?直接创建,然后这里边要处理的呢,类型也是一样,改成这样对吧?那接下来我再去做处理,当然就比较简单了,我直接用accumulator.some加,等于加上当前新传入的这个time值对吧?然后accumulator.count加一就完了对吧?哎,所以就是这样的一个处理流程,好,那当然接下来我们还需要在代码里面去做一个具体的调用,那代码里面调用的过程呢,还是分成两类,一类是这个table API的调用方式啊table API调用的时候,我们首先要还是创建一个当前的这个实例对吧?一个函数,函数类的一个实例,你有一个avg temp,先把它创建出来。
20:12
然后接下来呃,这个调用的时候,大家看到我们得到这个table啊,调用过程就非常简单啊,因为它都是把这个数据要传进去,然后我们直接呃聚合得到一个结果就搞定了对吧?啊所以这里边并不涉及到表的join语的过程,我这里面怎么去做呢?诶,直接用这个sensor table,然后呃,首先大家要注意啊,因为我是针对每一个呃这个sensor去做聚合对吧?哎,那接下来我当然是先要去做分组了,首先是做group group里边传的就是当前我去做聚合的那个呃分组的那个字段对吧?ID先做一个group,然后后面大家看到了,我如果经过这个group之后,后边呢,可以直接调select,这个知道大家在里边去做那个,直接调那个系统内嵌的那些聚合函数对吧?或者怎么样呢?自定义聚合函数,自定义聚合函数怎么。
21:12
Aggregate对吧?大家看aggregate这里边传的这个参数,我这里面要的这个参数啊,大家看这个一里边传一个expression,这里边就可以传一个aggregate function这样的一个expression对吧?哎,那为什么是个expression呢?因为这里边我还得把这一个得到的结果给它做一个重命名,就是得到结果的那个字段对吧,做一个重命名,所以大家看这个具体的调用方式就是什么呢?我去创建一个自定义的aggregate方式去做调用,然后as得到结果做一个字段重命名,后面就可以select再选取出来了啊,所以这里边这个这个方式其实非常简单啊,调用的这个方式就直接调用我们已经注册好的这个avg tank对吧?里边要传的参数是什么呢?我们要的是。
22:04
大家看参数就是一个temp嘛,所以这里边我直接就传当前的这个temperature就可以了,诶我们前面这里边你定义的那个字段是什么,这里就传什么,然后后边呢,As定义好,诶这里边as最后得到的只有一个结果,那就不需要包成好几个字段了,我直接就叫做avg time就完了,对吧?啊,这个字段跟前面这重名这个没关系啊,直接叫一个名字,然后后边做一个select把它提取出来,我们要这个当前的ID以及当前的avg探对吧?啊,这样就可以得到最后的结果,那同样我们也可以用这个CQ做一个实现啊,那CQ实现的话,首先还是需要在表环境里边去注册表,首先我这个叫做sensor,然后把这个sensor table注册进来,那另外呢,还需要去注册函数。我们当前的这个register,一个方。
23:04
我定义的这个就叫做avg ta对吧?呃,把前面定义好的这个avg temp做一个传入,然后后边去写这个CQ的时候啊,我直接定义这个CQ table,那直接就是啊,就是table env,然后CQ query对吧?大家看一下这条CQ怎么写呢?Select。哎,我们现在要的这个字段是什么呢?哎,要的是ID,另外还有一个调用了这个聚合函数之后得到的那个结果对吧?哎,所以我直接avg,然后呃,里边temperature,我后边如果不用这个字段的话,不重命名也OK,对吧,然后from。From sensor啊,那另外还得有一个group by ID对吧?啊,这个大家就是知道,按这种方式写一个非常简单的CQ,把它拿出来就完事了,最后我们可以把它做一个打印输出result table啊,Toend stream,呃,这里边大家要注意,既然我们做过了这个聚合对吧?那这里边你要做toaend stream是不是就不合适了?哎,这里边肯定就不行了,对吧?所以这里边我们需要去to retra string,因为这里边我们并不是一个窗口输出一次,后面不改啊,我们这里面没有窗口,那就会在之前的那个聚合结果上不停的更改,不停的输出,对不对?诶,所以这里边就是肯定是要有更新操作的,所以直接出tra stream,然后肉把这个引入type肉,对吧?啊,然后这里边print输出一下,这个是当前的这个result,然后后边同样table同样也是to stream。
24:54
然后类型是肉print,打印一个CQ出来,最后不要忘记要把当前的这个执行起来,对吧,当前是aggregate function test,好,我们接下来来运行一下,看看效果怎么样。
25:12
好,现在运行完毕,我们看一下当前输出的结果是什么样的,哎,大家看到现在就是每来一条数据都会输出一次当前的这个聚合结果,对吧?啊而且你看前面一六七十这是先进来了,然后第二条数据这个341的第二条数据32进来之后呢,诶,它会直接输出一条false又更改对吧?呃,35.8这个false了,这个平均数不是它了,现在平均数呢,它跟32求个平均数,得到的是33.9了哦,然后后边你再更新的话,33.9又又放了啊再一更新来了一个36.2,所以这个又变大了一点,对吧?啊后面同样还是每来一条数据,就会FORCE1条之前的数据,然后再输出一条更新之后的新的数据,下面的这个CQ的输出一模一样对吧?所以最后统计出来的结果是正确的,没有问题的,大家可以把这个结果跟你直接调用系统内嵌的那个average,呃,那个函数做一个比。
26:13
对,看看是不是结果一样,当然这里边我们只是举了一个系统已经实现的一个功能啊,大家想如果说我自定义啊,就这里边我输出的这个聚合结果呢,不光是要一个这个,呃平平均的温度值,我可能还要再结合一些别的一些信息输出,比方说像之前我们自定义那个reduce的时候,我还要把它当前的那个时间戳,对吧?呃,做一个提取转换,然后最后输出,大家想是不是相当于我在这个聚合状态里边再多一个值就可以了啊,对吧,就相当于可以用这样的方式给它做一个就是复杂的一个定义,这种情况就自定义的这种方式就会实现的特别容易啊,这就是关于我们聚合函数这部分内容。
我来说两句