00:00
前面我们讲过了,呃,针对这个APP市场推广呢,做了一个分渠道的统计,那有些时候呢,我们可能还是想要汇总起来一个总量的,所以就是说在这个分渠道统计的基础上,我们可能还要再做一个不分渠道总量的统计,那这个统计过程就非常简单了,一种一种简单的想法是,前面我们不是已经做了分渠道统计吗?那大家想我在之前的这个代码基础上,是不是可以后边再做一个当前窗口内所有数据的累加呀?啊,那就相当于前面我们的这个K,不是也相当于给我们做了一个平均的分布吗?啊,那就不用再做单独的设计K了,对吧,直接分配开,然后收集齐的数据,我们后边再比方说等一毫秒,等到当前窗口数据全部都到齐,所有的累加起来,这不就是当前这个啊,全量的这个数据吗?啊,那当然或者还可以怎么样。就直接用我们之前的那个,呃,就相当于word count那种方式,直接把它全所有的数据发送到同一个。呃,以以同一个字段作为同一个P,然后发送到同一个分区,直接计算出来,对吧?啊,所以这个方法还是非常简单的,这里边快速给大家做一个实现。
01:10
我们就直接在这儿写一个。刚才这个叫做APP marketing。呃,这个叫呃不是by channel了,那我们直接叫统计吧,Statistics做一个统计,全量统计啊,那前面的整体流程大家想到这个基本上差差不多对吧,而且我们这里边也没有数据的话,直接从这个测试数据源里边去做一个,呃实现啊,我这里边直接把这个都copy过来。然后我们这里边用到的就是这个测试数据源啊,直接就用这个APP marketing by channel里边定义好的这个直接把它读进来,然后分配时间抽和water mark,接下来就不是分渠道了,对吧,我们直接就是开窗统计总量,好那这里边我们再简单的回忆一下之前那个word count的那种写法的话,大家总结一下相当于应该做一个什么事啊。
02:08
是不是filter之后,接下来我要做一个做一个map操作啊,对吧?哎,如果说我最后不想再去汇总的话,我是不是直接可以你有一个map方式啊,当然大家也可以直接写那个,呃,写那个拉达表达式,只不过最后如果是你你是一个元组类型的话,是不是还得专门指定它的那个返回类型啊,对吧,这里边我们就用这个慢方式啊,我想要的是一个二元组。还记得我们这个,诶,这这里边不要用那个skyla的那个二元组啊,刚才大家看到。刚刚才那个引入的是skyla,这里边我们用的是Java t下边的这个二元组,然后里边我们应该用到的是当前,呃,就是string类型,呃,直接给一个K,另外是不是给一个长整型的count数量啊,我们这里边包装其实就是就是直接就是一个K,一个通用的K,然后一个一就完事了,对吧?啊,所以这里边直接就是这么去写,后边呢,必须要实现的方法是。
03:12
一个map方法里边直接去return一个,你有一个二元组啊,里边是不是直接给一个,比方说当前我直接就叫做total对吧。后边来一个EL,这是不是就完事了,这就是我们map的过程对吧?那接下来分组的时候按照什么分组,是不是直接KY0啊对吧?呃,后边然后我们直接做一个这个,呃,开一个滑动窗口,那后边做这个aggregate的时候,那其实大家想到这个就是我们肯定不是用之前的这个方式了,对吧?那这里边我们就是相当于是这里边每来一个数据,直接去做一个统计,甚至我们还可以是不是直接做一个sum啊对sum的时候我就直接是不是用,哎这里边大家看到就是假如说啊,前边这里边我们定义了这个。
04:03
输出的这个数据类型是channel promotion count的话,那我们这里面就不能简单的做一个sum,因为大家知道sum得到的数据类型应该是什么。数据类型不变,是不是还应该是二元组类型啊啊,当然如果大家要求没那么高的话,那我也可以直接把这儿改成二元组不就完了吗?我们现在那个二元组输出不就是string string浪对吧。哎,把这个一改过来,这个类型就就不变了啊,那如果说大家需要比方说这里边我还是想要得到对应的这样的一个channel promotion count的话,那就老老实实的。把这个地方是不是还得做一个包装啊,对吧,那我们就还是用这个aggregate对吧,我有一个marketing。呃,比方说我这个叫statistics APP,然后后边因为有那个window信息嘛,所以还是得做一个全窗口函数,对吧,Marketing statistics result。
05:04
哎,这就是这个过程啊啊,所以下边这个流程也是非常的简单快速的实现一下class,这里边首先实现的是一个。啊,就是当前我这个implementment是一个aggregate function对吧?里边的类型是什么?当前的类型是不是应该是那个二元组类型啊,因为当前输入数据已变成二元组了,对吧?哎,所以这里边我要的是那个string和长整型的输入,那得到的中间状态和那个得到的结果是不是都是二元组啊,呃,都都是长整形啊对吧?这里边我们只要一个count的数量嘛,所以create的时候还是0L。ADD的时候accumulator对吧,Accumulator加一来一个就加一,最后得到结果accumulator啊,那么末的时候A加B非常简单对吧?啊,然后另外我们后边还要实现这样一个全窗口函数。
06:02
Public static class啊,那比方说现在我们不去实现那个process window function了,我这里直接就implement一个window function是不是也可以啊?这两这两种方式都是完全可以的,对吧,完全可行的啊啊,那大家知道这里我是不是得到的数据类型就是一个长整型对吧?啊,然后需要输出的是一个channel promotion count,那么当前的K类型又是什么呢?当前K的类型是不是这里KBY0当前是不是还是一个元组类型啊啊,所以它是个一元组对吧?所以这里面还是一样啊temple呃,注意这里边我就直接用这个temple放在这儿,对吧?然后time window啊time window这是window类型,然后接下来必须要实现的是变成了一个apply方法啊,大家还记得这里边我们想要的那个东西。
07:00
呃,其实前面的那个channel和和那个behavior是不是就都已经没有了,我们是全量统计嘛,所以这里面关心的其实就是一个window end,那当前的这个window and怎么给呢?因为是一个string,所以我们转换成一个time stamp,对吧?把这个还是引入,然后里边,哎,是不是直接就用当前的这个window.get hand就可以了,对吧,这本来就是一个长整形嘛,我们把它转成一个time Sam,然后再to string就可以拿到那个年月日十分秒了啊,那最后我们这个count数量又是什么呢?现在是不是在这个interable类型的input里边啊,之前那个叫elements对吧,现在这个叫input啊,其实都一样,只有一个数嘛,Next拿出来就完事了。最后我们输出是不是还是out.collect你有一个channel promotion count啊,里边当前前两个字段都都是都没有了啊,因为我们是统计全量的,所以我就直接叫total,后边那就是window and,再加上count,是不是就是这样的一个定义啊,哎,所以整体来讲这个过程还是完全类似的啊,非常简单快速的给大家再复习一下,大家这也可以也可以看到就是window function和process window function。
08:20
大家看有什么区别啊。基本上都一样对吧?都是全窗口函数对不对?区别就在于它既然是process window度function式,那是不是它其实是属于process process function加速里面的,所以大家看它是extend对吧?因为它本来也是负函数对不对?所以在里边你如果要想实现对应的这些open close生命周期是不是这里边都有啊,但如果说在这个window function里边,你如果想想要去实现,是不是就没有生命周期方法呀?哎,这个大家一目了然对吧?另外还有就是这里边的参数区别是不是在于这里边是一个上下文,而这里边是一个window啊,这个上下文是不是相当于对window是一个覆全覆盖对吧?它里边本来就有window信息,另外是不是还有一些其他信息还可以获取状态,可以获取当前的时间对吧?啊,对应的那些操作也都能做啊,这就是他俩的区别,大家可以再总结一下啊,这是关于全量数据,不分渠道的市场推广统计。
我来说两句