00:00
我们在用全窗口函数process window function做这个UV计算的时候,很明显它不如aggregate好使,看一看我们之前那个aggregate啊,啊,当然这个aggregate我们还做了那个PV嘛,它俩还要除,所以稍微的麻烦了一点,这里边我们用到这个中间状态是一个哈希set加一个长整形,那其实大家想到这个长整形就是专门为了那个统计PV而专门设置的嘛,那我们现在如果只统计UV的话,是不是就不需要它只要一个哈希set就可以了呢?哎,那就不用二元组了,对吧?直接把状态定义成一个哈希set,然后就是一开始new一个出来,然后呢,每来一个元素就把user at到当前的累加器里边来,Accumulate里边来,然后最后呢,直接返回它的这个size不就完事了吗?哎,所以非常简单,如果用这个aggregate方式去做这个uva统计的话,要比我们的全传函数明显要简单,而且要高效很多。但是大家发现。他们各有优缺点啊,就是你这个average function确实是更好使,更高效,增量聚合,更速度更快,延迟小,但是呢,它包装不上后面我们想要的这一个窗口信息啊,我们很想要对应的这个窗口信息,对不对?那现在这个全窗口函数,它能拿到对应的这个窗口信息,但是呢,它是把所有的数据全攒起来,做了一个批处理,这个又效率太低了,延迟太高了,我们当年这个场景明显是可以增量聚合的。
01:27
那怎么办呢?啊,弗林给我们提供的解决方案,我们在调用的时候可以把这两者结合起来用。啊,这个结合呢,在前面给大家讲到这个agggate function的时候,在f window stream给我们提供的agggate方法里边,它可以有各种各样不同的传参方式,最简单的当然就是只传一个L的方式啊,就把这个传进来,然后还有的就是呃,你要去指定ACC和这个输出的这个type information,这主要就是防止这个出现泛型擦除,然后你看不到它内部的结构了,防止出现这种状况啊,如果出现那种嵌套的情况的话,呃,可以用这种方式去指定,强行指定,然后另外呢,诶,我们看到可以追加参数。
02:12
加参数呢。可以后面跟一个process window function,或者跟一个window function,当然这个window function我们就不太在意了,完全可以用process window function覆盖嘛,所以那我们就想到了,这看起来是可以aggregate的时候,既传一个aggregate function也传一个。全串口函数啊,Process window度function或者运度function,那它的用法又是什么意思呢?哎,这个用法就是我还是既然有aggregate function吧,还是增量聚合。来一个就处理一次,来一个就处理一次,调用的就是aggregate function这里的。这里的A方法每来一个数据调一次,初始的时候创建create accumulator,创建当前的累加器,然后每来一次一个数据调,用I的方法去做叠加计算,增量聚合,那等到窗口结束的时候又该怎么办呢?
03:05
等到窗口结束的时候调get result方法,但是注意不是直接输出调get result方法之后把这个get result传递给后边的全窗口函数,传递给他的谁呢?传递给他的process方法里边,作为elements传递进来。啊,那其实我们就知道了,当前的这个element,其实里边就一个数,就是之前增量聚合的那个结果放到这儿来了。然后到了这个全窗口函数里边,那当然你想干什么干什么了,这里的K还有对应的这个上下文里边的窗口信息,呃,水位线,时间的信息,状态的信息,什么都能都能够拿到,你想包装什么包装什么,这样的话就把两者结合起来,优势优点都放在一起。啊,然后就这就变成了一个通用的最强大的用法。
04:00
好,呃,接下来我们就用这种方式把这个代码做一个具体的改进啊。我们前面说这个单独用全程函数使用起来的话,最后的结果不是很好,那我们接下来就把它做一个更新啊,那这个不是aggregate,也不是单纯的process window function,那我们干脆就把它叫成一个UV count example吧,这是我们真正在实际的使用啊,项目当中可能会用到的一种用法啊,下来我们还是既然是改进,那前面肯定就差不多了。把这个都copy过来。整的这个流程跟前边的这个process window方式的处理都是差不多的,不同的地方就在于这里边我们不再用process。Window function直接去做计算了,哎,那我们这里边是使用aggregate function和access window function。
05:03
结合计算UV,哎,那这里边的这个计算过程要调用的是一个aggate的方法。里边要传的呢,两个参数啊,那这里边我们就单独的定义一下啊,我们都在下面定义吧,让大家看得更清楚一点,要不然直接写在这儿的话,代码可能会显得比较乱一些啊啊,那所以接下来我们这个首先一个的方式啊,自定义。直线agg方式。主要是用来干什么呢?那就是增量聚合了。增量聚合计算就是计算UV值对吧,UV值其实就直接算出来了啊后其实我们那个再次方程主要就是要包装。窗口信息而已。所以我们。自定义实现。Process window function主要就是包装窗口信息输出。
06:05
啊,这是,呃,其实在实际使用的过程当中,经常会这么去干啊,然后接下来我们就看啊,这里要去做这个aggregate function。Class。我们把这个给一个名,这个叫什么吧,我们就叫UV的一个计算对吧,UV的聚合我们叫UVAJ吧。B。Aggregate function是一个接口,所以是implement aggregate function。然后三个泛型,还记得吧,Input,然后中间聚合状态ACC output,当前的input是。原始的数据空间的聚合状态,哎,这个聚合状态我们都已经知道了,不是要那个哈希set吗?目前刚好就用这种方式再给大家实现一遍哈希set string。然后最后输出的是什么呢?输出就一个UV值,哎,那哪那么麻烦呀,直接输出一个形或者长整形不就完了吗?直接放在这儿,然后接下来。
07:08
直线对应的四个方法。创建一个accumulator的时候,那就直接用一个哈希set就完事了嘛,然后ADD的时候,之前我们也说了,那不就每来一个把它的user添加到当前的状态里边,Accumulator里边就完事了吗?value.user添加进去,这里边不返回状态的话,那是不是我们这里边就相当于没累加呀?对吧,每次来的时候只是做了一个这个更新,然后不返回的话,那相当于每次我们的状态都是那啊那没用对吧,所以这个大家一定要注意一下啊,把这个添加进去。最后get result的时候,直接返回当前cuumul.size不就完事了吗?那当然了,现在既然我们定义成了长整形,那就那就再从类型转换一下,对吧,把它换成这个长整形,那最后还有这个墨纸啊,墨就无所谓了啊,你这个如果说想要去合并的话,那就把它做一个合并,我们这里边又不是会画窗口,所以说没有必要再去做墨。
08:07
就是非踌速的就能实现能量聚合的过程,但是里边没有窗口信息啊,哎,那这里边来一个。自定义的原窗口函数。Public static bus。UV。Count,我们来result吧,是包装一个结果b run函数process function是一个抽象类,所以是extends in function现在的输入类型是什么呢?之前是event,现在还是吗?现在不是了,现在它的输入就是这里get result的输出,前面增量均函数的输出,所以他拿到的其实就是我们统计出来的那个UV值嘛,庞整型啊,同样的我们包装成一个string,然后另外就是当前的那个K的类型是booing,对吧,然后风口的类型是time window。
09:05
整个的这个过程跟前面还是差不多的,所以这里我可以直接把这个直接copy过来就完事了,就最后这一句啊,主要就是这个。咱把这个process方法先调出来,然后最后其实就是要把这个UV填进去,那这里边取这个窗口信息还是这样去取,那UV应该是什么呢。啊,这里我们其实知道啊,OV不就是。啊,这个这个干脆就是长整型了嘛,前面我们统计本来就是长整型,那就浪吧。其实就是element limits里边把它拿出来就这一个数,因为它是个able啊,那所以也不用什么for each next,直接取一个不就是它吗,拿出来就是它可以直接输出了。就是我们最终实现的这个过程。这里边new一个定义好的UV,后边跟着一个我们定义好的UV count result做一个结合。
10:01
来运行一下。好,大家看到现在已经输入对应的这个数据了啊,我们看第一个十秒有多少条数据,诶大家看现在会有一个输出对吧,这个超到了50秒了,这里输出了一个UV值是四这里窗口。51分40秒到51分50秒,UV值为四啊,那当然这就是因为四个用户都有了啊,这个统计就全是四,如果想测试的输出不一样的值的话,可以随机的那个用户再给的多一点啊,那可能这里边就会出现各种各样不一样的值,因为这个十个数要选随机选四个的话,可能基本上都能选出来,所以这里面就全是四了啊。啊,这就是关于我们这个增量聚合函数和全窗口函数结合在一起进行一个优化的一个过程,这样的话代码的效率会更高,而且能实现我们的功能。
我来说两句