00:00
接下来我们再来介绍另外一大类常见的UDF,那就是所谓的聚合函数,其实我们之前说在系统函数里边,这也是一大类函数,主要就是针对多行数据,然后要聚合成一个标量值啊,所以它其实是一个标准的多对一的转换,比如说哎,我们要some,或者说要做count,那就相当于是把所有的数据,然后做一个对应的求和,或者是做一个统计计数的过程,最后得到的呢,就是一个标准的值,就是一个数值,所以最终我们想要做聚合的,不管是分组聚合也好,还是开窗聚合也好,所有的这些数据合起来之后,它会是最终结果动态表里的一行数据。这是一个标准的类似于reduce或者是aggregate这样的一个聚合过程。
01:02
所以我们说它是一个多对一的关系,前面我们讲到function标量函数是一对一,类似于map,而table function呢,是一对多,类似于flat map,现在聚合函数那就是标准的reduce吧,多对一的关系啊,那同样自定义聚合函数呢,我们需要继承一个抽象类aggregate方式。哎,这里需要注意的是,之前我们在学习data streamampi的时候呢,在讲到窗口操作的时候,增量聚数里边同样也有一个aate方式,这两者不同不一样啊,所以这里边我们要看一下源码当中。Aggregate function。我们看到当前在table API里边定义的这个aggregate function。它是在flink table functions对应的这个目录下边的,这跟我们之前的介绍是完全一致,那它的基本基本的泛型呢,只有两个,不像之前我们在窗口的增量聚合函数里边看到的那样啊,它是有in ACC和输入输出,还有中间的聚合状态类型都有,现在呢,它就只有一个T和一个ACC。
02:20
啊,那这里的T是什么呢?跟前面表函数table方式里面的定义一样,它是当前输出结果的数据类型,也就是我不是要聚合出一个值吗?那就是聚合出来那个值是什么类型,这里的T就是什么。那另外还有一个acccc就很明显了,就是中间聚合状态的类型。跟我们之前的那个概念是一样的,然后我们会看到它下边,诶,这里就真的有一些抽象方法了,除了那些that kind啊,还有类型相关的,这里面真的有一个抽象方法叫做get value。Get value。我们会知道这就类似于之前。
03:01
窗口增量聚合函数里边我们所要实现的那个get result,这不就是获取最后的结果吗?诶,那所以我们看到它就是传入当前的ACC,然后从里边直接获取到最终的结果,返回是一个类型。那对应的我们之前讲到的那个窗口增量聚合函数啊,Aggregate方式里边除了获取最后结果的get result,那还应该有一个create aumulator,创建累加器的方法,还应该有一个ADD的方法,每来一个数据怎么去做操作啊,对应的还有GE方法,对吧?那墨GE我们知道一般情况可能用不到,那至少也应该有创建累加器和每来一条数据之后怎样去累加这两个方法吧。呃,那现在这个table API里的aggregate function没有那两个方法,但是那两个方法很显然也是必要的,所以这又跟之前讲到的那个表函数和标量函数里边的方法是类似的,就是没有在当前的抽象类里边做声明,但是我们必须实现。
04:12
那必须实现的这两个方法呢,叫做。同样的,Create accumulator和accumulate。上面这个create aumulator这个都一样,这就是我们创建一个累加器的方法啊,所以它是在初始的时候调用的啊,所以整体来讲,我们这个工作原理跟之前的增量聚合也是非常类似,初始的时候我们先创建一个accumul,它就是用来储存聚合中间状态的这样一个累加器,然后接下来呢,针对每一行输入的数据,每来一条数据就会调用一下当前的accumulate这个方法,这就类似于之前的ADD方法了。这个方法关键就在于要结合当前的输入数据和之前的累加器状态来更新累加器。
05:03
这是整个聚头的核心过程。全部数据都处理完成之后,那最后我们就调用get value方法,类似于之前的get,得到最终的结果。所以我们会看到整个这个过程跟窗口的增量聚合非常的类似,但是底层这个接口没有完全统一起来,这也是当前这个linkq不够完善的一个一个表现啊。我们也是希望在未来的版本当中,有可能就所有的这个操作都可以完整的统一起来,我们就不需要再区分这么复杂的东西了,区分两个不同的艾瑞方式。那,那当然了,这里边除了这些方法之外,这几个方法在我们实现自定义聚合函数的时候必须要有。不管哎到底是不是当前这个抽象类要求的啊,不管怎么样,这几个方法是都要有的,那除此之外呢,还有几个可选方法,首先就是莫主啊。这个跟增量窗口的增量聚合是一样的啊,针对会话窗口,我们可以选择默认方法去。
06:03
定义累加器怎么样做合并啊,那然后呢,针对over窗口的聚合,必须实现一个叫做retra的方法,保证数据可以进行撤回啊,那另外还有一个方法叫reset aumulator进行累加器的重置,这个在批处理的一些场景下比较有用。那接下来我们就举一个具体的例子,还是在代码当中来进行一个测试。我们首先创建一个Java class。Udf。Test现在要测试的是aggregate方式。那整体的流程跟之前依然是非常的类似,我们把前面的这一部分代码都copy过来,没方法直接copy。然后接下来我们要改的,当然就是要自定义注册自定义的聚合函数。后边我们使用udf进行查询转换,当然这个查询的话就明显应该跟我们之前直接使用count sum或者是avg啊,这样的一些聚合函数的方法一样,就不会像表函数这么麻烦,要用这个。
07:14
侧写表去进行进行交叉连接了啊,所以接下来关键在于实现一个。自定义的聚合函数。那我们可以想一个稍微有一点实际用途的场景,那就是之前我们已经实现已经做过了直接求取平均数这样一个操作啊,那我们知道现在的不CQ里边我们直接调avg就可以了嘛,我们也直接。计算过当前,呃,每一个用户啊,之前几次访问数据的平均时间,戳这样一个计算,那假如说现在我们不是简单的计算平均数,计算平均数我们知道就是全部加起来,然后除以个数就完了。假如说现在呢,我们还要做一个加权平均的计算。
08:02
计算加权平均值。那这个又应该怎么做计算呢?呃,这个加权平均数其实我们都非常的熟悉,比如说在大学里边,我们可能很多计算绩点,或者说计算一个最终的学期的平均成绩的时候,往往都会做一个加权平均啊,那就是某一门课如果非常的重要啊,专业的主课,那有可能他的学分本身比较多啊,学识比较长,那我们给的这个。权重就会比较重啊,那如果说一个课可能是选修课也不是很重要啊,那他的学时比较短,学分比较少,对应的权重就会比较低,那所以我们往往就是当前的考试成绩乘以对应的学分做一个叠加,所有的都加在一起,这个成绩叠加在一起,最终再除以总学分,得到的就是一个加权平均值。啊,那所以这个是非常常见的计算平均数的一个扩展的场景,那我们看一看这样一个场景怎么样去实现呢?首先当前的系统函数里面没有这样一个直接实现加权平均数的函数,所以我们只好去自定义了,那这里面我们可以定义一个public static。
09:18
Plus。我们把它叫做。Wi的。Average。计算一个带权重的平均数,当前是一个。Aggregate function,注意选择是flink table function下的gg function不能选择到下边这个就是我们之前在做窗口增量聚合时候的这个function。那它里边是需要有泛型的,泛型,哎,我们首先想到当前首先。要有一个返回的结果类型,那比方说啊,现在我们的这个数据里边好像也没有什么特别能计算加权平均数的,我们就还是算平均的时间戳吧,啊,那最后得到的长征性。
10:04
时间戳,平均时间戳,然后另外呢,还应该要有一个累加器的类型,这个累加器我们会发现应该至少要有一个当前的和加权和应该要有一个萨,然后另外呢,还应该有一个按按之前我们计算平均数,那应该还有一个个数嘛,那现在我们可以认为是一个。啊,就是类似于一个加权个数对吧?诶如果是带学分或者说带其他的这样一个,比方说我们某一个数据有多个的话,诶,那就相当于有一个当前数据的加权个数了,所以是一个sum,一个COM2元组,那这个二元组进行操作,我们知道它不方便,所以干脆在这里我们还可以把它定义成一个单独的类似于po类型啊,那我们这里可以单独在外面做一个声明。单独定义一个累加器类型。
11:02
这个类型的话,Public static。就把它叫做。Waited。Average,我们就起avgum。里边主要的属性啊,我们直接把它全定义成public类型,方便link底层去做处理。初始当然是一个零了,那另外我们就简单一点,不用带学分那个小数点啊,直接定义一个count就可以了,这个count初始也是零啊,那如果说我们想要去进行。叠加的话,那就有可能出现什么,就是来了一条数据,诶当前这个time stamp,它它有一个一个具体的值,那这个数据它的时间戳出现几次呢?那这个count值有可能直接给他一个给一个,那就说明这个数出现了三次,出现了五次,那接下来我们就直接当前的时间戳乘以五叠加在这个sum里边就可以了。
12:02
这也是一种处理的方式。我们可以进行这样的一个。计算,那这里我们就直接使用当前的的AV放在这里作为我们的累加器类型。好,现现在我们发现必须要实现一个。Get value和create accumul2个方法,那接下来首先我们知道啊,这个get value的话是最后相当于类似于get嘛,我们放在最后再实现,首先上来之后应该要去实现一个累加器就创建。初始化一个累加器啊,那当前这个累加器呢,很简单,就是直接你一个累加器类型的对象就可以了,所以直接你有一个初始值,我们都已经附好了嘛。V的avgumul,然后接下来关键是要实现一个。累加计算的方法。
13:01
这个方法呢,我们没有办法直接从就是继承的抽象类或者其他的一些接口里边,直接把它override啊写过来,那这里边就只有手动去书写了,而且注意这个方法必须是public类型,没有返回值,而且必须名字叫做accumulate。跟之前的evil是类似的,这个名字不能变。然后里边的参数呢。也有要求,第一个参数必须是当前的。Accumulator。中间聚合结果类型。我们可以把这个写进来,Accumulate accumul叫做,然后后边的参数,那就是调用当前累加计算这个函数的时候啊,做聚合的时候要传进来的参数了,那我们现在做这个加权平均计算,显然要传两个参数嘛,就是当前你要算的那个值,那个时间戳到底是多少,然后呢,诶,它到底出现几次,还有一个count。
14:05
或者说那个权重要穿进来,那接下来我们就给一个长整形的。一个value啊,我们把它叫做I value。为了跟其他地方区分,另外来一个类型整形的I wait。一个权重这样的话。有了这两个值,接下来就可以进行计算,那首先就是accumulator的,那就应该加等于,在之前的基础上用。I value乘以I wait。乘积叠加到some里面啊,那然后当前的。数量count值,那当然就是在之前的基础上直接加上当前的权重就可以了。这就是一个基本的。处理过程的实现,那最后呢,我们需要get value获取当前的值,那那当前的值要计算这个平均数,那其实应该是要把sum要除以count啊,那这里面还涉及到两个问题,就是这count我们得判断它不是初始的零,对吧,至少得有值才行,所以这里可以做一个简单的if判断,如果accumulator count。
15:16
如果要等于零的话,那这里我们脆就return了,没else的话,这个时候我们在return aumul。除以accumulator点。Count。哎,这样就得到了最后我们的加权平均数。好,对应的这个聚合函数,自定义的聚合函数我们已经定义好了,接下来那就是在上边。要去做一个。注册,我们把它注册成就叫做VT average啊,然后后边的话。当然就是把这个对应的传入,然后接下来再进行。调用查询转换的时候,诶,这里边我们就涉及到。
16:02
怎么样去调用这个聚合函数了,它的用法跟我们之前的类似于这个count avg啊,就是类似于我们直接调用avg去求取加权的那个求取平均数的那个过程,这里比方说我们直接就是user以及当前的TS啊,这个不能直接把TS拿进来了,因为我们可以按照user去做一个分组,然后进行聚合,那这里TS显然就拿不到,那我们现在关键是要把当前的weight。Avgav的average拿到我们传进来的是它的TS以及个数的话,我们这里边没有特殊的需求啊,每个就按一算就可以了,那当然这个跟直接调用avg应该得到的结果是一样的啊,那比方说我们把这个叫做。Wag。这两个字段拿出来from当前的click table。Group by。User,这样的话我们整个查询转换就写好了,那注意这里得到的结果可以直接to data stream打印输出吗?
17:07
哦,那就要考虑当前到底有没有更新操作了,我们当前既然是做了加权平均值的计算聚合计算,那很显然每来一条数据之后,对之前所有数据的加权平均值是有影响的,我们只是做了一个分组聚合,诶,那所以当前很很显然需要tolo。这样才可以得到最后的结果。我们可以来运行一下。我们可以看到对应的输出确实是有更新操作的,而且可以看到啊,每来一条数据之后,一开始只有一条数据,那当然都是加爱了啊,一秒两秒三秒,然后接下来。第二个Bob的数据来到之后,我们看到之前的平均时间戳两秒就减掉了减U,然后变成了两秒和四秒的平均数是三秒,那再来一条数据,五秒的数据的话,那就减掉三秒的之前的平均数变成了3.667秒啊这样的一个平均数。
18:16
每来一条数据之后,接下来都要做一个更新,这就是自定义聚合函数的实现。
我来说两句