00:00
这种用法呢,其实整体来讲比较简单,而且呢,它比起some max max by明白这些基本的聚合函数呢,更加的一般化一些啊,就我们知道用reduce可以实现max max by可以实现sum,就是基本的那些他都可以干,而且还能干一些其他的基本的那些函数做不到的一些事情,但是我们会发现这个规约函数啊,它也是有缺陷的。他的一个显著的制约就是。规约的时候只能得到一个同样数据类型的结果,因为它是规约嘛,从多规约到少还是一样的东西,只是把这个东西看起来好像做了一个合并,做了一个精简而已,它的数据类型是完全不能变的,输入是二元组,那么中间我规约的时候得到的是二元组,最后输出的也是二元组。那大家就想到了,那有一些比较复杂的场景,我能用这种方式去做吗?那可能就不太容易了,比方说我中间规约的这个东西可能不止一个。
01:05
可能有两个,那怎么办呢?那就是比方说最简单的一个场景,就是我要算平均值。一堆数来了之后,我要算一个平均数,你应该怎么去做这个reduce计算呢?大家会想到reduce的话,我要规约,我可以规约个数。可以把它归约起来,把它后面跟一个一,然后分别叠加就完了嘛,然后另外还有一个就是我可以规约当前所有的那个和你不是要求平均数吗?那我算一个sum不就完了吗?啊,所以大家会想到我这儿可以把它定义成,比方说定义成一个三元组形式啊,那这样的话就是某一个user,比方说我要统计它的时间戳的平均值吧,那我就把user的username user本身,然后再加上当前所有。访问过的那个数据的他的和还有。所有的个数作为一个三元组放在当前reduce的这里边,然后接下来呢,再来一步,后面再来一步,我我把这个每个窗口的那个统计出来之后,大家会发现这没完对吧,因为你只有这么一个三元组,没用啊,我要的最后平均数,那怎么办?再把它map一下,两个数一除嘛,后面两个数除一下就完了。
02:16
但是自然我们想到这个有点麻烦是吧,那有没有更加灵活的方法呢?那就是我最后我可以输入的是二元组,或者说我输入的是这个event,最后我就想直接输出一个double类型的啊,或者说一个前面带着user,后面一个double类型的一个二元组,我想改它的这个类型,然后中间聚合的过程呢?我想保存的东西也可以跟他们不一样,能不能这么灵活呢?可以的,弗link给我们提供了一个更加一般化,更加灵活的个函数,它就叫做。聚合函数aggregate function,所以它使用起来应该就是一个窗口增量聚合里边的一般化的大招了,基本上就是各种灵活的类型啊,不同的需求都可以用它搞定。
03:02
啊,就它可以看成是reduce function的一个通用版本,它里边呢,它有泛型aggregate function啊,它也是一个接口,它的泛型有三个类型,三个泛型,一个叫in输入类型,输入数据的类型,还有一个类型叫做out输出数据的类型,哎,所以它相当于可以做类似于map转换了啊啊你输入和输出可以不一样,然后中间呢,还有一个类型叫ACC,这个是累加器类型,什么叫累加器类型呢?因为我们知道做增量聚合中间一定会保存一个中间聚合状态,所以当前的这个中间聚合状态也可以跟输入输出的类型不一样,这在内部叫做一个累加器,它的类型单独用ACC来表示。好,接下来我们具体的看一看这个和函数啊,Aggregate function到底怎么去用,在这里我们单独的去创建一个测试的类。这个就叫window。
04:02
Aggregate。然后里边的基本的用法跟那个reduce的做法其实是差不多的,同样还是前面我们应该有这个数据的读入,然后后边有基本的转换,然后当然了,你如果要是考虑到事件时间的话,该提取时间说分配auto ma这个也要有,然后后边就是KBY之后开窗,然后聚合啊,所以整体来讲把这个copy过来啊。大概的流程先copy过来。现在的话我们就直接用click source,那这一部分就删掉吧。背后的不要忘记画括号打起来,然后接下来要测试的话,那就是基于这个STEM,我们可以做转换,可以KBY之后开窗,现在既然已经是类型可以随便改变了,那我们就没有必要再非要把它map成二元组了,那这个就方便很多对吧?那所以这里边我们干脆就直接来一个dream。
05:01
先去做一个KBY贝塔,然后贝塔点负啊,我们先把刚才我们想到的计算均值的那个做一个实现,比方说这里KBY之后,呃,那我们就开窗喽,Window,然后tumbling even time Windows,然后这里边我们给一个time.second。十秒钟的一个稳定窗口,然后接下来之前是我们说如果要是不平均值的话,这个很麻烦啊,很绕,那现在我们就直接来一个aggregate。如果是aggregate function的话,调用方式是什么呢?哎,之前reduce function调用方式是点reduce,现在aggregate的话,调用方式就是点aggregate啊,所以这个很好理解啊,调用的这个接口不一样。直接可以去扭一个。Aggregate。诶,大家会看到里边它有三个类型啊,一个输入它已经指定了,我看到当前的这个输入类型event,当然这就是event了,至于后面两个呢,后面两个一个是ACC中间的累加器聚合状态啊,累加器状态一个是输出,那现在他不知道,突然默认就都是object。
06:12
那我们现在知道啊,如果说我想要最后的输出是一个当前每一个用户他访问的那个时间戳的平均数的话,诶,那这里我们可能最后应该要有一个输出是一个二元组,对吧。负二,然后呢,前面是一个user,前到底是谁,后边就是啊,当然这个平均时间的话,那要不我们用一个string,就是把它调试成一分几秒,写成那个转换之后的那个状态,把它写成一个string啊,然后这里面的object中间这个ACC的类型,我们要保存什么,中间聚合状态呢?那应该也是得有两个,用一个二元组。我这儿已经是批败了嘛,这里边我是为了最后输出的时候啊,呃,给一个当前user的这个信息,那其实你前面保存具体状态的时候,没有必要保存user的信息,因为我们KPI之后所有的这个窗口,他做计算也只是一个user,一个K,按照当前的这个窗口去做处理,对应的数据并不会跟其他的K混在一起,所以这里边我们只保存只定义两个就可以了,一个是当前的所有数的和另外一个是所有数的个数啊,所以这两个的话,那都应该是浪了。
07:24
那或者把这个个数定义成也可以稍微做个区分,可以看到三个类型全部可以不同啊,那接下来我们就是一个interface接口,里边必须要实现四个方法。这都已经列在这儿了啊。大家看一下哪四个方法呢?首先一个叫create accumulator啊,很简单,从字面上理解就是创建一个累加器。说白了就是说你既然这是有状态嘛,中间有这个聚合状态嘛,那你初始状态是啥呢?总得有一个初始值吧,哎,那所以这里边我们初始值非常简单。
08:01
MORE2OF2个值,一个零,另外一个还是零,对吧?一开始这个和是零,然后个数也是零,然后还有一个方法叫ADD,那大家想这个创建一个累加器啊,这如果看成是一个初始化,这有点像一个我们那个open生命周期对吧?它应该只调一次,那下面这个A呢,A就不止掉一次了。它明显这个方法就是要叠加,要去做聚合,所以它是每来一个数据,大家看event来了,输入数据来了,每来一个数据就去调用一次当前的爱的方法,然后要去做一个,诶我还可以获取到当前的状态,除了当前的数据之外,还可以获取当前的状态,然后要返回什么呢?大家看返回的类型是当前状态的类型啊,也就是说你既然来了数据之后要去做处理嘛,现在并不输出,因为我们说窗口里边你这个增量聚合啊,每到窗口的触发时间是肯定不会输出结果的,所以不涉及输出,但是涉及到状态的改变,状态的叠加啊,所以这里边你要返回的是状态改变之后的状态。
09:09
那现在来了,数据改变成什么样呢?那当然就是前面的这个和,那就改变成原先我们这个不是叫accumulator吗?累加器吗?那就是accumulator的F0,这个是和加上当前。Value点三啊,然后。另外就是后面那个是个数accumulator点。一哎,加上一个数加一嘛,哎,这不就是这样,和和个数统计起来了。然后第三个方法叫get result,这是干什么呢?很明显获取结果嘛,哎,那我们说这个窗口函数有计算的过程,增量聚合嘛,一点一点来一个数据就增量聚合一次,你最后还得输出结果啊,你最后窗口触发计算的时候,到底是要得到什么结果呢?是这里的result就调用这个方法啊,当然如果说我们窗口只触发一次的话,就只调用一次,理论上它应该是触发一次计算就调用一次返回的这个类型,看这应该是我们之前的alt类型,你最终输出的结果是什么啊?那是string string的一个二元组,我们想要得到的一个信息啊,那这里面我们就直接给一个,这里面有一个问题就是其实我已经获取不到当前user的信息了,因为我没在accumulator里面保存。
10:29
哎,那如果说我们想要做到这个事情的话,那其实应该还要把user的信息也保存进去才行,所以安翠啊,最后的信息。直接保存成一个string。那下边的这一个。The result应该重新写一下了,返回一个string类型就完事了,那这个string类型我们就直接new一个stamp。里边要传当前平均值的那一个时间戳,我们在上面明确的把这个写出来吧,你有一个time STEM。
11:05
你有一个time里边的这个值应该是accumulator,它里边的。F0和去补以accumul,点F1,然后这样的话不就是平均数吗?然后我们基于这个再生成一个time STEM啊,然后这里边我们直接把这个time STEM去换回就完事了。这样的话就搞定了,最后还有一个方法叫做merge,这个merge又是干啥的呢?合并它是合并啥呢?啊,合并两个累加器什么时候才会有窗口,要合并两个累加器呢?那其实就是两个窗口要合并的时候才会有这种情况,所以它一般是在会话窗口里边才会涉及到墨的一个定义,我们这儿其实不涉及啊,你就不实现也行,还是实现一下吧,万一用到绘画窗口怎么办?那这个也简单,二元组嘛,和加和,然后个数加个数不就完了吗?哎,所以这个也非常简单啊,还是最后返回一个二元组啊,然后就是a.F0加一点F0。
12:09
这是和加在一起,A点F1加,B点F1加在一起,数加在一起,那这就完了。就是所谓的这个aggregate的过程。好,我们把这做一个print打印。大家来看一下后果怎么样?因为我们当前定义的是那个十秒钟,所以大家看得这个等啊,现在这里这个平均值,这就有后面这个毫秒数了啊,但是现在有一个缺陷就是我们看不到到底是哪个用户的啊,只是隔了十秒钟输出了好几个用户对应的它的那个时间戳的平均数啊,确实起到我们这个效果了,因为大家看到都是40多秒的时候,它的平均数当然也是40多秒了啊,然后50多秒的时候,诶,平均数都是50多秒,然后零几秒的时候啊,输出都是零几秒,然后现在是十几秒,25分十几秒,然后再一输出都是20几秒,哎,所以这明显就是十秒钟之内它的那个平均值。
13:08
啊,这个没什么实际意义,但是我们可以看到这个aggregate能实现这样一个功能,用reduce的话,那就相当于后面还得一步,就是在这个reduce完了之后,还得再map一下,它就没有这么灵活。
我来说两句