00:00
接下来我们再讲下一类比较重要的函数,这就是所谓的聚合函数了,Aggregate function,那这一部分其实我们在data streamam API里边也是非常熟悉的啊,大家会想到在datat streamam API里边,我们当时给大家讲过KY分组之后就可以做聚合,哎,那个时候你如果调reduce的话,传的就是一个reduce这样的一个reduce function聚合函数对吧?那另外我们讲到聚合的时候,其实更重要的是后面还讲到了开窗,开窗的时候KPI之后window,然后接下来是不是可以做各种各样的操作啊,哎,当时我们讲到有增量聚合函数,增量聚合函数里边就有reduce function for function aggregate function对吧?啊,当然for function被弃用了啊,那现在的话可能就是aggregate function,这个就是应用最呃最广泛,或者说最灵活的一种增量聚合函数。大家还记得里边要实现那么几种方法对吧?创建一创建一个accumulator啊,然后里边是不是每来一个调一个爱的方法呀,对吧,然后最后是get result,得到结果就完事了,诶那大家就想到那在table API和这个定CQ里边有没有这样的自定义聚合函数的这种方法呢?哎,当然是有的,因为大家知道对于这个CQ而言,其实最重要的功能是不是就是分组之后做各种聚合统计啊,哎,所以这一部分其实它的核心功能啊,所以很多其实系统内置函数呢,我们能想到的那些简单聚合操作,它都已经实现了。
01:28
那这里面主要就是针对,如果说我们还有一些特定的需求。现在的这个函数没搞定的话,我们可以实现一个同样还是一个aggregate function,实现这样一个抽象抽象类啊,继承这样一个抽象类啊,然后去重写它里边的方法,就可以做到自定义聚合了啊。所以这个其实还是挺。呃,可能应用会比较广泛啊,就是所谓的这个呃U对吧,User DeFined方式,它的特点其实也非常的明显,它是不是相当于对于这个数据就相当于不是来一条转换一次了。
02:06
之前我们讲的标量函数和表函数是不是相当于都是针对每一条函数去做的转换?呃,每每一条数据去做的转换,对吧?啊,就是标量函数是来一个转换成一个输出,表函数是来一个可以做拆分转换成多条输出,而我们现在的聚合函数那就变成了。是不是应该先分组,然后有多条数据啊,有很多条数据,然后来了之后是不是最后聚合得到一个结果啊,哎,所以所谓的这个聚合函数其实是一个多对一的关系,对不对?前面skin function标量函数是一对一,而呃,类似于map对吧?呃,然后后边我们讲的这个table function表函数是一对多,类似于flat map,而现在的聚合函数就类似于之前我们的那个聚合,对吧,类似于有点像窗口聚合啊,它是多对一对不对啊,就是很多条数据聚合成一个结果,那大家看一下吧,这里边具体的这个流程啊,大家可以看一眼这个PPT里面这张图,我们这里边有一张表,大家看这张表里边的字段,有ID,有name,有price price的话大家知道这是单价价格嘛,然后ID的话,哎,这是商品的ID,然后name当然就是商品名称了,我们一看这个什么Lett let是拿铁对吧,还有牛奶对吧,还有什么,呃。
03:26
什么布雷味摩卡茶,反正就是一堆饮料啊,咖啡茶饮料啊,一堆这些饮品,然后有它对应的单价这样一张表,那接下来我们要干什么呢?诶大家看接下来我们要做一个操作,是不是想选取它里边的最大的当前最大的那个单价的那个商品啊,那当前最大单价值对吧?就是要做一个非常简单的max操作,那大家想这是不是就是我们之前讲的一个聚合的过程啊。来一个数据聚合一次是不是就完事了啊,大家要把这张表理解成动态表啊,所以接下来我其实就是应该要有一个中间聚合状态对不对?大家想我这个聚合状态里面应该保存什么呀?是不是就是当前最大的那个单价啊,当前的那个price对吧?所以大家看这里边跟我们之前data API里边讲到的那个aggregate方式非常类似。
04:21
它是不是也是有一个accumulator作为一个当前的聚合状态,然后接下来同样它也有一个方法叫create accumulator,那家想这个方法是不是就是一开始初始化创建一个状态啊,啊,那当前一开始初始化的时候,大家自然想你要取那个最大值嘛,一开始我应该给一个比较小的数,对吧,比方说给一个这个最小值对吧?呃,当前这个比方说是,呃,这个int类型的最小值直接放进来,然后接下来大家注意,之前我们那个aggregate function是来一个数据,是不是叫一次爱的方法改变当前的聚合状态,那现在它不是爱的方法。
05:00
它定义的是一个accumulate方法,对吧?啊,所以接下来相当于就是说我每来一条数据的时候,就会调accumulate方法来改变当前的accumulator累加器的这个值,所以你看六来了之后这个啊拿铁呃,这个呃,Let铁啊,六来了之后,第一条数据来了之后,当前这个accumulator是不是变成了六啊,然后接下来三来了之后,当前的这个值是不是没变,还是六啊,然后接下来这个呃,五来了之后还没变,八来了之后是不是变成八了?然后接下来四来了之后变成还是八不变,所以这里边保存的这个状态就是当前的最大值,对吧,这跟我们之前的那个定义是不是差不多啊,只不过就是把它的这个处理由我们之前的那个流是不是转变成了一个动态表的处理啊。那大家想现在我写这这样一条CQ,是不是相当于就要做一个持续查询,得到当前这张表的结果,是不是也是一个动态变化的表啊,其实得到那个结果大家知道不是动态变化的表,是不是就是一个值啊,你就取当前的这个最大嘛,Max嘛,哎,那所以大家看,假如说当前我到五五个数据的时候,这个查询出的最大值应该是多少?
06:17
是不是就是八呀,那输出八,那输出结果的时候,它调用的方法。诶,大家看它叫get value,之前我们那个data streamam API里边是叫get result,它现在是叫get value,名字不太一样,但是大家发现是不是整个原理是类似的呀,所以这里边给大家再说一下,就是必须要实现的方法有这么几个。Create accumulator,哎,这是创建一个累加器,就是初始状态,然后accumulate,这就相当于是每一条数据来了之后都要调这个方法来更新,更新状态跟我们之前的那个at方法是类似的,对吧?诶,那然后最后还有一个你最终要输出结果的时候,它的结果靠什么拿到呢?叫get value方法,跟我们之前的那个get result是非常类似的啊,所以整体流程基本上就是。
07:09
之前那个LT的方方式的一个类似的实现啊,只不过这里是稍微有一点偏差。那接下来我们就来在代码里边做一个具体的实现。那代码里边,首先我们还是直接创建一个Java类,这个叫udf test3当前这个是aggregate function aggregate function。Main函数里边,其实大家想到基本上完全一样是吧,我就直接就抄抄过来了啊,整个的处理流程copy过来。我们就抄到这个呃位方法结束就可以了啊。呃,然后里边我们现在要改的,其实还是就是在这里我要去注册对应的这个,呃,就是创建实例,然后注册udf的时候,这里就不一样了,对吧,我们把这里边空下空开,呃,那么首先这里边我们要实现的一个功能是什么呢?呃,就还是很简单,比方说我们当前有这个sensor reading数据嘛,我干脆就求求当前的那个平均温度得了,对吧,求一个这个平均值,大家还记得之前我们data stream API里边如果写一个aggregate function或者是呃,这个reduce function的话,大家知道因为它的那个类型不能改变吧,所以比较麻烦,对吧?用aggregate方式是不是很容易实现,我们保存的那个状态是不是就是保存当前的sum的所有的那个和当前温度的和,然后还有一个当前的个数count值,有这两个状态,最后是不是一除就是当前平均温度啊,哎,这个非常简单,现在的实现思路其实也是类似的啊,所以。
08:57
接下来我们自定义实现一个一个聚合函数,诶,那么我们求,呃,求。
09:08
当前呃传感器,传感器的平均温度值。呃,那所以接下来我们其实要自定义这样的一个aggregate方式实现啊,实现自定义的aggregate。呃,所以接下来我们能够想到啊,这里边还是一样的啊,Function这里也写错了。接下来public static class还是定义一个,比方说我这个就叫做avg temp平均温度对吧?呃,然后我要去extend一个aggregate function啊,这个大家一定要注意啊,别选错了,大家看下面这个应该是什么,下面这个interface这个接口。
10:05
这是我们之前的那个电stream API里边的方式接口对不对,那上面这个大家注意啊,这选的是这个看不清楚啊,但至少我们把这个先拿出来,大家知道它肯定是个抽象类对不对,然后我们看一下抽象类啊,它是。当前是不是flink table function里面给我们提供的呀,这样一个抽象类啊,然后你会看到它本身继承字是不是user DeFined aggregate function对吧?哎,这个就没问题啊,大家不要引错了,然后我们看到它里边也有泛型,这个泛型比之前我们在之前我们在做那个,呃,Dance stream API里边的那个LG方式,它是不是有三个类型啊,Input,然后ACC output,对吧?啊,就输入输出还有中间状态都有,现在呢,它少了一个,少了一个说明它指定的是什么呢?哎,对,它指定的这里边是不是指定的是当前我最后的结果啊,也就是out的那个结果啊,那为什么他不用指定我当前的这个输入数据的类型呢?对,因为我们在CQ里边是不是已经在外边都已经指定了当前的那个类型是什么呀?对吧,它只要在table里面做转换就完了,我只要指定它输出的结果就OK,所以接下来我当前输出的结果应该是什么呢?
11:20
是不是就是一个一个double类型啊,大家想想是不是就是一个double类型,我只要得到一个对应的平均温度值不就完了吗?啊,然后另外我当前的状态类型是什么呢?状态类型应该是二元组对吧,一个二元组,然后里边的类型应该是对一个double类型的和和嗯,一个int类,一个inte类型的当前的个数,对吧,大家看现在诶他报错了,我们看着报错,其实我们很很开心,因为说明是不是当前这个接口里边需要有这个必须要实验的方法呀,对吧,我们看一眼必须要实验的方法哦,大家看到是get value和create accumulator这个比较简单,Create accumulator的话是创建当前的这个累加器嘛,那是不是我直接你一个。
12:12
二元组对吧,当前是应该是,是不是我直接用一个这个0.0和零放进来就完事了,然后最后我要获取它的这个结果的时候,这个也很简单,是不是直接用accumul的F0除以accumulator的F1是不是就可以了,两个一除不就是平均值吗?哎,但是最关键的是我们想要的是来一个处理一个的那个accumulate这里是不是没有啊,所以非常尴尬,是不是又是必须实现一个。Accumulate方法,哎,那么这个名字还不能变对吧,必须是它对吧,这里面主要就是来了一条数据之后去进行状态的更新,对吧?啊就是。
13:02
来数据之后更新状态,所以大家会要注意啊,就这里边我们没办法直接把它重写,你这里边必须手手敲,手敲呢,还不能敲错,所有的类型和这个定义都必须是完全一样的啊,Public没有返回值void,哎,大家知道如果没有返回值的话,那是不是相当于我当前的这个,就是我当前的这个accumulator,就要去做一个调整,就要去做一个改变啊,对吧?那所以这里边啊,我这个直接用这个public VO void accumulate必须叫这个名,大家不要打错了啊,然后里边accumulate呢,有两个参数,那大家想到前面一个参数是不是就是当前我的。Accumulator啊,当前状态对不对,我要改它嘛,哎,所以这里边我的类型状态是二元组类型对吧?呃,当前的这个,哎,不是啊,Double啊,Double integer这样的一个AQ me,呃,或者我直接叫ACC也可以对吧?或者我直接用前面这个直接叫accumulator也可以啊,这是当前的状态,另外大家想一下我还要什么?
14:19
是不是还必须得有一个当前输入的数据啊,对吧,你必须得有当前数据才行啊,所以这里边我传入的数据是不是就是在这里边体现出来,哎,那这就体现,这就涉及到我当前调这个聚合函数的时候,到底应用给它传进来数据是什么呢?我是不是只传一个当前的那个temperature就可以了,只传一个温度值就行,那应该是double类型对吧?Double类型一个temp,所以大家注意啊,这里边的这个参数先后顺序也不能变,必须是前边是状态,后边是当前传进来的那个字段,对吧?啊,必须是这样的啊啊,所以非常这就非常的奇怪啊,它的很多东西都是写死的,必须这么要,要求就就这么去做,对吧?那这里边我们的改变是不是也很简单,直接accumulator.F0去,是不是加上当前的这个temp呀,然后后边accumulator的F1。
15:20
一去加一是不是就完事了,或者说我直接加加就可以了,对吧?哎,这就是我们当前对于这个状态的改变,好这个聚合函数函数啊,Aggregate function我们就搞定了,那接下来在代码里面怎么用呢?首先table API的用法,我还是需要先创建一个实例出来,对吧?啊,比方说当前我定义一个。呃,就是我这个叫做avg temp吗?我直接就叫做AV temp啊,你有一个avg temp出来,接下来我需要在环境里边做一个注册,我这个就叫做avg temp。然后里边呃,我直接把这个传进来对吧?然后下边我再去调用的时候,其实大家想到我接下来调用是不是必须得先做一个什么呀?你要做聚合是不是首先我应该做一个group by啊,对吧?你不分组的话,我怎么聚合呢?就像我们data stream API一样,他要做那个聚合操作是不是必须先KBY对吧?你datat streamam streamam是没办法直接聚合的啊,那这里边我group by当然就是ID了,按照ID分组每一个传感器,大家看现在是不是就可以调。
16:30
呃,首先是可以直接s select,大家知道s select里边是不是就是现成的系统内置的这些,呃,聚合函数就可以调了,哎,这个就没问题啊,那另外还可以干什么,大家看是不是就可以调aggregate或者flat aggregate这里边是不是就是相当于我自定义的这些aggre function啊,就可以做这些操作了啊,所以当前我定义的是aggregate啊,那么aggregate里边当然就要把我自定义的这个东西是不是要传进来啊,我这里边要的是avg temp,然后当前我指定的字段是不是应该是当前的那个温度值temp啊,我当前应该是给了别名了是吧?
17:10
给了叫做time对吧,把这个温度值传进来,然后调用这个函数,最后是不是得到了一个结果啊,然后我是不是可以给一个别名,然后我把这个比方说当前这个我就叫做呃,A avg temp对吧,我可以叫这样一个别名,呃,那最后是不是就可以对应的做一个输出了,Select做一个提取,当前我要ID,然后TS,另外我还要当前的,诶大家知道当前这个提取就是我聚合之后的话,这个应该没有TS了,对吧?这个好像我们当前这个TS其实并不应该出现在我聚合之后的结果里边,因为我这里面输出也没有它嘛,对吧,所以这里面其实就是直接就是avg temp是不是就够了呀,这就是我当前的这个输出结果。那大家想一下,如果我要写CQ的话,怎么实现呢?
18:03
下面还是先注册表对吧,注册完表之后,接下来我还是select ID,然后另外我是不是应该有一个就直接调用我当前的这个avg temp,然后把当前的temp传进来是不是就可以了,直接这么给就完事对吧?当然你也可以也在as啊,我这边不as也是一样的,那后边是不是直接from,把它还是换一行啊,From sensor,不要忘记group group by ID对吧?哎,就是这个,其实写起来非常简单啊,就这么就这么直接。那注意后边如果我要直接输出的话,这样to a判还可以吗?哎,当前是不是一定要to retract呀,因为当前这个一定有更新了,对吧,直接totra,接下来我们可以打印看一下它的结果。
19:05
好的,大家看一下我们这里边执行出来的结果,这是不是?诶,既然是retract stream,大家会发现这里边前面有一个true false的结果对吧?然后后边就是当前的34ID以及平均温度值,哎,我们就往后看吧,首先哎,大家看按照按照这个ID分组了,所以一上来是不是全是追加都是处啊对吧,三四一三四六七十啊全是处,后边341继续更新温度的时候,现在是不是就要撤回之前的一条,然后再来一条处,再更新一条啊,你看两个求平均数36.05对吧,然后再撤回一条,三个球平均温度,这是34点,呃,967对吧?啊,那最后我们又更新这个,把这个去掉,然后再呃,就是最后处啊,输出的这个35.5,最终我们平均温度是一个35.5度。啊,这就是关于这个聚合函数的一个用法。
我来说两句