00:00
现在已经有了整个的程序处理的架构了,那接下来当然就是做一个具体的填充和实现的过程,首先是自定义的预聚合函数,对吧,就是我们说的这个增量聚合函数啊。这个我就快速给大家写一遍,做一个复习,Public static class,现在要的这个叫做page view,呃,Count a j。我们这里边实现的是一个aggregate function,呃,然后里边的类型大家记得是input ACC,还有out对吧?哎,那input当前的输入当然就是port类型嘛,阿帕奇log event后边的ACC是不是还是长整型啊,我就做一个当前的那个count的计数嘛,来一个加一,那最后是不是还是长整型直接输出就完了,因为后边我们还有是不是还有一个window function式啊啊,所以就是它的得到的结果,我们最终要的就是一个长整形的count值,所以重写一下这四个方法,大家发现这里边除了这里的类型跟之前不一样,其他是不是完全一样啊,对吧?这里面实现几乎完全一样啊,所以就是0L,然后ADD的时候是不是也是accumulator,直接来了之后加一就完事了,后边get result直接返回accumulator墨里边直接A加B。
01:29
所以这个就非常的简单啊,完全一样。好,接下来我们看一下。接下来就是自定义。实现自定义的。Public static class,接下来这个我们叫做呃,Page count result,对吧,那这里边我,诶这个应该是对的,对吧,前面这个叫配置count result,现在我们要实现的是一个window function。
02:04
那么这个window function大家还记得它有四个泛型需要去写入当前的输入in输出out,还有当前对键key的类型,还有这个window的类型,那所以现在输入是前面增量聚合的输出,所以应该是长整型浪,那它的输出是我们包装好的那个聚合结果to类型,呃,我们当时管它叫做page view count,对吧?啊,就是view count啊,然后诶这里大家要注意下一个,这个就比较重要了,它当前它的这个键的类型应该是什么呢?之前我们这边直接写temple对吧,大家要注意temple的话,主要原因是在于我们前面做KBY的时候,你如果要是直接传一个string类型的字段进来的话,那这里边我们调的这个方法。看一眼啊K,这里边直接掉的这个方法是不是就相当于是。
03:05
得到这个类型k stream就必须得是t temple类型啊,所以后边我的那个K类型是不是就是temple啊,但现在我们传的是什么。传的是不是相当于是一个select呀,大家看这里面调的是这个select对吧,得到结果是不是就是本来你提取出的K是什么类型,这里是不是就是什么类型啊,哎,你看这个key key select嘛,它的类型是不是就是直接输入一个value进来,然后get key直接返回一个K啊,诶,所以这里边就是你本来拿到是什么就是什么,我们现在URL当然就是一个string类型。最后还有一个window,现在就是一个time window,直接把这个一起检查一下,上面是不是类型就完全匹配了啊,所以大家注意,如果你直接改之前的那个hot items那个代码的话,你直接把它改成这样,肯定后面会报错,就是因为现在是不是类型就不匹配了,K的类型不一样对吧?啊这个大家要注意一下,下边我们这里边必须要实现的一个方法叫做apply。
04:08
Apply里边是不是这里边其实能该拿的东西就都已经在这了,所以这个我更简单一点啊,直接return new一个,现在想要的是一个叫做page count的这样的一个。Po类的对象对吧?呃,然后前面一个是要这个当前的URL。类似于我们那个ID嘛,URL从哪里拿。是不是直接就是这个string啊,对吧?哦,我直接把这个改一下吧,叫程URL,我们这里边看的更清楚一点,直接就是URL,然后后边window and,那就是window.get end,对,然后后边是不是就是input,直接把这个拿出来,然后input,呃,这里边是一一套只有一个数嘛,然后直接next,是不是这样就完事了,诶,但是大家注意这个其实不对啊,我们这种写法不对,为什么呢?
05:02
因为当前这个apply是不是根本没有返回类型啊,所以我们并不是要return它,而是要对out点。直接把这个写出来是不是就没问题了?呃,这里我们看一下还差一个括号,对吧,这样括把它括起来,你有一个page view count,把它输出就可以了,这就是我们前面做这个窗口聚合的过程,那做完窗口聚合之后呢,我们还要基于window and再做一个分组,做一个KB,然后后边还要做一个这个直接把所有的数据都到齐了之后,做一个排序输出,还有一个这样的一个kid process function,所以接下来关键是我们要把这个再做一个实现。实现自定义的处理函数,所以当前我们是public static class,当前我们这个叫做top n hot pages哦,那我们知道既然是kid process function嘛,它并不是接口,而是一个抽象类,所以我们要extend k的process function,它的类型KIO,哎,那问一下大家现在的key是什么类型?
06:24
按照我们前面的写法,如果用了这个方法引用的话,或者写拉姆达表达式,这是一个key select对吧?诶,那我们这对是不是window and得到的是什么类型,这就是什么类型啊,所以现在我是一个长整型的K,然后输入,那是前面的输出,是配置view count对不对?聚合结果那当前的输出,而我们还是包装成string,直接直接打印了对吧?诶,那就是这样的一个过程。把这个写完之后再检验一下没问题了,对吧?诶这里边是还差一个这个字段啊,所以当前我们定义一个private,呃,当前我定义一个这个in。
07:07
整形的这个叫top size,然后下边我需要把这个constructor创建出来,那上面是不是就完全没有问题了,哎,这个这个所有的类型都匹配上了啊,所以接下来我就是在这里边去做一个定义了,那首先我是不是要定义这个状态啊,对吧?哎,所以我们是要定义状态,我们是要保存当前所有,就是page view count对应的那个保保存这样的一个,呃,Pay count是不是要到当前的那个一个list里边啊,对吧?到list中,所以我们定义的是一个list state。List state里边的类型当然就是count了。然后诶里边那接下来是定义一个这个名称啊,我就叫做page count,呃,List state。
08:04
他具体的那个声明是不是必须要放在。诶,当前啊,我是需要这个直接在这个open生命周期里边get当前的运行上下文,然后去定义对吧?Page will list state等于get runtime contact,然后get list state,去new一个list state script这样一个描述器里边给一个当前名称名称,我这个就叫做page count list,对吧。后边来一个类型,那当前就应该是,就应该是我们当前不是那个叫page,呃。当前那个就就是当前应该是那个page view count对吧,所以就是page view count.class把这个定义出来。这就是我们前面的基本的一些定义,那后边诶,这个必须要实现的一个方法叫process element,就是每来一个数据之后到底干什么事,对吧?哎,那这里边还是跟前面我们的处理流程一样,每来一个数据之后,是不是直接就把它添加到当前的例子里边就完事了,另外注册一个定时器,对吧?Cx timer service register even time timer,还是同样用当前的window and,再加上一加一毫秒是不是就搞定了?
09:31
最后最关键的是不是要放在on timer里边,定时器触发的时候去做操作啊,诶那这里边,呃,首先这个定时器触发的时候,我先把那个当前的数据都拿出来,对吧?大家记得我是用了一个list list点你有一个a list对不对?呃,把当前page view count list state它的这个get拿到对吧?呃,对应其实大家知道这个你可以传这个ER啊,或者是我直接它本身是不是也可以啊,对吧?呃,这个也是直接可以拿到,就是一个interable类型直接传进来也可以,也可以直接拿到我们当前这个list啊,啊,我就直接把这个传进来就完事了,下边是不是就是做那个排序了,大家还记得这个是page置view count,直接点sal里边,要去new一个competor里边,这里边怎么写?
10:27
哎,是不是我要用后减前啊,对吧,O2的那个count要减去O1的对应的那个count值,那之前我们是直接那个get count之后用了一个int value,当时我们说那个方法其实不太好,对吧?呃,更好的方式应该是怎么做。因为它有这个,呃,正负零,所以我们这里面最好还是if else做这样的一个判断,对吧?呃,就是如果if oe.get count,如果要是这个大于o2.getcount的话,这个返回什么。
11:06
啊,这个我就不用写画括号了,对吧,这个是不是应该返回一个负一啊,对吧,那就是如果说前面就是呃,前面比后边大的话,我相当相当于是呃就是直接直接就相当于是不用不用再去做那个排序,再替换这个位置了,对吧?哎,直接是负一,然后else if o一点get countt,如果要是小于o2.getcount的话。那么接下来是不是return一个正一啊,对吧,然后最后else如果是零的话,是不是就RETURN0啊,这就是一个returning。这就是一个完整的流程啊,啊大家如果严谨一点的话,就这样去写,要不然的话,我们那个长整型转int类型的时候,可能会出现一个经,就是直接截取的话,可能那个大小就变了,对吧?好,所以呃,这是前面这个做排序,其实我们得到这个之后也就已经得到结果了,想要写入到外部系统就直接可以写了,那最后我们还想做一个排名格式化,对吧。
12:15
格式化成字符串输出,呃,那这个过程其实直接可以借鉴前面的那个hot items里边我这就直接抄过来吧,整个流程是差不多的啊。呃,所以我还是去创建一个stream builder,然后接下来是不是想要什么往后面直接追加就完事了啊,那这里边我当然就不用item view counts了,现在我这个叫做page view counts对吧,直接取这个size,这里边当然就是page view count,每一个都从里边去做一个提取,后面这也不是get呃,Item ID了,这里我应该是get URL对吧?这也不是商品ID,这就是当前的页面URL啊。那另外就是既然是热门嘛,下面我还用这个热门度,或者说我这个直接就叫浏览量,对吧,浏览量。
13:16
因为我统计的是这个,呃,当前页面的这个浏览量嘛,所以这就是这样的一个统计,对吧,我们直接就实现了同样的一个功能啊,那接下来我们把这个直接做一个打印输出,看一下效果怎么样。好,我们看到这里边果然输出了对应的结果,对吧?大家看到我们现在不是五秒钟滑动一次吗?所以这里边的窗口都是五秒钟输出一次结果,然后这里边的这个浏览量大家看其实是在不停增长的,对吧?呃,就是前面小一点啊,后面再一直长,当然了,现在这个长得很慢,因为是不是五秒钟一次,它有可能中间没数据,所以这里导致这个浏览量都没长,对吧?呃,所以呃,我们统计的这个数据是十分钟的窗口,五秒钟增长一次,就是更新一次,所以一开始的数据量肯定是很小的,那如果说统计到后边的话,可能这个数据量会不停的涨上去,对吧?啊,这个大家知道这个最终的结果是什么,这就可以了,这就是我们想要得到的这个结果。
14:23
啊,但是大家会发现这里面有一点问题,就是我这里边输出的这个大部分,如果我直接这么提取的话,大部分输出是什么呀。这边说大部分输出的都是一些资源文件啊,比方说点ICU,大家知道这是那个,呃,Icon就是那个图标文件对吧?呃,还有就是这个CSS,或者说后面统计的话,有可能是JS文件对吧?啊就都是一些比方说像这些资源申请到的资源文件啊,那我们再去处理这个页面访问的时候,是不是应该尽量要避免把这些东西统计出来啊。那大家想怎么样去避免这种情况呢?
15:00
那很简单,是不是前面做一个过滤就完了,诶这个过滤我应该怎么样去,对于首先这个位置是不是就应该在前面我这儿啊,Filter这个get的时候,这里边是不是就可以进一步去做过滤,把对应的这些呃,就是URL不符合标准的直接滤掉啊啊那大想这里边绿应该用什么方法去滤呢?诶,这里边其实是。大家想这个如果我单独直接去写它后边这个就是最后几个等于什么,不等于什么,这个好像有点麻烦对吧,其实最好的方法是写一个,写一个正则表达式啊,所以给大家可以。看一下我们这个文档里边的这种实现啊,这里边直接就是写了一个大家看这个政策。就直接加了一个filter,然后呢,写了这样的一个正则表达式,这个应该可以看得懂吧,前面这个监括号是表示什么时候开始对吧,以什么开始,然后后面这个Dollar表示是结束,然后大家看中间呢,这个稍微特殊一点,有一个问号,感叹号,这是表示什么呢?
16:07
啊,这个是一个所谓的这个非捕货源字符,对吧,它表示的其实是后边要做的是一个反向匹配,就是说我当前诶,你看后边这是不是点什么什么什么,然后Dollar这是不是以这个结束啊,所以我要选取的是。不以这些去结束的,这样的对应的那些字段对吧?啊,所以当前我就是比方说不以CSS JS p ngco对吧,这些资源文件结束的,我要把它提取出来啊,所以我可以把这个直接就copy在这儿啊,啊大家看后面这个还是增长起来了,但是这个这个浏览量好像还是很小对吧?呃,我直接把这个。加在当前的这一步filter后边。这里边我需要去引入当前,就是Java里边我们进行那个正则匹配的那个pattern对吧?Pattern return一个pattern.matches然后把当前的正则传进来,把后边我们的这个URL传进来,对吧?这样的话就可以做到这个效果了,那接下来我们再运行一下,就会看到前面的那些东西,就就是资源文件就被过滤掉了,对吧。
17:19
大家可以看一下这个执行的结果啊,来看现在我们得到这个结果,呃,就相对来讲就少很多了,对吧?呃,就只有类似于一些HTML之类的一些文件啊,或者说是直接这个,你比方说blog什么tag对吧,这样的一些访问的URL才可以直接提取出来了啊,这就是我们最后得到的一个效果,当然这个浏览量可能就更小了啊。
我来说两句