00:00
接下来呢,我们再讲下一个需求,呃,我们会想到其实一般情况下,我们统计这个网络流量的时候啊,呃,可能对于这种top n热门页面的统计,其实相对反而是比较少,我们一般统计的就是什么呢?整个网站整个平台的呃这样的一个浏览量的统计,对吧?呃,因为像这个呃实热门页面统计的话,这个相当于是按照不同的页面去做了一个点击量统计量的一个划分,而我们如果要统计整个平台的这个浏览量的话,那大家想是不是就相当于是一个汇总啊,就不分页面对吧,所有的那个点击行为我们全汇总起来,看看这个总数是多少。这其实就是大家非常熟悉的最经典的这个操作就是统计p vov啊,那当然最简单的就是统计这个PV了,PV大家知道其实就是用户的那个点击行为,来了一次,访问了一次这个当前网站,我就记一,记一就count就加一,然后就保持这个count不停的累加就可以了啊所以那这里边就又涉及到我们按照什么数数据去表示当前网站的浏览量,访问量呢?还是两种方式,一种就是直接可以从这个前面我们讲的那个web浏览器啊呃,有web服务器本身的日志里边去提取用户的访问h hev发起的那个HTP请求,那另外还有一种方式就是如果我们做了买点日志的话,是不是直接从买点日志里边把它拿出来就完事了,对吧,用户点一次就相当于是对于这个网站浏览量的一次贡献。
01:33
所以接下来呢,我们就还是回到之前的这个买点日志里边提取他的,大家还记得之前我们那个有那个PV行为吧。那还是提取PV行为,接下来是不是直接统计整个所有的点击量就可以了,哎,这就是最简单的一个PV实现的一个过程,当然如果要是想要做这样的一个实现的话,我还需要把之前的那个数据其实还是得copy过来,对吧?因为我们这是不同的模块了嘛,在打包资源的时候最好还是要区分开。
02:05
当然大家也可以直接引入之前我们的那个resources,这里边的这个user behavior对吧?呃,就是hot items下边的啊,我这里边还是把它直接copy过来了。把这个引入,然后另外大家想到对应的,就当前我们要定义的那个定下边是不是也应该要多一个user behavior啊对吧,我当前就是还是把把对应的那个copy过来。当然实际生产项目当中,有同学可能会想到,假如很多个这个不同的模块,我们有公有的这个呃病的话,那其实是可以去包装在外面提取出来的,对吧?好,我直接把这个copy过来。好,这是这个U德黑米,那接下来我们要实现的就是。当前的这个PVPV大家其实知道,这个全称就是叫做page view,就是当前我们这个页面浏览量的一个统计,对吧?啊,所以我把这个就叫做page view,那前面的处理流程PSVM还是创建出来。
03:07
没方法,Throw一个exception,然后接下来创建执行环境,然后读取这个文件数据,大家想是不是跟前面基本上差不多啊啊,所以我这个就直接copy hot items下边。本身的这个做法啊。把前面这个数据创建这些环境啊,我当前就不用那个卡夫卡数据源了,直接还是从这个文件里边把它读取出来就可以了,呃,这里边我们还是为了。方便后边做这个进一步的。呃,就是整个这个看起来更加的呃,符合这个生产实际一点,不要把这个。URL直接写死啊,就是当前我们这个文件路径,不要直接写死在这里。而是用前面的。
04:01
前面我们这种resource的方法去直接做一个提取对吧,配置呃,will.class然后get resource这里边可以给一个。相对路径user behavior csv,好,把这个定义出来,这个叫做resource,那下边我们就是resource.get pass,先把它定义好。然后下边的第三步的话,我们还可以继续抄,转换成这个user behavior port类型对吧,然后分配呃,时间戳和这个water mark,这都是我们直接直接就可以想到的啊。好,我这里边直接把这个copy一下,后边去做这个统计的话,这个我们就不去copy了,下边大家能想到最终应该就得到一个,呃,比方说啊,我得到一个PV result stream,直接去做一个打印输出就可以了,最后不要忘记EV execute,把它执行起来。
05:03
里边我定义一个,这个是PV。Count job,对吧,就要做这个PV的值的一个统计,所以接下来我们其实想要想要做的事情,那也不是这个包装成这个item view count,对吧,大家会想到我接下来非常简单,我是不是直接给一个就是。就是有有一个非常简单的操作,大家想怎么去做呢?因为之前我们这里边首先这个过滤PV行为啊,该过滤还是要呃做一个过滤的,然后后边之前我是k buy item ID,那现在我要k item ID吗。要按照商品去做分组吗?完全不分对吧,哎,那我可以怎么去做呢。那有同学说,那我就直接去直接去聚合嘛,开窗聚合嘛,但大家想之前我们是不是说过,如果要是做聚合的话,哎,就是按照我们这个API定义是必须要K对吧,必须要先KBY之后才能聚合,或者是你开窗的话,一般推荐的也是要先去做一个这个KBY之后先分组,然后再再window。
06:07
啊,那所以这里面大家就想到我要去做KPI,怎么KY呢。那我们可以结合一下之前做这个worldcom的时候的那个思路,一开始我们做worldcom测试的时候,大家想我是不是来了数据之后,可以把它map转换成一个二元组啊,对吧?哎,来一个map成一个二元组,那我现在呢,其实也可以把它map成一个二元组啊,只不过word com的时候,后边我是KBY一个word,当前的那个word,那我现在呢,我map成二元组的时候,是不是直接以当前的这个,就是所有的数据来了之后,我都叫同一个word是不是就可以了?就相当于我后面KPI的时候,KPI的是同一个值,就相当于把所有数据都发到一个组里了,对吧?哎,所以这是一个最简单的实现思路啊啊,那么我们接下来就是首先过滤这个PV行为,那接下来是不是想要把它map成一个二元组啊啊,那当然map成二元组之前我们说过原组的这个类型的话,直接写拉姆达表达式,你后面还得专门指定那个返回类型,对吧?这个比较麻烦一点,那我们就干脆直接用一个map function吧,然后这里边我们想要得到的这个数据类型应该是一个,呃,一个二元组类型,对吧。
07:24
Temple呃二然后里边对应的这个类型应该是呃,String,就是当前我应该是要一个这个随便给一个字段对吧,比方说我就叫PV,然后后边是不是应该要有一个对应的那个count值啊,比方说给一个int值或者给一个long啊,这个都是可以的,把user behavior引入,然后里边是不是必须要实现一个map方法,这个map方法里边。非常简单,直接你一个二元组里边我的K是不是统一都叫做PV,后边是不是直接给个一就完事了啊,所以这个其实就特别简单啊呃,把这个直接写出来,然后后边那就不要k buy这个item ID,那我这里边其实应该是什么呀,因为是原组嘛,我当前是不是就直接把第一个字段提取出来就完事了。
08:17
大家想想是不是就是第一个字段啊,所以是不是直接K0啊,而且这个以数字表达位置的这种写法是不是只有在原组里边才才有效啊,所以我们直接就是这么去做,然后接下来当然就是开窗了,我们做开窗的时候,呃,就是现在我们统计PV一般就是同样的一个时间段,对吧,一般就不去做那个滑动窗口的统计了,比方说我们就统计一个小时之内的,对吧,开一小时滚动窗口,嗯,去做一个统计,这就非常简单啊,然后后边我们可以。啊,当然这个大家可也可以不做这个agg啊,我直接是不是直接sum就完事了,因为前面我是不是已经做了这个,呃,KBY,然后后面有一个字段就是一啊,我直接SUM1,这跟我们之前做的那个work count是不是非常类似,直接这么一做计算,哎,直接就可以输出了啊啊,当然我可以把这个叫做我把这个先删掉啊,大家可以自动的推断一下这个类型。
09:23
好,呃,我们当前得到这个可以叫做PV result STEM,这就是我们最终得到的结果,大家可以把这个做一个打印输出,看一看效果怎么样。这个过程还是很简单的。好,接下来我们看一下这个统计输出的结果啊,大家会发现这里面统计输出的结果啊,统计出来啊,每个窗口是不是都有一个二元组输出啊,然后它的K都是PV对吧?然后我们就是PV,然后后面跟上一个值,然后大家看这个统计出来,大概每一个窗口一个小时内大概这个调呃数据大概就是四万多,最高到五万多对吧?呃,最多的一个是522,呃,52552啊52500多啊,这样的一个PV值,那大家回忆一下也差不多,我们那个数据不是总共40多万条嘛,然后你看这个窗口有多少个。
10:18
呃,一数的话,刚好十个窗口,最后一个比较少,那是因为最后那个时间可能截止了,对吧?啊,就只有几条数据,那那最后统计出来的话,总共40多万条,每个窗口四万四五万的这个数据量差不多啊,这就是我们当前的这个数据啊,因为我们还有一个PV过滤,有一些数据是不是根本没统计进去啊啊,这就是最终的一个结果,那大家可能会发现这个有有一点缺陷啊,首先就是说我们这里边输出的结果是不是不带window信息啊。我看的不明确对吧,我们当时是包装了一个,对于当前这个,呃,就是想要输出的结果,其实是可以做一个包装的啊。
11:00
呃,就是本来我们这里边得到的是那个,呃,就是大家看到本来得到结果可以包装成类,类似于这个patri view count这样的一个东西,对不对?URL的话,这里边我们没有URL,我就直接写total或者写PV不就完了吗?然后后边加上一个window and,再给一个值,这个就会看的比较明显一点,一下就知道当前是哪个窗口输出的这个结果到底是什么样子,对吧?啊,这个就就会看起来更加的这个直观一些,然后另外还有一个问题大家想到就是我们接下来可以去做一个做一个并行测试啊,就假如说这个时候我们现在是这个并行度是一,所以说我们其实可以非常明确的知道当前这是第一个窗口,第二个窗口,对吧,这肯定是按照顺序输出的嘛,但如果说我这里并行度给一个四的话。大家想当前输出的这个结果是不是就乱了呀?那就是这里边是不是这个顺序就不一定是按照我们这个先后的顺序,一个窗口一个窗口输出了啊,啊,所以这个显然是能够看到的这个效果啊,我们自然就想到了,可以把它包装成一个对应的那个pat count那样的一个类型啊,首先是把这个sum可以去做一个更新啊,我们还是用那个IG方式去做一个调调整,然后另外大家发现这里边我输出的这个结果呢,大家发现啊,就这里边是不是还是都是都是同一个分区输出的呀,好像跟当前的没什么区别是不是。
12:34
那为什么会出现这种结结果呢?对,大家想到这个结果,就是因为我们前面的那个K是完全一样的,对吧,那所有的K都一样的话,我们说按照哈希扣去做重分区,那最后肯定是分到同一个分区,对不对,所以就相当于我们现在的病情就一点用都没有。那所以大家想我们接下来这个改进,除了可以把这个改成一个aggregate,加上这个window信息之外,是不是还可以针对这个并行代码并行去做一个改良啊啊,所以接下来我们可以尝试做一个这个操作。
我来说两句