00:00
我们现在已经知道怎么样去开一个窗口,然后指定窗口分配器了,那前面我们说过一个完整的Windows操作。包含两部分,除了前面的这个窗口分配器指定之外,后面还应该有一个窗口函数,对吧?诶大家想想为什么前面这个窗口分配器指定了之后,这不算一个完整的窗口操作呢?哎,大家想一下之前我们这个干了一件什么事啊,这里边其实是点K外之后是不是做了一个分组啊,然后基于它在点time window,这是不是把分组的数据又做了一个分桶对不对?诶那家想分筒是把这个数据属于哪个时间窗口这个桶对吧?或者说计入窗口统是都已经分到里面去了,但是你最后这个窗口到底要干什么统计呢?没说对不对,对,大家想最后是不是一定要做聚合啊,类似于一定是要把这个窗口里边所有数据你要合在一起得到一个结果嘛,但是我们前面是不是根本看不出来你要干什么呀,所以大家就发现了,是不是后边在做完窗口分配器开窗之后,一定要再跟上一个。
01:09
哎,聚合操作对吧,也就是说你得指定对于这个窗口里边的数据到底要干啥,你最后到底想计算出一个什么结果啊,所以这个过程就叫做后边靠什么来指定呢?就是我们这里的窗口函数来指定的,但这里的窗口函数无名度方式是一个比较泛指的概念啊,具体来讲的话,那么这个window function里边又分成了两大类啊,这一个叫做大家看叫做增量聚合函数,另外一大类叫做全窗口函数,哎,那这这两个又分别代表什么含义呢?呃,具体来讲的话,增量聚合函数从字面上理解其实还比较好想啊,增量聚合函数就是。就是增量吧,那是不是就是每来一个数据,就在之前的那个结果聚合结果装基础上去做一个增量化的聚合啊,啊,所以它类似于就是我们之前说的这个流处理聚合状态的过程啊,这也是来一个处理一个,在这中间需要保持一个简单的状态,这就是增量聚合函数。
02:15
它的代表就是,哎,大家看reduce function很熟悉,之前我们是不是做过啊,哎之前那个reduce啊,K之后reduce的时候,里边要传的不就是一个reduce function吗?现在也一样,你也可以直接传一个reduce function,就做这个增量聚合。或者还可以干什么呢?传一个aggregate方式,这个比较特殊一点,之前好像KY之后没没见过这个操作,对吧?啊,现在我们引入了这样一个操作aggregate的方式。另外除了这个增量聚合函数之外啊,另外还有就是全窗口函数,全窗口函数的含义就是全窗口吧,它说的就不是来一个就增量聚合一次了,而是来了的数据都先收集起来,先攒着。等到要计算的时候,再遍历所有的数据,计算结果,输出结果啊,所以大家就就会想到像现在这个全窗口函数,其实就类似于一个什么操作啊,对,就有点像批处理一样啊,对吧,那就比方说我这里面有一个八点到九点钟的窗口,那大家想到这个数据来了之后。
03:20
增量聚合是一个什么意思呢?他意思是我这儿有一个桶,这个桶是八点到九点,所以我接下来怎么样这个桶里边保持一个状态,对吧?这样一个状态,比方说我要算这个八点到九点所有数的这个和求和,那大家就想到我接下来这里边存一个sum状态,哎,对,大家其实就想到这个数是不是我不需要存啊,来一个数是不是相当于我直接算出一个这个sum来就可以了,接下来我是不是只要存着这个sum就完事了呀?哎,但是大家注意,现在我这个数来了之后,我这个窗口要输出结果吗?
04:01
窗口不输出结果对吧,因为八点到九点,可能我现在是不是还还没到点啊,对吧,这是你这数据是八点到九点数据一个一个来的嘛,这个没准,所以接下来是来一个数,我就塞到当前这个桶里边,然后聚合一次当前的状态,对吧?每来一个都聚合一次状态,那大家会发现最后我这里边就保持了一个状态而已,然后到点的时候,假如八点到九点啊,真的已经到时间了,那怎么办呢?诶,那是不是直接把这个sum这个状态拿出来做一个输出就完事了呀?哎,所以大家看其实它是这样做的一个操作啊,这就是增量聚合对吧。就来一个就聚合一次,来一个就聚合一次,但是它不输出,像我们之前那个KBY之后直接reduce,大家大家还记得吧,但是来一个就聚合一次,然后是不是直接就输出一个结果啊,对吧,那个是完全流逝的,现在不是啊,现在我们的输出结果还是隔一段时间才有一个结果窗口啊,有界流,所以大家会发现它数据处理还是每来一条数据就处理一次,但输出结果是等到窗口结束时间的,到到达的时候,哎,这个时候我们才输出计算机,那与之对应的这个窗口函数,全窗口函数又是什么样的?
05:16
啊,大家知道全窗口函数的话,那这边做计算就变成了,对是所有的数据来了之后,是不是来一个就放进来。来一个就放进来对吧?哎,等到窗口要结束的时候,我发现到点了,八点到九点到点了,那接下来是不是我把所有的数据都拿出来做一个计算,然后输出一个结果啊,哎,所以它是它是这样的一个过程,所以在中间它并不保存一个中间聚合状态,而是存下来了所有的数据,这就是全窗口函数对吧。啊,那全窗口函数的代表就是process window function,之前我们说过那个process,这其实都是那个底层API,对吧?Process function都是底层API啊,这个是属于一个全窗口函数,收集所有数据,另外还有就是一般话,这这里大家看这是一个特别的一个一个接口啊,就叫做window function,这也是一个全窗口函数。
06:15
啊,所以大家如果要使用这个窗口计算的时候,就分成了这样两大类啊,那有同学可能想到就是那除了这个reduce aggregate,还有这么高大上的这个全窗口函数之外,对吧?那我们更熟悉的,你比方说some max m这些有没有呢?啊,有的啊,大家看这个在这个调用的时候,接下来是不是可以我们把这个上面这个分号去掉啊,大家看是不是这里面可以直接去sum呀,没问题对吧?也可以直接max max by也可以命明白对吧?那前面我们讲过的这些就是所谓的滚动句和函数,这里边都是通用的,那大家想当前的这种操作,这又属于什么呢?
07:00
它是增量聚合还是全窗口聚合呢?哎,其实大家想到做sum或者做max命,它是不是完全可以就保持一个状态,然后增量聚合就可以了,哎,所以这些简单的滚动聚合操作啊,呃,在这里边其实也都属于增量聚合,只不过我们不需要单独再去实现它的聚合操作,像这个reduce function,这不是,这是不是具体我们定义的那个聚合操作啊,啊,所以不需要去实现这个而已,它本质也是增量聚合。好,这就是我们这里边定义的这个结果啊,那有同学可能觉得这两种他们既然有这样的区别,那哪种更好呢?哪种方式更好呢?哎,这就大家可能会觉得好像是增量聚合是不是更好啊,因为他来一个就算一个嘛,然后最后我是不是相当于等到到了那个九点钟要关,关这个窗口的时候,我根本就不用做计算,直接拿出来那个聚合结果输出就完事了,所以它实时性会更好,对吧?呃,效率计算的这个效率啊会更高,延迟会更低,但是另外这个全窗口函数在有些场景下还是非常重要的,为什么呢?你比方说有一些场景,呃,比如说我要排序,排序的话,大家可能还想到我可以做这个规定排序,插入排序啊,对吧,你也可以来一个就排一次嘛,那另外比方说我可以统计一些数据啊。
08:27
我统计当前数据的。中位数。或者说我统计这个数据的啊,就是比方说25%,75%的这个分位数。大家想对于这种数据的话,你是不是前面统计出来的那个结果好像就没什么用啊,对吧,前面的中位数跟后面的中位数可能没什么必然的关系,所以你就完全可以是不是前面我先把数据都收集起来,后面再统一做计算啊,对吧?如果前面的这个计算的过程又又比较复杂,然后对后面的这个结果意义又不大的话,那其实可以做这个全窗口函数计算的。
09:03
那另外还有就是增量聚合函数里边大家要注意啊,你就像那个reduce function,它里边的那个状态是不是是不是相对来讲比较比较受局限啊,能做的事情比较少对不对,那他其实拿不到更多的东西啊,而这个全窗口函数呢,它能够获取到当前窗口的信息。特别是你像这个process function,它还有上下文,它还能拿到相,就是当前的一些时间啊,相关的一些信息,拿到当前的其他的一些状态,所以整体来讲全窗口函数会更加灵活,能适用的场景更多,所以有时候我们在使用的时候会把这两者结合在一起来做调用。好,那接下来我们在代码里边给大家还是详细的看一下啊,啊,那首先你看到这个这个命啊,在这里边如果直接做这个命操作的话,里边是不是跟之前那个,呃,我们KPI之后做聚合是一样的,也是传一个比方说位置对吧,或者传一个那个字段名称,你就可以做这个最大最小值或者sum的这个操作了啊那这里边我们要讲的并不是这个啊,这个比较简单,我们这里面要给大家说的是,比方说前面我们说那个增量聚合函数reduce function,那应该怎么传呢?哎,大家看就是直接可以点reduce。
10:21
这个时候里边要传的就是一个reduce function,对吧,就是这样的一个东西,那那这里面的这个reduce function跟我们之前讲到的那个reduce function,大家看是不是完全一样啊。就是这个东西对吧,就是在当前flink API common下面这个通用的这个函数啊呃,一个reduce function,它实现的方法,Reduce是不是也还是有一个之前已经聚合好的一个状态,然后有当前最新的这个数据得到一个更新之后的状态啊,它还是这样的一个方法,对吧?啊,这是这个reduce啊,就就这么去做,这个就不说了啊,我们这里边要给大家专门要说一下的是aggregate方式。
11:04
那大家可能想到那aggregate function又是怎么用的?哎,这调用的时候就是有一个方法,大家看直接就叫点aggregate,然后aggregate里边要传的方法就是一个aggregate function对吧?然后aggregate function这里边就稍微的复杂一点了,大家看这个aggregate function里边的泛型有三个,这是在干什么呢?这是在干什么呀?大家看,首先in,这当然就是输入的数据类型对吧?然后out,那就是输出的数据类型,这个简单acccc ACC是什么呀?对,大家想累加器accumulator对吧?累加器那大家想这是不是就是当前我的中间聚合状态啊,累加器嘛,当然就是当前的中间聚合状态对吧?所以这里面我可以直接给大家简单的实现一个,呃,比方说啊,我要做一个就是对于当前这个数据里边我做一个count吧。
12:02
大家想就是累加嘛,是不是我可以或者说我可以做一个这个当前温度值的累加,对吧,但是温度值累加好像没没没什么意义啊,那我这里边可以非常简单的去做一个count,数一下这当前里边温度值的个数,大家觉得这个应该没问题吧?啊,所以接下来我这个怎么写这个函数呢。就这里边这个aggregate function这个类型应该怎么写呢?最后我要得到结果是不是就应该是一个count值的话,可能是一个inte对吧。哎,那大家想alt输出的这个结果是不是跟这个中间聚合状态应该一样啊,对吧,你聚合起来是什么,最后直接输出就完事了嘛,所以是一个引体值,然后接下来大家看需要实现的方法有这么嗯。有这么四个。有四个必须要实现的方法,然后首先是一个create accumulator,这是干啥呀?对,这是不是要创建一个累加器啊,哎,所以大家看这个返回值int准,这是不是就是你累加器初始值是什么就是什么呀,哎,所以大家知道这我当前初始值应该是什么零嘛,很简单对吧?啊,然后ADD一个ADD的方法,大家说这是要干啥呢?对,这是不是就是来了之后你到底累加到底该怎么累加对吧?那所以大家知道我现在怎么累加,对,是不是直接加一啊,跟这个value都没关系对吧?这是我当前来的这条数据,但是跟它没关系啊,你如果是sum求和的话,那是不是把里边那个那个temp拿出来就可以了,对吧,我现在不做累加啊,我直接是这个加一就是个计数器。
13:37
那后面还有get result,最后输出结果,结果是什么呢?是不是就是当前的这个accumulator啊,哎,就是这样,就这么简单,最后还有一个墨GE,墨GE这个方法的话,一般情况我们的这个时间窗口里边其实用不到,它主要是用在哪里呢?用在这个session window要去,呃,里边可能涉及到有些时候有这个合并的操作,我们这里面根本就涉及不到合并,因为有同学可能想到,诶,那是不是这个墨子是要做那个分区合并呢?大家想我现在有分区吗?我当前K外之后,是不是所有数据都在同一个分区里啊,所以说不涉及到这个问题对吧?呃,所以一般我这儿是调不到的,当然你为了防止意外,我是不是还是既然你要合并嘛,那大家想这是不是就是你要合并的两个状态,那我是不是直接加起来就完事了,A加B对吧?啊,所以这个其实就是最为简单的一个操作啊,啊,那我这里边可以。
14:32
得到一个结果,我把这个叫做stream。呃,那这里边大家也可以把这个直接就写成一个data stream对吧,就当成一个data stream就好了,我把这个还是前面的那个测试都都放在上边,然后把这个放在下边啊,大家看这个代码会稍微的整齐一点,然后最后我可以把它做一个输出data stream做一个,呃,这个不是data stream,应该输出的是result对吧?啊,输出这样一个结果,好,那么我们可以运行一下,大家看看这个最终统计的结果是什么。
15:13
诶,大家看到这里边,我们为什么没有已经结束了,为什么没有任何的输出呢。这里面为什么没有任何输出啊哦对,大家想到这里面我设置的是15秒的滚动窗口对吧?那那你说我我刚才这个整个读取数据,然后运行这个跑这个代码,是不是根本要不了15秒直接全结束了呀,所以你这里边如果测的话,是不是什么结果都看不到对吧?所以大家想到那如果我要真的要测这个结果的话,应该怎么做啊对,大家想这个其实是。我是不是应该要去做一个流逝数据的输入,一个一个输入,然后等一段时间应该就能看到那个结果啊,要不然的话,你这里边直接,因为你看我读取数据,这个是文件读取数据的话,它读完了之后,它默认这是个有界流,对吧,读完了是不是直接退出了,不会继续等了,那当然我们这个就窗口就等不到它结束了,对吧,没有任何的输出,所以这里面我们把这个换成socket文本流对吧?换成soet文本流,那这里面env socket text file,呃,Text stream local host 7777。
16:26
好,我们把这个叫做呃,这个叫做input stream,对吧。Input试卷。这就是我们,呃,要测试的时候需要注意的其他一些这个要点啊,需要把这个稍微的改一下,那接下来我们把这个还是启动一下,试一下7777,接下来我们运行这个代码。啊,大家看到这个这个过程里边,其实我们最后统计输出的啊,这个结果其实就是当前的一个一个数而已,对不对,对吧,就只是当前的这个计数而已啊。
17:09
好,那现在已经启动起来,那我接下来就不是哈,我们用这个三四的数据啊。做一个输入测试。来一个啊,然后大家知道这个,如果你要输入的话,那得那得等这个15秒对不对啊,大家看一下,我再来一个346的啊,再来一个三,诶大家看为什么我这里面只输出两个一呢。为什么两个一啊?需要为什么两个一一和对,是不是一和六,我是K外之后按照不同的ID去做分组了呀,那每个组里边开一个窗口,是不是都应该有它对应的一个统计结果啊,哎,所以大家看这就是一个完整的过程,对吧,每个分组统计自己的那个聚合结果,所以是不是都是一啊啊所以接下来比方说我来一个这个,诶这个弄错了啊,这个这个。
18:08
变成一的话,这个它解析不了,肯定就报错了,对吧,接下来我们输出输入一个这个三七。然后30。呃,另外还有这个三一。341。好,我们看一下诶,大家看哦,刚才我那个30已经点快了,点了两个对吧,所以大家看现在统计出来是131,为什么131啊。是不是啊,347是不是有一个对吧,那三三这个341有三个,30有一个,当然这个顺序没准对吧,我们其实也不知道第一个到底是37还是三点十啊啊所以但是至少我们能看到是不是每一个对应的ID都能统统计出它当前的这个个数啊。
19:03
在之前的基础上一个一个累加对吧?诶,这就是我们当前这个做这个增量聚合啊,统计的这个过程,它是每来一个数,你看我们这里边定义的这个过程,是不是每来一个数就要ccumul加一啊增量聚合就是这样的一个效果。
我来说两句