00:00
我们已经了解了增量聚合函数中的reduce方啊,那我们可以知道它其实就是在当前的window stream基础上直接去调一个点,Reduce方法里边要传入的就是一个reduce function,或者说我们用这样的一个拉姆达表达式来对它进行实现,这种方式呢,其实非常的简单,也能应对我们实际生产当中的很多场景,但是呢,这个接口其实是有一个限制啊,那我们会发现啊,就是在这个reduce function当中,它只有一个泛型类型啊,就是当前的数据类型,它在规约的过程当中呢。所有的数据的类型是一样的,而且得到的结果规约之后的结果也是一样的,这也就意味着它中间聚合的结果类型也是一样的,中间不能变。我们说对于reduce而言,相当于中间是有一个聚合状态的保存的,所以它的状态类型和输入的数据类型以及最后的输出类型完全都一样。
01:03
这就是reduce的要求啊,所以在这里我们可以看到啊,如果说我们要对这样的一个二元组数据进行一个规约聚合的话,那么中间聚合的结果也是二元组,最后得到的输出也是二元组。那在有一些场景下,我们就会觉得这种就很不方便嘛,假如说我们需要有更多的处理转换,那有可能我们输出的结果就不想让它跟输入的类型一样,还有可能呢,我们是中间做聚合的时候,所需要保存的状态也有可能是不同的类型,哎,所以在这种场景下,当然我们用reduce也可以实现啊,比如说我这里有一个非常简单的例子,什么情况下就是它的类型有可能会不一样呢?诶,比如说我们可以去求一个平均数。Average,哎,那我们都知道啊,求平均数,假如说我们就求一个长整形数的平均数。
02:00
那很显然,我们当前输入的数据应该就是所有的长整型数啊,最多我们后边可能再加一个一,那这就是我们所说的一个长整型数,一个一,类似于word count一样,把它包装成二元组做一个输入,这是输入类型。那输出类型呢?它的平均数显然应该是一个double类型。所以它的输入输出是不一样的。当然了,这种需求我们完全可以用reduce直接进行一个实现啊,那这个实现的过程呢,可能要分成好几步来走,首先我们应该把数据转换成这样的二元组,一个长整型,一个一,然后接下来呢,我们就定义一个reduce function,分别对这两个字段进行规约聚合,哦,那就是前面的第一个长整形数全部叠加起来得到一个萨。后边所有的一也叠加起来,得到一个count。
03:01
得到了这两个数之后,最后呢,我们再用sum除以count得到这样一个double啊,那当然了,最后这一个相除的转换,那我们就需要再用一步map操作进行处理了。这个过程可以实现,但是就稍微的有一点麻烦。那有没有直接一步操作就能实现这样聚合需求的函数呢?当然是有的,那接下来我们要介绍的就是另外一类增量聚合函数。可以说这个也是更加一般化的一个增量聚合函数,那就叫做aggregate function啊,也就是字面上看的话,它就叫做聚合函数,它就是更加通用的聚合接口啊,那所以接下来我们在代码当中怎么样去使用呢?其实也非常简单,就是在这里我们基于一个window。在这里可以直接去调用一个aggregate方法啊,那么我们就看到了aggregate方法里边,它需要传入的就是一个aggregate方啊,我们可以点进去看到这又是一个接口,但是现在跟reduce function不同,它的泛型类型就有三个,这三个分别表示什么含义呢?
04:10
从它的定义上我们就能看得出来,第一个叫做印,那当然就是我们输入数据想要进行聚合的原始数据的数据类型。然后还有一个out,那很明显这就是输出的数据类型了。中间还有一个ACCACC是什么呢?这是accumulator的前三个字母缩写,所以这其实就是中间。要做聚合的一个累加状态啊,所以我们可以认为这就是我们中间保存的那个状态的类型。然后里边我们看到他也不是单一的抽象方法了,它一共有四个抽象方法需要我们去做实现。首先第一个叫做create accumulator啊,那字面上能够看得出来,这就是创建一个累加器,这相当于是我们初始化当前聚合状态的一个过程,它返回的就是中间的聚合状态ACC类型的一个值。
05:04
因为它是初始化累加器嘛,所以对于我们当前的一个聚合任务而言,每一次做操作的时候,做聚合的时候呢,这一个方法其实只会被调用一次,创建出来就可以了,相当于初始化,那么创建出来之后呢,接下来每来一条数据流失,数据到来,我们就可以触发一次计算,要在之前聚合的基础上做一个。叠加聚合,哎,那接下来调用的方法就是第二个A的方法,A方法我们可以看到它有两个参数。第一个参数是in,哎,那很显然这就是当前传进来的想要进行聚合统计的数据,而后边的ACC,那就是目前已经有的累加器状态,所以这个艾特方法里边,我们要定义的就是怎么样在之前已有的状态基础上把新到的数据叠加聚合进去啊,那最后呢,它返回的就是一个聚合之后的新的累加器状态,也就是我们当前的累加状态就更新了。
06:06
啊,那可能我们会想,那最后输出什么结果呢?我们想要的结果在哪输出呢?在这里get result这里输出。Get result里边只有一个参数,就是ACC当前的聚合状态,我们就可以基于当前已经完成的聚合状态得到一个输出的结果,啊,那它的返回就是out类型。所以我们可以看到就是前面的这个ADD方法,它是每来一条数据就会调用一次这里的这个方法,然后得到一个结果,更新当前的中间累加器的状态,那么什么时候调用get result呢?对于我们当前的窗口操作而言,那就是要去触发窗口计算的时候,哎,比如说我们收集八点到九点的数据,那其实就是一开始创建这个窗口的时候,那就要。先调用一下create accumulator,先创建一个累加器,然后这个窗口范围内每来一个数据就调一次A,然后最后呢,到九点钟了,我们发现这个窗口应该要关了,这个时候先触发窗口计算,那就是调用get result获取当前的计算结果输出之后这个窗口就可以关闭了。诶,这就是aggregate function进行窗口操作的完整流程。
07:22
啊,那最后还有一个方法叫做merge merge我们知道它的含义是合并,所以它的意思其实就是对于我们定义的场景,如果出现窗口要合并操作的时候,那我们就把两个窗口的累加器合在一起,合二为一啊,那这种方法什么时候要调用呢?呃,简单来说的话,一般情况我们调不到那出现窗口合并的情况,在之前我们提到的所有窗口类型里边,只有会话窗口session window有可能会出现这种情况,哎,那所以我们会发现就只有session window里边啊,有可能需要去实现这个墨别的窗口,其实这个不实现也是可以的。
08:03
所以接下来我们就可以在代码里边去做一个具体的测试了啊,为了跟之前的reduce function分开,我们另外新建一个object。我们要测试的是aggregate function,所以就叫做aggregate。Function test。没方法,先写出来,然后首先。还是基本的流程,需要去创建执行环境,读取数据源啊,另外我们这里边基于事件时间去做处理,后面可能涉及到开窗口,跟时间有关,那么我们要分配时间戳生成watermark,哎,所以前面的这些操作我们可以直接copy过来。上面该引入的影视转换,我们做一个下划线的引入,呃,那这里其实我们会看到啊,如果当前数据本身的时间戳就是升序的话,我们直接定义这个水位线的生成策略的时候,诶,这个调用其实是有点麻烦的啊,其实本质上我们这里边就是指定了一个time stemmp的提取策略而已啊,所以呢,其实在flink当中给我们提供了另外一种比较简单的简洁的调用方式,诶,那就是可以不用通用的as stems and autos,而是一个as sign asending steps。
09:16
好,那我们看到它其实所说的就是分配声序的时间戳,那这里边要传什么东西呢?诶,我们看它就直接传一个提取器,这个提取器呢,用一个拉姆DA表达式来表示就可以了,那这个提取器本质上来讲就是实现了我们extract a sending time stamp这样一个方法,那我们知道啊,这个方法在当前的窗口分配器里边,其实就对应着之前我们extract这样一个方法,哎,那所以我们在这里直接传一个拉姆达表达式就可以了啊,就像之前的话,我们就直接下划线,点time step把它提取出来,完全就搞定当前时间数的分配了啊,所以这样的话就会简单一点啊。有了stream之后,接下来我们就可以基于它去做k buy,然后开窗去进行聚合了,在这里我们去考察一个具体的应用的实例吧,我们知道在电商场景或者是很多的网站里边啊,都会有这样的一个统计指标,那就是PV和UV,这是非常经典的两个指标了啊,我们知道PV是page view,就是统计一个页面的浏览量、访问量啊,有时候呢,我们也可以用它来统计每个用户访问了多少次啊,那UV呢,那就是在PV的基础上对user再去做一个去重,哎,一般我们这个UV是unique visitor,就是独立访客数,就是看当前的网站,当前的页面到底有多少个用户访问过啊,那像PV的话呢,就是一个用户访问很多次,访问100次,那我们就会统计100个PV,而在UV里边呢,这100次如果是同一个用户做的点击,做的访问,我们就只统计一次UV,相当于是一个去重。其实我们之前在进行reduce function的。
10:59
试的过程当中,我们所定义的这个处理逻辑呢,就是针对每一个用户,每一个user统计了他访问URL的频次,啊,那这个概念有点类似于PV,但是又不太一样,我们知道PV的话,其实是要统计当前所有用户对于所有页面的访问的次数。
11:20
所以呢,接下来我们可以把这个需求做一个更改,当前所要统计的是统计PV和UV啊,我们可以直接把这两个统计出来,那另外呢,哎,我们干脆就只输出一个结果,我们输出什么呢?输出PV除以UV的值,哎,这表示什么呢?PV我们知道在这里统计的就是所有用户在当前网站上点击URL的频次啊,那所有的访问量都已经统计出来了,而UV呢,是独立的访客数,就是我们到底一共有多少个用户做了访问,做了点击,那这个一除就相当于得到的是一个平均每个用户对于当前网站的访问次数。
12:03
呃,这个指标在一定程度上就表示了我每个用户的活跃度,或者说每个用户的年度。那这样一个需求,因为我们输出的是PV除以UV的值,很显然这应该是一个double类型的输出结果,哦,那这个类型呢,跟我们前面统计的时候,有可能我们包装成二元组啊,也有可能是其他的一些形式,那跟我们统计时候,或者说输入的数据类型就会有所不同了,那这种方式呢,我们用reduce function可能要分成好几步,现在我们可以使用。Aggregate function,那只需要一步操作就直接搞定了,那就是STEM,我们先啊,如果要开窗,我们先得去做KBY啊,当然了,这里我们会发现其实要统计的是所有的数据啊啊,那统计所有数据的话,我们说可以不做分组,直接window all啊,那另外呢,还有一种方式就是我们分组也可以,那怎么分呢?那其实很简单,就是让所有的数据。
13:03
我们都返回同样的K就可以了啊,就这里边给一个当前的分组标志,就叫做K,或者说就叫做处。所有的数据来了之后,都是这样一个分组标准,那当然他们就是同一个K都会分到同一个组里边去做统计了啊,所以接下来呢,后边接着的就是一个window。现在我们指定一个窗口的分配器。我们前面测试了滚动窗口,哎,那接下来我们干脆测一个滑动窗口吧,比方说我们的要求就是十秒钟长度的滑动窗口,然后呢,每隔两秒统计一次啊,输出一次当前的这个聚合的结果,所以接下来我们就应该是sliding。Event,我们还是事件时间select even time Windows,然后点O里边给的就应该是time,这里需要引入对应的啊,我们这里是API window in time.time我们说十秒钟一个窗口,所以就是time second10。
14:02
另外呢,还有一个滑动不长,那就是time.seconds我们两秒钟滑动一次,那就是SECONDS2,有了窗口分配器之后,接下来呢,我们就要定义窗口函数了,现在我们直接使用一个aggregate方式来实现这个功能,所以就是点aggregate里边就要创建一个增量聚合函数啊,就是我们所说的方式,这里我们干脆在下边去做一个单独的自定义吧。实现自定义聚合函数。Plus,我们把这个叫做PVUV好了。Extends aggregate function。这里的泛型啊,我们先把aggregate function要引入泛型有三个,哎,那输入ACC,还有输出当前的输入,当然就是event类型了,然后我们接下来要进行一个聚合计算,那这里边的聚合计算其实主要有两个东西,一个是用来计算当前的PV。
15:03
那我们知道啊,其实就是每来一个数据,我们就加一就完了嘛,其实就是一个长整形的值,所以在这里我们可以。来一个长整形的值,然后另外还需要统计UVUV的话我们知道它是要做去重的,那这个怎么样去统计呢?怎么样判断之前这个U的出现过没有呢?啊,一个最简单的方式我们知道啊,可以用哈希set,就用一个set数据结构我们知道对于一个集合类型啊,Set集合类型而言,它的数据元素的特点是有唯一性的啊,就是假如说是重复的元素塞到一个set里边的话,它其实只算一次,诶,那所以我们就想到了直接用一个set来保存当前已经有过的用户,那最终set里边有多少个。我们的UV就是多少啊,所以基于这样的想法的话,我们可以设置这样的一个二元组作为中间的聚合状态,我们用一个二元组。也就是前面是一个长整形的值,后边是一个set。
16:04
来。表示中间。聚合的。PVUV状态。那在这里呢,我们的类型就直接指定成SC自带的set集合类型,然后里边的泛型我们就直接给一个string,因为要保存的是user吧,按照这个string类型的字段去进行一个去重保存,这就是我们定义的ACC的类型二元组,然后最后还有一个输出的数据类型,输出数据类型我们是要一除嘛,PV除以UV,所以这里边直接给一个double就可以了。然后我们看到里边必须要实现这样的四个抽象方法,首先是create accumulator,这是一个初始化的方法,初始化的时候我们其实要创建的就是一开始的这个二元组嘛,那初始的时候给什么呢?其实非常简单,这不就是。开始的时候PV是零,那就给一个0L,另外还应该有一个空的set,哎,那这个set的话,我们就直接给一个set string,然后创建一个空的对象就可以了。
17:11
然后接下来是爱的方法,爱的方法是每来一个元素,我们这里可以写一条注释。每来一条数据。都会。进行ADD。叠加聚合。所以接下来我们在这里处理的时候,那就是应是我们当前新到的数据,哎,那接下来我们这个聚合状态怎么改变呢?我们所要返回的就是一个改变之后的聚合状态嘛,那又是一个二元组,那首先当前的长整形的PV值来一条数据,那就加一嘛,哎,所以这个其实非常简单,我们直接就是ACC。点下划线一之前的PV值直接加一。那后边我们当前的这个set呢,那就更加简单了,直接就把我们user字段直接添加到ACC,点下划线二就可以了,哎,那我们知道这个对于SC而言,本身的set操作啊,也可以直接用加号,所以我们就直接加当前的in.user。
18:15
直接添加进来就完事,那最后的get result这里调用的时候就会返回我们最终的计算结果。返回最终计算结果。那这里需要得到的就是一个double类型的值了,其实就是两个数一除,我们是PV除以UV,所以就直接用这里的ACC。点下划线一除以ACC,点下划线二注意当前这个是一个set,我们要的是它的大小,它的size,只要拿出来这就是当前的UV值啊,那当然了,本身这两个一除得到的结果是长整性整数除法嘛,所以接下来如果我们要想要得到double的话,先得到长整形,然后再转成double,那后面就肯定是多少多少点零,这个可能不是我们要的啊,我们要的是真正除出来的那个小数,所以呢,可以在前面先做一个to double的一个转换,把前面我们统计出来的长整形数转换成double类型,再做相除的话,那就是一个带小数的除法了。
19:21
然后最后还有一个末日方法,我们知道滑动窗口并不涉及到窗口的合并,所以这个我们可以完全可以空在这不实现。所以实现了这个自定义的aggregate function之后,接下来在上边aggregate里边,我们就可以直接去拗一个PVUV,把自定义的这个类的对象传进来就可以了。好,那处理完之后,接下来我们可以直接做一个print打印输出,最后执行起来,这就是我们完整的处理流程。我们现在可以测试一下,运行起来,看看效果怎么样。运行起来。我们现在还是使用自定义的测试数据源click source,每一秒钟生成一条数据啊,现在我们看已经有输出了,那开始的输出呢,是1.0,然后我们看基本上会逐渐增大一点五啊,当然不一定啊,中间变成1.25,然后1.75 2.25,然后变成2.5啊,其实这个很好理解,因为我们这S里边我们所定义的user的选取范围就是四个用户嘛,所以我们想到啊,十秒钟每秒钟生成一条数据,那假如说四个用户都会出现的话,那我们知道了十秒钟之内的平均每个用户的点击次数,那就是2.5啊,那如果说我们当前随机生成的时候只有三个用户,少了一个的话,那就是3.33啊,最后一个五的话,这是因为我们这个double类型精度缺失的问题啊,所以这个看到的结果就一目了然,大部分情况我们还是四个用户都会随机生成有数据的啊,也就是UV大部分时候都是四。所以说我们。
20:58
里边很多情况都是2.5这样一个输出啊,这就是关于aggregate function增量聚合函数测试的过程。
我来说两句