00:00
要讲的就是增量聚合函数里边最简单最基本的一类啊,那就是规约函数,其实在规约函数之外,还有比它更简单更基本的,这我们可以直接在这个代码里边直接看的更清楚,呃,我们在这个window test里边之前不是已经。定义了这样的一个窗口吗?滚动时间窗口啊,当然这里边这个HOURS1可能太大了一点,这里边可以稍微小一点啊,比方说给一个十秒钟的窗口吧,SECONDS10后续我们测试会方便一些,然后接下来可以调用的方法,里边有aggregate,有apply。然后还有,你看还有max max by mean mean by啊,当然还有process啊,这个我们后面再说还有reduce啊,所以其实就跟击败之后直接做聚合的那些方法是类似的,也有基本的简单聚合方法啊,就是max mean,也有sum对吧,大家看也有some,也可以直接去聚合,当然了这种聚合的话就是能定义的东西就会少一点啊,用法很像,我们就不专门讲了,这里就给大家说一下reduce吧。
01:11
Reduce。点进去之后会看到传的还是一个reduce function,那这个reduce function跟我们之前啊,来看这个reduce function,阿帕奇link API common functions,这个reduce function跟我们之前讲到。K之后做聚合的时候。在这里啊,Reduce这两个有什么区别呢?可以看到啊,两个就是同一个reduce function啊,所以这两个完全一样,那之前我们讲到的那些东西就都可以反过头来用到这里了啊,这里面其实要实现的就是一个reduce function,然后当前的数据类型是本,然后接下来我们可以对它做一些统计,做一些处理。那当然了,这里我们可以还是举一个之前的例子啊,比方说之前我们曾经在KY之后reduce的时候,是把它map成了一个二元组类型,然后去统计当前访问频次最大的这个用户啊,所以接下来我们同样可以把这个每个用户的访问频次呢,限定在一个窗口范围内,因为之前大家还记得这个数据啊,它是每来一个就统计一次,每来一个就统计一次,无休无止永远在统计啊,那如果是无限流的话,这个可能要不停的更新啊,最后就只有一个值,那我们现在可能就是希望,比方说我就统计每十秒钟统计一次,过去的十秒内每个用户到底访问了几次啊,然后每隔十秒统计一次,这样的话可能看的会更加的清晰一点。
02:45
所以对于这样一个需求,我们这里的reduce function怎么样去实现呢?啊,那当然前面这个啊,前面就不光是这个要做这样的一个reduce了啊,我们应该在前面先把stream。做一个map。
03:01
呃,还是把它要去new一个map function,因为我们知道如果要是二元组的话。解这个拉格表达式也可以,但是涉及到这个最后还要returns的那个过程啊,大家如果不习惯的话,我们就还是直接用这个my function来实现就可以了,既然是统计个数嘛,一个string,一个long,就像word count一样啊,再加一个一不就完事了吗?好,里边实现一个map方法,Return一个2.2。当前value的user,然后EL。那就搞定了,那后续。里的KBY就不再是user了,因为只有二元组里面没有user字段了。我们现在是第一个元素F0,然后接下来的这个reduce function,当然这个类型也就不对了,我们还让它自动的识别一下reduce function里边传入的数据类型是。二元组,然后接下来呢,就是要定义一个规约方法,我们当前的所有数据,到底以什么样的形式把一和二两个数据归约在一起呢?那我们自然就想到了,最最后归约在一起还得得到一个二元组负二啊,前面的这个user user,那肯定还是一个user嘛,就是以这个当前的这个user字段做KY的,所以是value一点F0先放在这儿,然后二个字段呢,那规约的时候其实就是不停的叠加嘛。
04:30
后面都是本来都是一,那然后就一个一个累加就完事了,Value一点F1加上VALUE2点F1。就是我们经济这一个处理的过程,好,然后整完了之后,我们直接把它做一个打印输出吧。得到的是一个single output dream operator,这样的话就完全没有问题了,那接下来我们来运行一下看看这个,诶,我们看一下上面的圆用的是啊,这里是我们自定义的一个圆,当然了,这里边如果是十秒窗口的话,所有的数据应该都在这一个十秒窗口内,我们先看一下吧。
05:10
之前我们如果不加开窗直接reduce的话,得到的其实应该是来一条就输出一次,来一条就输出一次,不停的更新,诶那你能不能直接就把它那个最后只输出一个最终结果呢?诶当时我们说那就相当于批处理了,你可以在前边直接把执行的模式定义成batch啊,定义成批处理模式,那另外还有一种方式呢,如果说你对它的这个截取时间范围有限制,那其实做一个窗口输出是不是也一样的呀?啊,在这里我们直接定义一个零到十秒钟的窗口,那其实就包含了这里的所有数据,在这里边统计出来,Mary有一次点击Alice丝比较多,有三次,Bob最多有五次,我们的一共九条数据,分别就是这样的一个统计结果,诶,这就是窗口,很容易看到它的这个展示的结果,而且我们想要的可能就是这样的一个结果。
06:03
但是毕竟这是一个有界流,直接就是写死在这里边,一个form elements把所有数据读进来了,那实际使用的时候,可能大家会更习惯于用一个真正意义上的流数据来表示。啊,我们把这个函数直接写一下啊,留数据event stream。那留数据的话,我们可以一条一条输入,OK,是那个太麻烦了啊,数据太多了,我还记得之前我们曾经实现过一个lius自定义的语言,那现在刚好可以直接用起来,你有一个click s添加进来啊,当然这里用到了这个乱序流,其实没必要,S里边我们就是按照当前的时间顺序打的时间戳嘛,这个肯定不会变,对吧?啊,所以这里边这个延迟其实我们现在就会发现没什么真正意义上的作用。单纯的只是让它滞后了而已啊,所以我们可以把它改成有序留的那个处理啊,就是for mononess times,然后就同样的这个实现就可以了啊。当然另外还有一种方式,如果大家不想改这个的话,之前我们说过,其实底层不都是这个东西吗?你也可以直接用for bed out of orderness啊,这个处理乱序数据的这个方法里边传一个什么呢?Duration,哎,其实大家之前也看到过啊。
07:21
Zero就给一个零不就完了吗?就相当于跟我升序有序的流的处理是一样的,然后接下来后边啊,这个处理就完全一样了,完全一致,直接到最后我们做一个打印输出,现在大家看一下啊,这个就不会直接结束退出了,它就会一直有数据。诶,大家看上来之后只有一个爱丽丝一,这是为什么呢?很显然这应该就是我们的第一个十秒钟,只有爱丽丝一个数据对吧?好,然后第二个十秒钟大家看。哎,这一共有十个数据,因为之前我们不是隔一秒产生随机生成一个数据吗?啊,所以第二个十秒钟爱ice丝有四个,然后Bob Mary carry瑞各是二,然后第三个十秒钟随机生成的carry有三个,Bob有五个,Mary有两个,这个就看的非常明显啊,每十秒钟这里会弹出一个结果啊,看到最后我们这个统计出来的这个效果就是隔一段时间哎,输出一个结果,它统计的范围就是。
08:20
之前的十秒钟的这个范围啊,就是按照时间戳生成的时候。大家还记得之前我们那个k sau啊,直接就是按当前。我们现在的这个时间去获取的时间戳,那就跟现在具体的时间有关了,大家知道十秒钟一个窗口肯定是整十秒嘛,为什么一开始只有一个爱一呢?诶,那有可能刚好就在这个,比方说啊,就是59秒的时候啊,或者是19秒的时候只产生了一个数,然后就跳到下一下一个20秒了啊,那当然我们这里边第一个十秒输出的就就他一个了,那第二个十秒呢,那你是要等到30秒的时候才会去输出20秒到30秒之间的数据,所以你要等一会儿才能看到剩下的十个数据。
09:02
啊,这就是我们窗口reduce的一个测试。
我来说两句