00:00
接下来我们再来介绍另外一类非常常见的函数类型,那就是所谓的聚合函数aggregate方式跟之前我们介绍到的标量函数和表函数不一样,我们说这个标量函数呢,它类似于一个一对一的转换,就像map操作一样,而表函数呢,它是一对多的一个转换,一条数据输入可以扩展出一个表。而现在的聚合函数呢,就是类似于我们说的多对一的转换,有很多条数据收集起来之后聚合出一个唯一的结果啊。所以前面我们讲完了一对一,一对多,现在要讲的就是多对一。聚合函数这个概念之前我们在系统函数里边已经专门介绍过啊,呃,就是像我们前面如果说想要做一个聚合查询的话,不管是分组聚合,还是窗口聚合,还是开窗聚合,那其实都可以直接调用系统内置的一些聚合函数,比如说sum max me avg count等等等等啊,把很多条数据结合在一起,然后统计出一个唯一的统计结果。
01:04
那如果说有一些需求没有办法直接调用现成的这些函数的话,那我们就需要去自定义了,所以自定义的聚合函数呢,跟前面讲过的其他类型类似,它也需要去继承一个抽象类,这个抽象类的名字就叫做aggregate function啊,那这个aggregate function看起来有点眼熟啊,之前我们在讲到data没PI的时候,讲到窗口的聚合操作,增量聚合函数当中就有一个applicgate function啊,那当时它是比较特殊的啊,里边有四个抽象方法,我们说它里边是有一个内置的accumulator,一个累加器作为中间的一个聚合状态啊,那每来一条数据呢,就调对应的一个ADD方法,最后获取窗口计算的结果,那调用的是get result方法。那现在我们所谓的这个自定义的聚合函数A,这个方式跟之前的不一样,这是tableable API当中给我们提供的A方式,不过呢,它的基本原理跟前面涉及到的基本上是差不多的,我们可以看到啊,Agggate呢,它有两个泛行参数,一个T,一个ACC。
02:10
跟之前我们在窗口操作里面的那个增量聚合相比,就少了一个参数类型啊,那之前我们知道在窗口的增量聚合APP function当中,它其实是输入的数据类型,中间累加状态的类型,最后还有一个输出的数据类型,那这里呢?呃,在table API里边aggregate function的泛型参数就只有输出的结果类型和中间聚合状态的类型啊,那我们知道输入的数据类型的话,其实在这里并不是特别的关心啊,因为本质上来讲,我们输入的一条数据,这条数据就是我们表里边的一行,就是一个肉码啊,那具体的类型在外面就可以指定清楚,我们这里边只要知道它的中间聚合和输出结果就可以了。那在不定当中,这个聚合函数到底是怎么调用的呢?呃,整体来看的话,其实就跟之前窗口的增量聚合非常的类似,它也需要先创建一个累加器,这就是中间聚合状态啊,那创建累加器初始化的方法就是create accumulator。
03:12
然后接下来呢,对于每一行的输入数据,在之前我们做data stream API,做窗口增量聚合的时候,调用的是ADD方法,现在呢,调用的是一个accumulate的方法啊,它的主要作用就是要更新累加器,这是我们聚合的这个核心的代码逻辑,然后呢,当所有的数据都处理完成之后,最后我们要得到聚合的结果啊,那窗口增量聚合我们调的是get result,现在调的是一个get value啊,那所以对于每个table API CQ里边的aggate方式,这个聚合函数都必须要实现这样的三个方法,一个就是创建累加器createccumulator,另外一个就是哎,每来一条数据之后进行聚合的方法accumulate,最后还有一个获取返回结果get value。啊,所以整体来看的话,跟我们之前熟悉的那个操作是差不多的啊,啊,那为什么他没有跟之前的完全合并在一起呢?呃,一方面就是本身link CQ啊,这一部分的函数接口的设计跟之前确实是不是一回事啊,就是本身他们之间的这个兼容性做的并不是特别好,以后的话可能会进一步的融合,那另外还有一个就是除了上面的这几个方法之外,在to API和CQ当中的这个聚合函数呢,还有几个可选方法。
04:28
比如说诶,我们可以针对会话窗口进行聚合的时候,可以实现一个默认方法啊,这个跟窗口的增量聚合也是一样的,另外呢,如果聚合函数用在这个over窗口聚合,开窗聚合里边,还必须实现一个retra方法啊,这样就是可以保证数据可以进行撤回操作。另外还有一个叫reset accumulator,可以重置累加器,在一些特定场景下会比较有用。啊,所以整体来讲跟窗口的增量聚合还是稍微有一点区别的。好,接下来呢,我们就可以在代码当中来具体的测试一下啊,举一个案例,看看这个方式聚合函数到底怎么用。
05:08
呃,比如说我们知道如果想要计算一组数据,某个字段的平均值的话,那在标准CQ里边啊,就有一个非常经典的聚算函数,叫做avg,直接可以算平均数,但是我们想啊,那有时候这个需求我们算这个平均数就没那么简单,就不是说把一个某一个字段啊全部加起来,然后除以个数就完了,有时候我们还要算一个所谓的加权平均值啊,比如说我们在计算这个学生的加权成绩的时候啊,就可能每一门课我们有一个考试成绩,而每一个门课呢,它又有不同的权重,因为我们知道这个重要程度不一样嘛,呃,学分不同,所以它就会有一个不同的加权技术,然后接下来呢,呃,那我们计算平均值的时候,就是要以每门课的分数一个score,然后乘以它对应的那个系数啊,比如说我们叫做N,把所有的这个值都乘起来之后,然后做一个累加。
06:03
最后再除以他们的总系数。所以这就是所谓的加权平均值的一个计算的过程啊,那在CQ当中没有现成的计算加权平均数这样的一个聚合函数,那如果说我们想要实现的话,就只能去做一个自定义实现了,所以接下来我们在代码当中去做一个聚合函数的测试。我们还是新建一个SC的object。Udf TX。现在要测试的是聚合函数aggregate function。面方法,先写出来。那开始的处理流程,整体处理流程其实跟之前还是差不多的啊,我们可以直接把表函数的测试流程直接先完整的copy过来。引入对应的依赖啊,首先我们创建了表的执行环境,然后创建了一个连接器表,从文件当中读取数据,接下来呢,注册对应的聚合函数。
07:01
我们现在要去自定义一个聚合函数,那接下来呢,就是调用函数进行查询转换,最后打印结果输出,所以接下来的关键就是实现自定义的聚合函数。啊,那这一步我们当然还是自定义一个class。我们要做的一个统计是类似于加权平均数的一个统计啊,所以是计算加权平均数。那么这个类名我们就叫做带着一个权重weight average a。哎,那要去继承的是一个aggregate function。那对应的aggregate function,我们在inport的时候一定要注意,那就不是flink API common下边的functions,而是table下边的functions,我们选取的是table API里边对应的这个实现啊,点进去之后我们可以看到它有两个造型参数,哎,我们说一个是输出的类型,另外一个是叠加器的类型,里面我们可以看到现在有这个抽象方法啊,啊,一个是get value,这是我们最终要去输出结果的这个方法,返回的类型就是T。
08:10
啊,另外我们说它不是还有这个创建accumulator create accumulator,另外还有每来一条数据之后的accumulate那个方法吗?它没有直接定义在这个抽象类里面啊,所以这就跟之前我们在表函数或者是标量函数里边必须要去实现的那个eo方法一样啊,啊,就是它并没有在抽象类里边定义成声明成抽象方法,但是呢,它的底层逻辑里边又指定了我们必须实现叫做这个名称的一个方法,对应在聚合函数里边,就是必须实现create accumulate,以及每来一条数据之后的accumulate方法。所以接下来这里边我们的输出的结果当然就是一个加权平均值了啊,那在这里我们其实也没有别的这些数据啊,没有学生的分数之类的数据,我们还是基于当前读取进来的啊,对于这个每一个用户的访问事件,它里边能够计算平均数的呢?啊,那也应该就只有一个时间戳啊,那所以我们就把这个时间戳做一个加权平均数的计算,当然了,这个权重的话,我们可以随便给啊,如果说直接所有的权重都给一的话,都是同等的权重的话,那其实就相当于直接算平均值啊,那我们就只是以这个作为案例啊示例做一个实现,所以这里呢,啊,对应的这个平均数我们就给一个长整形long,另外还有一个ACCACC是中间聚合的累加器。
09:33
我们现在想要进行加权平均数的统计,那中间聚合状态到底要保存什么呢?其实这个很容易想到,因为是加权平均值嘛,所以一个就应该是每来一条数据之后,我们要把当前数据所有的那个权重。肯定要累加起来,哎,这个要来一个就累加一次,另外还有一个呢,就是我们真正要统计的那个加权和啊,就是每一个数据乘以它的权重得到的那个和,本质上我们还是一个总和一个个数啊,那这样的话,我们应该是一个二元组这样的类型啊,但是之前我们说过,在表函数里边如果涉及到元组类型的话,有可能底层在进行类型解析的时候会出现一些问题,所以这里呢,哎,我们单独的定义一个样例类吧。
10:20
单独定义样例类,用来表示聚合过程中累加器的类型。啊,那所以这里我们可以定义一个case class。我们就把它叫做weighted带权重的聚合结果啊,Weight average accumulator。这就是我们中间所要定义的这样一个类啊,里边呢啊,那对应的其实就是两个参数了,一个是一个sum,这是一个长整形的值long,另外还有一个类似于count的东西啊,呃,我们就直接把它叫做count吧,其实是一个加权的那个系数的总和,我们就把它叫做count,它是一个int,这里我们可以直接给它附一个初始值,然后后边呢,还要进行变化,所以当前的这个属性我们应该把它定义成Y啊,这是一个可变的变量。
11:12
初始值的话可以直接给在后边是零,那同样后面的count也是Y,初始值也是零,哎,这样的话我们就预先定义好了一个累加器的类型啊,那所以接下来我们在后边A方式里边第二个类型参数ACC,那就直接给上面定义好的weight体的avg accumulator就可以了。然后接下来我们需要实现里边的抽象方法啊,那当然了,这里边我们看到啊,必须要实现的已经我们直接能够这个让自动补全啊,直接显示出来的一个是create accumulator,这是创建累加器的方法,另外一个呢是get value,就是获取最终结果的方法,那create cuumul,这个我们知道,其实就是创建一个就完了嘛啊,这个我们可以直接使用当前样例类的伴生对象啊,直接获取到对应的一个对象元素啊,那所以我们这里不需要有任何的参数,直接用默认值就可以,那就V体的AG accumulator,这样就创建出来了。
12:12
这是创建。磊加气。然后啊,那最后的get value,这个也非常简单,我们就是累加器里边有两个值嘛,所以这里就是把第一个值我们计算出来,这个总和sum除以后边的累计权重这个count值,诶一除得到的就是我们的加权平均数,那这里呢,我们还得考虑一下,假如说啊,这个count从来就没有更新过,如果是零的话,这个除法就报错了,哎,那所以我们可以判断一下特殊的这个情况,如果当前的啊,就是我们这里的这个ACC,当前的这个聚合状态点count如果要是等于零的话。那这个当然就不需要做任何的输出了,我们直接输出一个nu吧。防止除数为零,哎,那如果else的话,这个时候输出的结果就是acc.sum直接去除以acc.Co我们想要的结果就是一个长整形,所以直接做整数除法就可以了,哎,这就是最为简单的啊,获取最后计算结果的一个实验方式,那这里还少了一步,还少了一个方法,那就是我们所说的accumulate啊,那这个accumulate呢,就跟之前我们那个E方法一样。
13:26
没有办法,直接让它自动补全啊,直接重写抽象方法,但是呢,底层要求你必须这么写,只能叫这个名,所以我们直接def accumulate。必须叫这个名字一点都不能错,然后里边的参数呢,其实也是有规定的,它的第一个参数就是当前我们所要能够获取到的内部的累加器状态啊,那所以就是ACC了,我们可以直接把这个copy过来。然后后边呢,它还可以有其他的一些参数,因为我们知道对于每一个数据输入进来之后,我们要调用这个累加的方法,诶,那你肯定得指定到底是数据里边到底包含什么字段啊,我们怎么样提取字段去进行累加呢?所以后边跟着的其实就是从数据里边提取出来的字段,就类似于我们当前这个函数传入的参数一样。
14:19
比方说我们现在要调这个求加权平均的函数,那里边要传什么参数呢?当然就传入的是,比方说你要按照分数去进行计算,那你就传一个分数嘛,如果要算当前的这个平均时间戳的话,那就传一个时间戳嘛,另外呢,后边还得跟上一个系数,加权系数,所以这里面我们其实就是两个参数,一个是当前的数值,另外一个是当前的系数,所以我们比方说这个数值我们叫做I value吧。其实就是我们真正要传进来的一个数据啊,哎,那它是一个长整型,然后接下来呢,还应该有一个I wait。权重权重值啊,按照我们在这个聚合状态里面的定义,它是一个int类型,直接把它放在这儿。
15:04
里边具体的实现啊,那当然就是。每来一条数据。其实就是每来一行数据啊,都会调用这里的accumulate的方法,我们要做的当然就是一个状态的累加,状态的累积了啊,那这里面就涉及到两个,一个就是ACC点萨。它是类型,那要叠加一个什么呢?当然就是叠加当前的I value,然后另外还有一个acc.count它要叠加的就是传进来的I wait,每来一个数据做一个叠加就完了。所以这个加权平均数的。整个续合的过程也是非常简单啊,呃,就是把对应的这两个值叠加就可以了。好,那现在我们自定义的聚合函数已经定义完成了,那接下来就得去注册这样一个函数了,这里class是weighted avg,然后哎,我们就直接把它对应的啊叫做waited avg,后面调用的时候呢,直接用这个名称就可以了。
16:03
那后边的查询转换的话,我们要筛选的就应该是按照某一个K进行分组之后,然后针对时间戳进行的一个加权平均数的统计啊,那我们针对的这个分组的K当然还是user了,UID啊,针对UID去进行一个分组,那我们提取最后的结果就只有UID以及计算出来的加权平均数,这里边它要传入的参数呢,就应该是诶当前的时间戳TS。另外还应该有一个对应的加权系数,哎,我们这里边就直接统一给一个一吧,啊,那这样所有的数据来了之后,加权系数都是一,其实本质上来讲,这就是来一个加一,来一个加一,这就是一个count值嘛,就是相当于没有加权的一个平均值的计算了。啊,那我们可以把它作为一个另外的字段,专门的输出一下,我们可以把它叫做avgts。那from当前的event。
17:01
注意还需要有一个group by u ID,根据用户去做一个分组,得到对应的聚合结果啊,那后边这一部分我们就可以删掉了。有了聚合查询转换得到了结果之后,就可以将结果表转换成流打印输出了,那现在我们这个结果到底是to stream还是to change of stream呢?那这个就要看这个查询到底有没有更新操作啊,其实我们知道啊,当前是要做一个聚合,而且呢只是做了一个分组聚合,并没有涉及到窗口,所以我们在这个原始的表里边每来一条数据,每来一条数据,那就会造成后边我们每一个用户对应着一个哎,平均的一个时间戳啊,比方说这个一个TS,哎,那另外一个用户又有一个平均的时间戳TS。那每来一条新的数据之后,很显然就会更新之前的平均值,那所以这是一个更新查询,我们如果要正确输出的话,必须要to changelot。
18:00
啊,那整体写到这里呢,逻辑上已经是没有任何问题了,但是这里还有一个小细节需要注意啊,跟之前我们讲到表函数啊,Table function里边所注意的这一个类型的定义一样啊,就是在我们这个内部啊,Udf内部,它所处理的每一个字段对应的这个类型呢,都必须要是一个包装类型,不能是啊,就像int这样的我们基本的这种数值的类型,所以在这里啊,如果说我们直接把。返回值的类型定义成了一个长整形浪的话,最终也是会抛出异常会报错的,哎,那所以怎么解决这个呢?那我们可以用一个浪的一个包装类型,也可以比方说更为简单的啊,我们直接用抓网。里边的长整型long类型,如果用这个的话,哎,那后边其实得到的就肯定没有问题了啊,那所以如果是最终返回的类型是long的话,哎,那我们这里get value得到的类型也得是Java的长整型啊,那后边我们传入这个I value的时候,这里也得是Java的长整型,这样的话类型系统就完全没有问题了。
19:06
所以我们看到现在的这个fli CQ当中的udf啊,使用的时候确实还是有很多很多的限制啊,那这些类型啊,还有其他一些进行转换处理,包括这一个抽象方法的调用啊,这种名称的确认,本身在接口里边对应的这种定义啊,现在可能都不是特别的完善,所以相信未来弗CQ的udf应该会越来越好用,越来越规范化啊。接下来我们就可以运行一下,看一看得到的平均时间戳到底是一个什么样子了。我们可以看到现在已经对应的有结果的输出了,果然我们看到最终输出的结果里边有加I,就是直接插入进去的数据,前面几条数据来了之后,Mary ball Alice啊,三条数据来了之后,那平均值当然就只有自己了,这个是没有问题的,都是追加。而后边如果再来新的数据呢,诶,就开始出现了减U加U这样的更新操作,所以我们使用to change of streamam是完全没有问题的,而且就应该使用。
20:10
我们可以看一下啊,第四条输入的数据,Mary这一条它是第四秒钟的访问数据,所以我们看啊,结合之前第一秒的数据得到的平均时间戳就是2520.5秒啊,那同样Bob来了第二条数据五秒钟的数据之后,跟第一条数据两秒钟的数据得到的一个平均值就是3500。如果再来一条报复的数据啊,再来一个,我们三个数算平均值的话,就是4333,这就是计算加权平均数的一个过程,我们使用了table API和link CQ当中的A方式聚合函数。
我来说两句