00:00
我们已经知道了,窗口函数里边主要可以分成两大类,增量聚合函数和全窗口函数啊,那简单来说的话,其实就是全窗口函数,它使用的这个思路呢,就是收集齐了再处理,这是一个批处理的思路,而相对比之下,增量聚合函数,这真的就是我们的流处理的思路啊,那就是来一个就处理一个增量化的去进行聚合,诶那我们现在要做的是一个窗口计算啊,那最后呢,窗口应该是到点的时候才能输出一个结果,诶,所以呢,我们就是来一个数据,就直接做一个聚合,做一个计算,先保存起来。但是不输出结果,一直等到所有数据都处理完毕之后,等到窗口的结束时间到了,这个时候再统一把我们聚合的结果拿出来做一个输出,这就是增量聚合函数的底层逻辑,哎,所以我们会发现啊,这个增量聚合函数很明显可以提高程序运行的效率和实时性,速度就会更快,所以我们在实际应用过程当中,一般啊做聚合窗口聚合的话,都要使用增量聚合函数啊,那典型的增量聚合函数呢,又有两个,一个叫做reduce方式,另外一个叫做aggregate方式。
01:18
啊,那接下来我们就分别来进行讲解,首先我们来讲解reduce方式,那就是规约函数,其实我们知道规约就是最基本的聚合方式嘛,前面我们讲基本API的时候也提到了规约聚合。窗口的规约聚合呢,跟前面我们讲的非常的类似,也是窗口里边不是把要收集的数据都一个一个来了吗?那就是两两进行规约,每来一个就跟之前的规约结果做一个叠加聚合,哎,那这样的话最终就会规约成唯一的一个值进行输出,这就是我们所谓的规约函数要做的事情。那窗口函数里边呢,给我们提供了一个reduce function,这个reduce function啊,其实就跟我们之前在简单聚合操作里边实现的那个reduce function是完全一样的,它是同一个函数列接口啊,所以这里面我们就把它看成一个有窗口的reduce聚合就可以了,其实是完全一样的。所以接下来呢,我们在代码当中可以来做一个具体的测试,做一个实现,比如这里我们考虑一个简单的需求,诶,那就是类似于what com啊,我们统计一下某一个时间窗口内,比方说现在这个是一小。
02:33
小时的时间窗口啊,这个稍微时间有点太长了,因为我们等不了那么久,那么我们测试的话,干脆把它调短一点,我们就直接设一个second啊,我们来设置一个十秒钟的一个窗口吧,或者我们更短一点,我们设一个五秒钟一个滚动窗口,每五秒钟统计一次数据,那统计什么呢?就统计当前每个用户在当前五秒钟之内有多少次点击事件,哎,那所以接下来我们要做的这个操作。
03:03
是统计每个用户的点击频次,哎,那这个的话我们还应该像word count一样啊,前面做一个简单的转换,哎,那在K之前我们就先转换一下。首先我们把它做一个map转换吧。本来是一的数据,我们现在呢,其实只要user,然后另外再加上一个一就可以了,就每来一个点击事件,后面我们加一嘛,跟work com一样,所以我们包装成二元组,每一个data来了之后,我们直接返回当前data的user,以及一,直接把它包装成这样的二元组就可以了,那当然后面KBY的时候就不再有user了,那怎么办呢?哎,KBY_一提取当前的第一个字段user作为当前分组的键值就可以了。后面呢,哎,我们定义了一个基于事件时间的滚动窗口,五秒钟一个啊,这里少了一个括号啊,我们把这个括号加上,那下面应该这里也少一个括号,我们都加上,然后接下来就可以直接调用一个啊,当然了,我们有那个简单的句合方法啊,我直接调some其实也是可以的,针对后边的第二个字段直接做一个sum完全可以,我们现在要讲这个reduce的话,那我们就调一个reduce方法啊,哎,那对应的里边我们看到啊,这个reduce必须要传入的。
04:21
这里是一个reduce function,那这个reduce function点进去呢,这就是跟我们之前讲的那个基本API里边的reduce聚合要实现的那个函数类是完全一样的嘛,这个接口里边要实现一个reduce方法啊,我们把传入进来的两个数据两两进行规约,得到一个T类型的数据返回,好,那所以这里边跟之前的逻辑完全一致,那就直接去new一个reduce function,或者我们看到这里啊,也可以不写这个reduce function的实现类,直接传入一个拉姆达表达式就可以了啊,因为我们当前是这个单一抽象方法的接口吗?使用拉姆达表达式可以做更加简洁的实现啊,所以我们这里就非常熟悉了啊,你写拉姆达表达式吧,只不过现在有两个参数,那我们知道第一个参数其实代表的是当前的。
05:14
一个聚合状态啊,所以我们可以把它叫做state,那另外一个呢,是当前的一个新到来的数据,我们可以把它叫做贝,然后接下来我们的聚合结果啊,那其实贝塔和贝塔当前已经按照第一个字段进行分组了嘛,那他们的第一个字段其实都是一样的,所以我们可以使用贝塔的下划线一。然后聚合的计数结果,那就应该是state,点下划线二,再去加上一啊,那或者是加上我们当前data本身带着的它当前的这个个数,哎,那我们可以加上data点下划线二,这就是我们聚合的逻辑,之所以前面要加上user,那是因为我们当前这个reduce方式里边。它的返回的类型是不变的,必须保持二元组的类型啊,那所以接下来我们得到这个之后,这就是一个reduce,之后返回的就变成了data string t啊,那当然就还是一个二元组的数据类型了,接下来我们就可以直接把它做一个打印输出。
06:18
运行一下,现在我们就可以做一个测试了,好直接运行起来。我们可以看一看得到的结果是不是五秒钟就会统计出对应的用户,诶我们看到啊,第一个五秒钟只有爱丽丝,那就是一爱丽丝有一次点击后面呢,诶,后面五秒钟就有三个用户了,爱丽丝2BOB2 carry1,然后接下来我们看每一次的输出都是五条数据,这个也很好理解,因为我们是。一秒钟生成一个数据嘛,诶,那五秒的窗口当然是五条数据了,诶这五条数据怎么分配,那就呃各自有各自的这个情况了啊,那第一次的数据少一点,那是因为我们当前这个时间有可能刚好只输出了一个数据,就到了整五秒钟的那个窗口结束时间,那所以就会输出的比较快一点啊,那后面我们可以看到每一次都有五条数据,统计的就是这五秒钟之内所有用户他们的点击URL的访问频次啊,这就是reduce的一个简单介绍,非常的简单,跟之前是几乎一样的。
我来说两句