00:00
我们现在已经有了自定义的数据源了,那接下来当然就是在这个main方法里边,把我们该做的事情一个一个还是做完,对吧?啊,那首先定义这个environment,对吧,Execution environment,先把它get,我们这里面还是不要忘记下划线引入,接下来我们还是就是整个的这个并行度先设程一,后边看起来这个测试的情况看起来就会简单一些,对吧?顺序是可以保证的。另外我们还需要去设置一下当前的诶时间语义啊,那当前一般情况大家可能想到还是要配这个事件时间语义对吧,Time characteristic,给一个这个even event time,其实我们这里边给不给事件,呃,事件时间与一都无所谓,因为大家发现我这里边既然是并行度都是一,然后又是按照这个系统时间直接一个一个产生,对吧,那后面其实处理的时候肯定是按照顺序来处理的啊,所以这里边只是常规的做法,一般还是。
01:00
这样来做的啊啊好,然后接下来那当然就是定义一个这个data stream了啊,大家看我这里面data stream就没有再去读取什么东西了,因为为什么呢?是不是直接现在就是ADD the source就完事了,对吧?哎,直接ADD就完事了,这里边new一个我们定义好的这个simulated source,然后把它创建出来,呃,那后边也不需要做这个map转换了,大家想想这里边生成的是不是已经是包装好的一个样例类类型了啊,所以这里边就非常简单生成,但是有一步大家不要忘记,既然是事件时间语义,那是不是应该要有一个asign time stand和watermark的过程啊啊,那这里面当然涉及到你到底是升序还是乱序呢?呃,我们前面既然按照系统时间不停的去生成,那直接给一个升序就完事了,所以提取的时候用这个time Sam,这里大家注意,之前我们后边都乘了1000乘一的。
02:00
原因是因为对那个本身时间是个秒对吧?哎,所以这里边我们要的是毫秒,所以乘以千,那现在我们这是秒还是毫秒呢?诶大家看到这里拿到的本来就是毫秒,那你就不要再乘了,对吧?哎,所以这里边直接拿过来用就完事了,好,这是我们前面的这一步操作啊,这个非常简单,然后接下来就是啊,开窗统计聚合对吧?啊这里边我们用的统计的方式呢,还是做了一个开窗统计,就是统计这个推广渠道统计了多少,我也是统计,比方说过去一天之内的对吧?呃,过去这个一周之内的统计这个,那我们这里面数据没那么多,那我们就直接统计,比方说呃,这个一小时或者十分钟之内的也是一样,大家想到这这就是一个窗口而已嘛,然后我们可以比方说每隔几秒钟就输出一次,给一个很高频率的输出显示,那大家想这就是又是一个又是一个滑动窗口,对吧?啊,所以接下来我们呃,开窗统计。
03:00
统计输出,我定义一个result STEM,基于之前的data stream啊接下来首先我们可能得先做一个过滤,因为大家想到了我做这个市场推广的统计,那用户如果是做了一个卸载,你这个就不要去统计进来了,对吧?啊,肯定是有一些这个选取条件的啊,所以这里边我先给一个这个filter,把这个behavior,假如说呃,不等于对吧,不等于之前我们。On in store啊把这个滤掉,然后接下来,诶,那既然是要做这个推广渠道的统计嘛,那是不是要根据推广渠道要做一个分组K,对吧?然后这里边大家可能还会想到,有可能我关心什么呢?不光关心每一个渠道里边这个数据有多少,我可能还要关心,诶这渠道里边到底它是下载还是安装还是还是就是看了一下对吧?那可能还要根据它不同的行为再去做分类,那这个怎么办呢?啊,大家可能想到我给两个K不就完了吗?对吧?啊,就是我们在定义当前这个KBY的时候,你既按照当前的渠道channel,也按照当前的这个behavior去做一个分组,那这两个K相当于就把我们的所有数据都按不同的这两个K分开了啊,所以这里边的定义呢,就有两种方式,一种方式是就前面大家还记得我们这个KBY里边啊,你这个给这个int类型,或者说给这个string类型。
04:33
的时候是不是可以给多个参数值啊,啊,所以我可以按照这里边比方说啊,我直接给这个channel,这是一个,另外再给一个behavior,可以给这样一个对吧?啊,直接写两个参数进去,那么它调用的就是我们这里边的方法得到的这个类型,就是一个Java的元组类型,对吧?底层会给我们把这个包起来啊,但是也有同学可能觉得,哎呀,这个当时我们做过这个包装Java元组后面要提取这个K的话好麻烦呀,我不想用这种方式,我还想用之前我们直接就是什么类型得到什么类型,那有没有别的操作呢?哎,对,下划线对吧,有有同学可能想到下划线,但现在你想这个下划线我到底是下划线什么呢?下划线下下划线这个behavior还是说下划线这个channel呢。
05:26
哎,这里边大家会想到之前我们用下划线,其实里边是传了一个函数对吧?下划线是那个拉姆达表达式的一个简写嘛,那我们传这个函数,它最终要得到的就是返回当前的这个K就可以了,那既然我可以这里边传多个值,然后最后它给我们包成一个Java元组,那我是不是可以直接返回这个类型,就返回一个盖拉的元组,大家想想想是不是可以这样做啊,然后我根据这个元组里边下划线一,下划线二去提对应的那个K就完事了嘛,哎,所以这这其实也是另外一种思路啊,啊,大家可以看看这种实线怎么样去做,那当然后面这个,因为当前的数据要有两个字段需要去下划线提取,那就不能用下划线替代了,对吧?大家还记得那个下划线拉姆带表达式里边怎么样能省对吧?呃,如果说是我们的那个参数在后边按照顺序只出现一次,对吧,而且是按照顺序我们那个参数。
06:27
先后顺序出现的话,可以用下划线代替,但你如果说我们同样一个这个当前这个数据,它要出现两次的话,这就不能替代了,那怎么办?那你老老实实把这个拉姆达表达式写出来嘛,我们当前的这个data,最后提取这个K的时候,返回一个元组类型对吧?那这个原组类型就是一个我要的是当前的channel,另外一个要的是当前的behavior,这样不就完了吗?对吧?这样我得到这个KBY之后的k stream,它的K的类型是什么呢?二元组类型对吧?两个string构成的二元组类型,一个代表channel,一个代表ha啊,所以这个就是大家用的多了之后,就可以有各各种更更加灵活的方法啊,然后接下来我们就可以开窗了啊,Time window,对吧,里边还是要传一个window啊,这里边我们引入window in time.time然后比方说我就统统计小一点啊,我统计这个一小时之内的数据吧,对吧,或者说大家觉。
07:27
那我可能要统计每一天的数据,那大家看这里边也有days嘛,你统计一天的数据也是OK的,然后接下来还应该有一个,如果说我们要输出的频率比较高的话,那是一个滑动窗口,那比方说我五秒钟就要输出一次,那是不是这里边应该给一个C5对吧?哎,所以大家试验的次数比较多的话,你就会发现,如果我不给这个五的话,那效果是什么呢?它真的就是一天才输出一次对不对啊,因为是一天的滚动窗口吧,就只有到这一天结束的时候输出一次,那如果说它还有一个滑动距离的话,那是不是最终输出的这个频率是以这个滑动步长为准的,对吧?相当于我们这里边就是五秒就输出一次,输出的统计的数据是多长时间内的数据呢?是过去一天内的数据,对吧,这是这个滚动窗口和滑动窗口应用这个场景就是比较典型的一个不同的要点啊,然后后面大家就想到了,那你。
08:27
这理论上来讲,这不就又是一个count吗?啊,那我们就知道了,那我还是定义一个这个aggregate,然后后边给一个语句和函数,然后再去做一个包,你假如说我们这里边想要拿到它的那个window信息的话啊,就如果想要有这个输出,输出信息,我依赖于这个window信息的话,那我就再去包装一个那个全窗口函数,对吧,再把那个window信息拿到就完事了,所以这里涉及到我们到底想要什么样的输出,我们再上面还是把这个定义出来啊,定义输出数据样例类his class啊,那我们想要什么呢?我还是把这个定义出来啊,Market啊,我这个还是叫view吧,View count吧,之前我们都叫view count吧,呃,那首先我关心的就是在哪个窗口内,对吧,我这个写的这个稍微多一点啊,比方说我这个要把window start和window and都列出来,而且。
09:27
那之前我们那个window end直接用的是长整形那个时间戳,我现在想看到它到底那个时间是什么时间字符串啊,那大家知道这个也简单,做一个转换不就完了吗?对吧?呃,然后window and and Windows start这两个我们都要写出来,所以前两个字段一看就知道是哪个窗口,后边呢,我还需要知道当前的这个channel是什么对吧?是哪个推广渠道啊,这个不要不要直接这么写了啊,String,然后另外还有这个behavior是什么?也是一个string,另外还有一个count数量,对吧?哎,就是我最终想要拿到的啊数据大家先把它定义好啊,就是这个这个需求其实我们都是能能想到啊,最后你做这个统计输出嘛,肯定就是想要的这些具体的信息,做一个报表展示就完了啊,所以其实就是这么几个字段,你如果要是最后我们拿到这个数据写入到,呃,数据库里边,对吧,你想写入到。
10:27
这个MYSQL里边写入到hps里边,这是不是也非常简单啊,对吧?用我们之前讲过的跟外部系统的那个连接方式写就完了,所以这里我只把它包装,包装成样例类,然后给大家print打印输出看到就可以,好那接下来呃,这个一般的这个agate跟之前都一模一样,这就是一个复习了,所以我这里边不再给大家讲之前的这一部分内容,我这里边给大家再用另外的一种方式来实现,我用一个什么呢?用一个process,用一个全窗口函数来实现啊,大家可能会觉得,诶,那你这里边如果用全窗口函数的话,那不是代表里边就不能是来一个处理一个,然后最后再再结合窗口进行输出了吗?你这不是相当于数据,所有的数据都要攒齐了,然后才一下输出吗?呃,我们看看这个处理的流程到底是什么样的啊,给大家举个例子啊,这里边我要去定义一个啊,那比方说这个叫my market count,对吧?呃,这个by channel。
11:27
定义一个这样的处理操作,最后我们想要的就是把这个打印输出print,然后大家不要忘记最后还有一个execute执行起来对吧?啊,这个叫APP market by channel job下边就是我们自定义的,自定义这个应该叫什么叫,就是窗口的那个process函数对吧?所以这这是我们定义的一个process window function class market count by channel,然后我们要实现process window function,大家还记得这个里边呃,需要定义哪些东西吗?哎,你看到这这里边有不同的这个,呃,地方的这个调用啊,我们这里边用的是什么呢?大家要注意用的是这个,就是本身这个scale里边给我们定义好的这个,呃,Process window function这个抽象类对吧,里边的类型是输入输出,还有当前。
12:27
Key的类型,另外还有window的类型,就这么几个,我们把它写好,输入是什么呢?大家发现现在是不是输入输出就是标准样衣类类型啊,对吧?哎,当前这个都已经定义好了吗?样一类类型,哎,有同学,诶,不对啊,你之前不是有这个二元组吗?我们这里是分组定义它的K,没有做map对吧?那这块大家要搞清楚啊,我们现在的输入的数据类型是什么,是不是还是之前的样例类类型啊,对吧?所以现在我们就是输入输出都是样例类类型,User behavior输出是,呃,那个叫market will count,对吧?然后另外还有当前key的类型,Key类型我们是二元组,String string把它写到这儿,最后还有一个当前window的类型,Time window。
13:16
好,把这个写好之后,如果不出意外哈,上面应该是没有问题的一个不报错的一个状态,然后接下来我们就看看在里边这个process window function里面必须要实现的重写的一个方法是process,这个process那就是哎,大家会想到这就不是每来一条数据都调用一次了,它是什么呢?就是我们说的所有数据都收集齐了,而且不是说收集齐了之后啊,就是它的触发的时间是什么呢?是当前窗口要去触发计算,不一定是关闭,对吧?我们之前说过就是它有fire和破嘛,就是他要去触发当前窗口计算的时候,要输出结果的时候,调一下这里边的process方法,然后拿到的参数有哪些呢?首先能拿到当前的K啊,刚才那个channel和behavior,这个能拿到分组啊,然后呢,有上下文,另外还有大家看到elements elements是不是就是当前所有。
14:16
口的数据啊,看我们输入的这个样例类类型,然后类型对吧?啊,最后还有一个参数叫做out,这就是我们说的那个collector,用于输出数据对吧?你想把什么数据写,写到那个输出缓冲区,就用它来发发射就可以了,所以接下来我们的操作就是把我们想要包装好的这个样衣类类型啊,输出的这个market view count里边的所有字段一个一个都提取出来,写到包装,包装好再用这个out发发射出去就可以了。所以我们看一下这个一个一个怎么去写啊,首先要一个window的那个start start用什么去取呢?现在没有window,但是有上下文啊,之前我们用过一个那个process function,对吧?大家还记得这里边这个上下文,它是有有这个window对象,然后接下来我就可以直接window.get start把它拿到对吧?哎,我们要的是那个大家知道这get。
15:16
Sa拿到的是一个长整形的时间戳,要转换成想要可视化的一个字符串的话啊,那我们还是把它转换成一个time sammp,对吧,转换成一个这个就是Java CQ里边的这个time Sam,然后再把它做一个to string。诶,这样的话,我们就可以比较直观的看到到底是某年某月某日啊,几点几分几秒这样的一个时间了,那同样这个end也是一样啊,直接new一个time stamp。然后里边从这个上下文里边的window拿到当前的end,然后转成string输出就完事了啊这个其实大家都知道并不重要,对吧?然后后面的字段是什么呢?我们想要包装的这个market view count啊,后面字段是channel和be behaviorving这两个从哪里去拿呢?诶,那这个也简单,当然是从这个K里面去拿了,对吧?大家看这就是我们直接用这个呃,写函数的这种方式啊,包装成scale拉元组的好处,这个简单多了,比我们直接从那个抓外元组里边提取简单对吧?诶,那这里边我们直接定义这个channel,它就是当前K的下划线一,对吧?啊,这个大家要注意对应关系啊,你当时定义的channel在前面,这里边下划线一就是channel,你如果是反过来的话,就就反过来,同样还有一个behavior定义就是K_二对吧,这些其实都很简单,都不重要,关键我们是要拿到那个count值对吧?哎,那之前我们是。
16:51
来一个就保持了一个计数器,来一个就加一,来一个就加一,那其实我们会发现啊,你何必那么麻烦呢,我们只需要的就是一个数量对吧?那现在我把所有的数据都保存下来了,窗口里边都保存下来了,在elements里边对不对不对,那我可以直接怎么样呢?这本来就是一个可迭代类型,可迭代类型是不是都有一个属性值叫做size啊,大家想是不是我直接一获取它的size就直接搞定了对吧?所以这里面其实我并不需要一个一个去迭代啊,所以大家看这个需求本身比较简单,就是一个count,所以说相当于我们用这种方法可以就是省很多操作对吧?我把这个数据都拿到了之后,直接用elements.size把它拿到就完事了啊,当然这里面也有代价,就是说你说到底是增量聚合的那种方式好,还是这种方式好呢?就这种方式好像比那个更简单一点,对吧,不用每个每一次数据来了之后都算一遍啊,那尽管那个很简单,就是加一,但是。
17:51
好像也是一步计算,现在好像我们最后就直接获取size就搞定了,但是呢,它的代价是你要把所有数据都缓存下来是吧,对吧,前面我们存的数据多了呀,你如果要是用那个增量聚合的话,其实只要存一个一个数就够了,对吧?就那个计数器,就我们那个accumulator就就不停的加一就完事了啊,所以这个就是看具体的应用情况啊,一般还是比较推荐大家用这个预聚合做增量聚合的,因为你像这个这是因为这个太简单嘛,就是一个数量,那你像我们要你如果要做这个,呃,求平均数呢,求平均,求平均数的话,你能直接把它这个size起来吗?不行啊,对吧,那你可能比方说哎,那有同学说有一些数据类型,我也可以直接sum,对吧,他有这个提供了sum方法也很简单呀,但是你要想一想,这个sum并不是它的一个属性,对吧,它调sum方法的时候,是不是相当于还是要便利我们这里所有的每一个元素挨个去做叠加啊,那你底层的这个处理。
18:51
计是不是就相当于还是数据散着都没用,到最后才一次一次去做处理,批处理,然后挨个去叠加,挨个算了,那你就不如来一个就算一次,对不对啊,所以这个大家要注意一下这个效率和我们这个内存空间的这个考量啊,这里边我们就是用了这种方式给大家做了一个实现,好,那后边怎么做处理呢?哎,那就是最后包装成样例类out.collect把它输出就完事了,对吧?这里边我们要不是user behavior啊,要的是一个market will count,然后里边每一个字段挨个填start and channel behavior,对吧?因为我们都写好了嘛,挨个写进去完事,这就是我们最后的一个处理结果啊,很简单是吧?啊,好,那我们整个代码都已经写完了,接下来我们运行一下,看看效果怎么样?
19:45
好,大家看到现在已经提起来,我们这里边已经有输出了,我们现在的要求是什么呢?每隔五秒统计一次对吧?哎,就是关闭一个窗口输出一次结果,而且大家看到这每隔五秒关闭的这个窗口里边啊,它这个数据很多,大家看这是25秒的这个窗口啊,这里边数据为什么很多呢?因为我们是针对不同的这个渠道,另外还有不同的操作也把它分开了,对吧,你要这么一分的话,那是一个成的关系嘛,当然每次这个分的组就很多,所以每一次关闭窗口的时候,这里边这个就有很多数据输出,而且大家看到这个数量是在不停增加的,因为我统计的是一天的数据嘛,大家当然是累计的,对吧?之前所有的数据都都算数啊,你越往后看,你运行时间越长的话,这个数量应该是越来越多啊,大家看到这个有不同的这个渠道,然后不同的行为都可以列出来,这是一个最简单的分渠道统计这个市场推广的一个指标。
我来说两句