00:00
好,那现在我们代码里边已经把这个自定义的测试数据源已经生成了,已经有了啊,那接下来我们继续做的操作肯定就是基于它是不是要去既然事件时间语义嘛,是不是要去分配时间戳口water mark,哎,所以接下来还是ign time stamp and water marks,那里边大家想这里边我们是处理乱序还是生序呢?因为当前我这个数据就是按照当前的系统时间直接生成的,对吧?然后每隔100毫秒生成一个,那这个绝对生序对吧?那这个测试就不需要再去做乱序的测试了,所以这里边我们直接引入一个a sending time stamp extraor,这里边提取的就是element.get time stamp要乘1000吗?不用,我们当前直接获取的就是毫秒数时间戳啊,所以直接放在这儿就就完事了啊,提取出来就完事了,这是我们第一步直接获取这个数据,然后接下来那就是诶,分渠道统计对吧?分渠道开窗统计。
01:06
比方说我们统计的还是一小时之内的所有的数据,诶那家想当前我这个呃,分渠道统计的话,那就是基于之前的data stream,然后首先是不是做一个KBY就可以了,然后另外大家想到我要统计的这个不同渠道里边,我是不是还想统计他这个不同的行为啊,呃,就之前我们讲到的所有的这些行为,我们看这个病里边对应的这个行为啊,这里边呃,大家看到这里边我们定义的这个行为在在前面那个数据源里面都有,对吧?这里边这个行为我们并不是所有的,是不是都属于市场推广的正面那个数据啊,有一些比方说像onin store,这这明显这是一个负面的数据嘛。哎,所以我比方说我们现在统计只是想看哪一个行为是当前就是。对这个推广正面影响比较大的,对吧?那我是不是首先应该把这个on in store滤掉啊,首先不要这个对吧,所以上边做一个过滤filter啊,那大家知道这个应该怎么写了,我们这里是不是直接就是当前的data,然后要求我是不是不能等于on on in store啊,所以是on in store equals,当前data的get behavior对吧?啊,就要求跟oninsor不相等的行为要把它过滤出来。
02:30
然后接下来filter出来之后,按照渠道去做一个开窗,哎,那大家想到这里边我就要的是这个channel嘛,但是大家想到我是不是还想统计不同渠道下,诶到底是点击了多少,然后安装了多少对吧,下载了多少都想分开统计,那这怎么办呢?对大家想这是不是就相当于我们之前说的组合K啊,组合键对吧?定义一个组合键就可以把当前的两个字段都作为K进行一个分组,那所以当前如果是按照这个当前的这个字段名称去给定的话,是不是可以给多个值啊,所以我前面给一个channel,后边再给一个behavior,这就是一个组合K,那大家知道当前的这个K的类型是什么?
03:19
肯定就是元组对不对,而且当前明显是一个二元组了,对吧?呃,String string类型的一个二元组,好,那接下来继续往后,既然已经做了KPI分组了,下面当然就应该开窗了,对吧?Time windowo啊,那这个窗口的话,我们已经说了要一个一小时的窗口啊,那另外呢,因为我们当前是测试数据源嘛,你如果隔一个小时然后才这个输出一次的话,这个滚动窗口的话,我们等半天等不到对吧?啊,所以我短一点啊,让它滑动一下,我给一个五秒钟的滑动步长,Seconds给一个五。哎,所以这里边我们是定义划窗。
04:02
呃,那接下来最后还有一步操作,当然就是要做一个聚合了,大家知道当前我是不是就应该是来一个countt加一,来一个count加一啊,那跟我们之前的那个聚合方式是不是还是一样啊,只要是增量聚合就可以了,对吧?哎,来一个countt加一,来一个countt加一,那另外我们最后想要的那个数据类型,我们包装的这个channel pro,呃,Pro promotion count里边不是还有window and吗?所以是不是还得包装到那个全窗口函数里边找那个window信息啊?哎,所以这又来了,还是流程,标准流程啊,前面一个相当于预聚合的增量聚合函数,对吧?我把这个叫做marketing count agg。然后后边是不是来一个全充分函数啊,当前的这个结果marketing count result,诶这就是整个做聚合的一个过程,当然前面我可以把它叫做result stream。
05:03
然后最后可以把这个result stream做一个打印输出,不要忘记还有execute执行起来,我们加上job name,当前叫做APP marketing市场推广,对吧,By marketing啊。Marketing对吧,By channel job,这就是整个这个处理流程,那接下来我们就把对应的这两个方法,呃,对应的这两个自定义的类函数类做一个实现就可以了。好,那接下来我们实现自定义的增量聚合函数。这个过程已经非常熟悉了吧,Public static class,然后接下来我们这个叫做marketing count a,这个要实现的是一个aggregate function aggregate function。
06:05
里面的类型回忆一下,当前是in ACC out,当前的输入是我们外边输入的那个po类型,对不对啊,Market marketing user behavior,然后接下来中间聚合的结果和输出是不是都是一个count值啊,所以都是长整型嘛,Count long long。然后接下来里边是不是要实现四个方法啊啊,接下来我们这里边create长整形0L ADD的时候accumulator加一对吧,来一个直接加1GET results的时候,Accumulator直接返回墨的时候A加B跟之前是不是一模一样啊,就是类型稍微做了一个调整对吧?所以这个非常简单啊,然后另外我们还有一个全窗口函数market count result,这里边做一个具体实现,实现自定义的全窗口函数public static class,好,然后这里我们要实现的接口是一个。
07:10
哎,这里大家注意一下啊,就是假如说我们前面,呃,就是这里边是定义了这个aggregate的话,这是可以给一个window function的,对吧,那另外。这里边假如说我们里边还想拿到更多的信息,拿到上下文信息的话,我给那个更复杂的process window function可以吗?大家看也是可以的,对吧?哎,所以这里边给大家多测一个另外的一个接口,我我这里边不给window方式了,我直接给一个process window方式,大家看也可以这么用,对吧。那这里边我直接在这写啊,Implementment,大家知道这个process window function的话,它其实本来应该是一个process function里边的这个家族,它都是reach function对不对啊,所以它是不是应该是extend啊,Extend一个process process window function。
08:02
然后里边的类型,哎,其实这个大家也知道,跟那个window function一样,是不是四个类型啊,In out kw,那么当前的输入in就是之前的输出长整型的那个count对吧?啊,那它的输出当然就是我们最终想要的那个结果,是不是channel promotion count啊啊,然后最后当前的K,哎,那是ta对吧?二元组,呃,然后接下来这个window类型是time window。好,大家看到这个一写的话,上面完全应该是不,诶这里边大家看到这里边有问题,是因为我们得到这个result应该它的类型是,呃,对,Channel promotion count对吧,这样的话就没问题了啊,所以最后我们要实现的其实就是一个是不是process方法啊呃,之前我们那个如果是window function的话,大家还记得实现的是一个apply方法对吧?现在要实现的是一个process方法,那这里面的区别就在于他拿到的是不是就不是window了,而是一个contact,那大家知道这个contact是不是比window拿到东西还多啊啊,所以这里边我们还是一个一个做一个提取啊,我们要的那个字段不是首先想要当前的channel吗?大家还记得这个channel promotion里边channel behavior,对吧?然后window and count啊,所以首先是channel channel怎么提取呢?
09:30
诶,那是不是就是在这个temple里边对吧?K里边吧,当前这个channel里边,它是不是应该第几个呢?哎,这个大家注意啊,有同学可能说之前你定义那个,呃,我们输入的这个marketing有behavior的时候,他在前面嘛,呃,这个behavior在前面嘛,所以channel应该排后边,Channel是第一,呃,Channel是一,Behavior是零,这个对吗?对,大家注意,这我们要找的得是KBY的时候,这里的顺序对不对,按照这个顺序,这才是第一个位置,第二个位置,所以我们channel是不是应该是零啊,哎,所以。
10:07
这里边啊,Get fill0,这就是当前的channel,那同样behavior就应该是temple get fill get get field1对吧?呃,那同样下边我们还要一个string类型的window and,这个应该从哪去找?当然是contact里边大家看是不是有window啊,然后get window and对吧?哎,当然本身这是一个长整型,我可以非常简单的把它转成一个时间戳,对不对?这个大家还记得吧?呃,你有一个time stamp啊,我把这个引入进来,直接用Java CQ就可以。大家还记得之前我们那个控制台打印输出的时候,是不是用过这种方式啊,直接把一个时间戳传给一个呃,New time Sam的时候直接传一个时间戳进来,这得到的就是一个time sammp,对吧?啊,后面我们想得到一个string类型的话,再把它to string是不是就可以了?哎,这就是拿到的那个年月日十分秒那样的一个格式啊,最后还有一个当前的count数量,这个是不是就是从后边大家看到这个elements里面去拿啊,之前那个window function里边这个是叫input,现在是叫elements了,其实都一样,大家知道全窗口函数吗?所有的数据是不是都收集到这儿了?现在我们的数据是不是所有数据就就一个啊,就前面的一个count统计出来的预聚合的结果吧,所以我们的操作是不是还是点next拿到就完事了。
11:41
就这么做对吧,最后再做一个包装输出,拿什么输出。哎,大家看这还一样吗?Out吗?out.collect然后里边你一个channel promotion count,里边就是channel behavior window and,然后count,对吧。
12:05
这就是我们统计的一个完整的流程,这就做完了,好,那接下来我们来运行测试一下啊,啊上面我们这个已经有打印输出了,对吧,测试一下看看效果怎么样。好,运行起来我们看一下结果,因为现在是一个流式的测试数据源,对吧,就是我们这里边源源不断产生数据,所以等一下的话,这里应该就是很快就会有数据输出,诶大家看。接下来多长时间输出一次呢?是不是五秒钟输出一次啊,大家看这个后边这个时间戳,我们是带着那个的,大家看已经转成这个年月日十分秒了,35秒对吧,隔五秒钟之后40秒,然后当前大家看到这个count值是不是不停在涨啊,因为我们统计的是一小时的时间窗口嘛,所以一开始我们数据还还比较少,对吧,所以接下来这个这个时间段肯定是不停在涨的,然后为什么一个窗口就会输出这么多条数据呢?
13:06
因为我们的key是不是组合key啊,对吧,你看这前面不同的这个channel,然后他的这个行为啊,Install click或者这个download,我们分别是做了一个统计的,所以对应的这个数据都有一个输出啊,这就是我们做这个模拟测试,按照不同的渠道做做这个呃,APP推广统计的一个结果。
我来说两句