00:00
未来我们要实现的是第二个需求,就是热门页面的一个统计啊,这一个主要是我们接下来这一部分主要是想要去统计当前的这个实时流量,实时流量呢,呃,大家想统计这个呃页面啊,或者说流量其实就有两种方式,一种是直接基买点日志,那就是前面我们这个user behavior csv这个数据,那或者呢,我们也可以就是直接基于服务器的web服务器的日志,对吧?啊,那这样的话就相当于我们可以不做买点,直接收集这个外部服务器对应的那个数据,就知道用户对哪些页面做了访问,做了发,发起过请求的话,这就是一次访问嘛,所以接下来我们做这个热门页面浏览量统计的时候呢,就可以直接去分析当前log里边对哪些页面,呃,对对应的有那个访问的请求记录,那么我把这个记录做一个count统计,接下来是不是做一个top的排序啊?
01:00
啊,排序最后输出就可以了,所以接下来我们在这个代码里边新建一个模块。你有一个module,当前的这个模块,我们就直接叫做network analysis。然后当前这个po文件里面,我们不需要任何的引入对吧?想要的这个弗林和卡夫卡都已经有了,接下来我们重点就是代码应该放在source main Java下边,然后接下来,哦,这里边这个Java大家看它还是灰色的啊,那我们应该要mark as source route对吧?然后接下来我们需要的那个文件到底放在哪里呢?现在我们需要的不是那个买点日志的user behavior了,我们需要的是当前web服务器的一个日志啊,这个我们是单独这个叫做阿帕奇点log对吧,阿帕奇的一个日志文件,直接把这个放在resources下边。
02:02
然后接下来我们其实是需要在这个Java下边去新建啊对应的这个对象了,那大家首先想到我们这里边这个日志是需要做一个ETL的,这里边呃,有一些字段可能我们并不需要,然后有一些字段呢,你像这个时间,我们是不是需要把它转换成一个长整型时间戳二哎,那所以最终我们把这个得到的这个结果啊,ETL之后的结果包装成一个想要的po类型,类似于抓兵的一个类型,我们可以把它直接给一个名称叫做阿帕奇。Log event就这样的一个阿帕奇日志事件,那这里面的属性大家想一下,我们主要是不是就是要呃这么几类几几个啊,首先IP。我先把它拿到,然后后边其实user ID和username的话,大家想username一般没什么用,对吧,我只要user ID是不是就够了啊,当然这里面的user ID的话,之前我们都是长整性,现在它既然有那个横线,我为了统一是不是还是直接把它叫做这个string就可以了,呃,对吧,直接转成string啊,这样的话就不存在数字和这个当前的横线需要去做转换的一个过程啊,这也是一个string,然后接下来是不是还应该有一个长整型的时间戳啊,Private长整型time stepmp,哎,那后边还有两个,这个也是必要的,就是一个是当前请求的,对请求的,呃,这个URL肯定需要,对吧,这是我们最关键的这个页面的信息嘛,然后另外是不是还需要有一个当前请求的方法呀?诶,所以当前这个到底是get还是post post这个是比较需要的啊,所以第下一个字段就按照顺序的话。
03:50
子讯类型的一个method请求方法啊,最后还有一个URL把这个写出来啊,那这个流程的话,接下来还是空参的构造方法先列出来,接下来还有带着参数的构造方法,我们也把它直接自动生成,最后还有。
04:12
因为是类似于抓病的要求,是不是还得有这个get center啊啊,最后方便大家这个打印看信息的话,我们把对应的to string也做一个实现。这就是对于这个一开始啊,阿帕奇lo event这样一个po类的定义啊,我们其实这里边应该是把这个应该放在一个对应的那个包下边啊,所以这里边我们还是新建一个,新建一个用一个这个package啊。当前我们把这个叫做com点,艾特硅谷点。Network flow analysis。BS对吧?啊,我们放在这个BS下边啊,然后其实是想要把这个阿帕奇log event直接放到它下边,然后我们做一个这个。
05:05
好呃,然后接下来大家想一想,我们就是依赖的这个对应的这个po类还有哪些呢?因为后边我们想要做一个实时页,呃,实时热门页面的统计,我们会想到这个实时热门页面跟前面的实时热门商品基本上是一样的,都是一个top n嘛,那这个整体实验思路是不是也是先要做一个开窗统计聚合,然后得到结果,收集齐了之后再来一个排序输出套。那前面我们那个,呃,实时热门商品统计的时候,是不是前面开窗统计聚合的结果有一个对应的po热类做一个保存啊,带着那个窗口信息对吧?那现在是不是我们也要有一个对应的那个窗口信息啊,啊所以接下来我们其实可以想到啊,对应的应该有一个,对,之前我们那个不是叫做item view count嘛,那现在同样的是不是应该来一个比方说这个叫page view count,就是当前我那个页面啊浏览量的一个统计的值,那大家想一下里边我需要哪些字段。
06:10
跟那个一样,是不是我现在需要页面ID,那现在页面ID应该是什么?对,就是当前的URL嘛,然后同样的是不是需要当前的窗口的信息长整型定义一个window and,然后另外还需要一个。长整形count值对不对啊,就是这三个字段,我只要把它拿到就可以了。首先给一个空参的构造方法,然后带参数的也都列出来,下边还是这个get set,还有对应的to string方法。我们都把它直接创建出来,这就是我们一开始要做的这些准备啊,相关的这些数据准备什么的就都有了,好然后接下来呢,我们就呃不不用新建package了啊,我们直接在这个下边去创建这个class就可以,接下来我们要的还是带上报名啊,com.at硅谷点。
07:17
Network flow analysis点,诶当前我们这个就直接叫做hot pages热门页面,好把这个类型创建出来,接下来我们知道一开始还是一样的啊,这个main方法里边throws exception。首先去创建当前的执行环境,Stream execution environment,把这个添加进来,EV,哎,把这个拿进来啊,然后env,那就先去set这个当前的时间语义,那大家想一下,现在我们的时间语义应该是。肯定就是那个事件时间语义了,对吧?啊,就是直接是因为我们当前那个数据里边是有对应的这个时间戳的嘛,所以这里边当然是要一个事件时间语义啊,所以time characteristic,引入这个even type,然后后边的话还是全局的把这个设成并行度是一,然后我们整个这个处理流程。
08:21
跟前面那个其实差不多的是吧?呃,接下来其实主要就是说,首先先读取文件转换成po类型啊,那首先这里边我们先定一个这个,呃,Input stream吧,我定义成直接定义成data stream stream,因为我们从这个文件里边读取数据的时候,肯定直接就是stream,对吧?呃,Read text file,直接把对应的这个文件读进来。当前是这个POM文件啊呃,不,我们在那个resources下边这个阿帕奇log,把这个copy一下。
09:04
好,这里大家会发现,如果说用这种方式去直接获取的话,这个绝对路径直接放在这儿感觉是有点儿有点不太舒服,对吧?诶,那或者大家也可以怎么样呢?就是我可以用当前这个反射啊,大家看我直接hot patris get,然后可以怎么样直接get resource啊,这样的话我就可以直接写一个当前。就是当前我打包之后啊,他给的下边的那个相对路径,相当于是对吧,直接取他当前这个resource下边的相对路径,然后我直接给这个阿帕奇点log,就把当前这个文件名放在这儿就完了,然后这个拿到的其实是一个我们看一眼其实是一个resource对吧,然后下边这里要传的时候呢,就不需要直接把这个全量的。这个绝对路径直接传进去了,我可以直接取一个resource,诶大家知道是不是我直接get pass就可以了啊,用这种方式的话,我们就即使是从文件读取啊,就也不会那么的呃,看起来那么的生硬,直接把那个全量呃,整个的那个绝对路径放进去啊呃,配置起来也就容易一点,对吧,我直接把对应的这个文件打包在当前资源里边,后边它直接取这个相对路径就完事了啊,这是一开始读取数据的这个过程,然后后面我们要把它转换成po类型了啊,那data stream。
10:31
应该是。阿帕奇,我们那个叫阿帕奇log对吧,然后接下来我们直接把它定义成一个data stream啊input stream接下来做的就是,哎,就是一个map,我们首先是相当于一个ETL的过程嘛,那每一行还是啊,来了之后做的这个操作是不是先切分开得到那个string类型的数组啊,啊,那这里边我们还是叫做fields啊,基于这个长整,呃,这个line去做一个切分,我们现在要切分的那个字段是。
11:07
空格对吧?呃,就是直接按照空格去切分,这里面还涉及到一个问题,我们中间别的那个字段也还好,直接转换就完了,那这里面有一个特殊的东西是时间对吧,这个时间是不是想转换成时间戳啊,所以这里边我们要定义一个对,是不是要定义一个那个simple date format呀,对吧,你有一个simple date。Format这里边就要按照当前这个patternthon定义了,现在这个应该是日月年十分秒对吧,前面是这个斜杠,后边是冒号啊,所以这里边我定义的这个格式应该是DDMM,然后YYYYYYY,后边是HH,然后MMSS,对吧,应该是这样的一个定义啊。
12:00
我把这个先定义出来,接下来是不是就基于它去做一个转换就可以了啊,我想得到一个时间戳time Sam,那就直接simple date format去做一个pass,这里边我们要pass的是哪个字段?应该是fields里边这个要看清楚了,当前的这个,当前按照空格分割,这应该是0123对吧,我们拿的是那个F3,所以把这个拿到,诶注意这里边pass得到的结果其实是一个。这个Java里边的date类型对吧?呃,就是那这个date类型,接下来我们想得到那个长整型时间戳怎么办呢?它里边是不是有一个get time方法呀,这个get time方法就得到的是一个当前的,诶这是不是就是当前从1970年1月1号开始那个毫秒数啊,对吧?当前时间戳就拿到了啊啊所以接下来有了这个时间戳之后就可以return,你有一个包装好阿帕奇log event了,对吧?那呃,里边的这个字段,那还是一个一个提取了,第一个字段我们要的是IP,那就是FIELD0对吧?呃,第二个字段我们要的是那个UIDUID的话我们现在它是没有,但是我们就直接把对应的这个第二个字段啊。
13:20
现在是我们拿不到。但是我们可以把它这个定义出来,就是这里边的第二个字段,就当做这个user ID就可以了,所以就直接用FS1,然后接下来接下来是不是就我们定义的就是这个time stamp了啊,那就刚才转换出来的time stamp,然后后边是method和URL,这个大家要看清楚1.0123下面的四,这是十区,十区我们没管对吧?啊,然后是五和六,呃,所以我们要的是都是string类型啊,FIELDS5和fields。六。
14:00
哎,这就是我们一开始做的这个转换啊,那另外接下来是不是还应该既然是事件时间语义,是不是还应该分配时间戳和watermark呀,对吧?点as sign time stamps and watermarks啊然后这里边我们去new一个,这里要注意了,是生序时间还是乱序时间呢?诶,那关键这个要看看数据了,对吧,我们看一眼啊这个数据。呃,03434712,大家看乱序数据对吧?而且这个乱序程度还挺大啊,这个因为我们也是测试数据嘛,所以这个看起来啊,这个乱序程度是很大的一个乱序,那所以这里大家问题就来了,我这里边该怎么设置呢?是不是接下来就应该是一个bounded out of orderness time Sam extra对吧?提取时间戳的话,这个非常简单,Element点直接get time sample诶那有同学说乘以1000要不要乘以1000,注意关键你这里面乘不乘1000是要看它到底是毫秒还是秒对吧?哎,那这里边我们前面是不是gettime已经把它转换成毫秒了,本来这个时间这里是秒对吧?哎,但是大家想我转换成毫秒之后,是不是相当于已经有000了,那我们就不要再乘1000了,对吧,啊,所以这个就不要了。
15:18
那上边还有一步操作,是不是这里要给一个延迟时间啊,那大家注意这里面延迟时间给多少呢。诶对,大家看到按这里边我定义的这个时间来看的话,这个好像挺大了,对吧,这个比方说这个四十七十二,这至少30多秒了啊,然后再往后看哦,这四十七零七是不是这也是乱序啊啊,这都有40秒延迟了啊,再往后看的话。哦,你看这里还有零零呢,对不对,呃,零五分零零秒啊,那这个这47秒,这还有五七,那差不多一分钟了,对吧?哎,所以要按这个,如果说我们想让它结果正确的话,是不是这里边就应该有一个一分钟的water mark的这个延迟时间啊,直接给一个time minutes1,然后诶这一步这就是我们前面数据就准备好了,对吧?然后后边是不是接下来就是开分组开窗聚合和下一步是不是就是排序输出啊对吧?呃,收集同一窗口。
16:26
Count数据排序输出啊,最后那当然就是打印输出,然后我们在env execute执行起来就完事了,对吧?啊,这就是我们整个这个代码的一个架构啊,这就是这个这样去写啊,那么这里边我们可以直接把这个具体的步骤也都先列出来啊,这个直接做这个分组开装聚合的时候,接下来data stream是不是直接就应该先做一个。先做一个KY对吧?KY之前大家还会想到之前我们不是那个要筛选行为是PV的那个行为吗?那现在我们也可以做一个筛选,因为这里面是不是有method的呀,那大家想到我这里边要看他那个访问信息的话,是不是应该只要message就够了,呃,就是method等于get的那个请求就够了,对吧?呃,就是直接get,就是每一次的访问嘛,就获取那个页面啊,所以我可以首先还是先做,先做一个filter。
17:25
里边是不是直接做一个对应的那个提取就完事了,哎,那有同学可能想到我可以去new一个filter function,那这里边我还是是不是可以直接做一个这个拉姆,呃,拉姆达表达式的一个书写啊,当前的data直接怎么样筛选?哎,对,直接请求方法是get,我们还是把它放在前面,然后equals当前data的是不是method呀,对吧,对应的这个method等于get就可以了,所以这里边我们做的是过滤。Get请求,然后下边是不是接下来就是KBY啊哎,分组按照什么分呢?哎,我们现在按的其实就是URL对不对,URL就相当于是我们现在的页面ID嘛,但是大家想到就是之前我们那个hot it呃,直接给这个字段命名了之后,后边是不是有一个不方便的地方,是它是一个temple类型啊,诶那他想我如果后边想要还想要拿到它的话,我能不把它定义成这个K定义成temple类型吗?
18:31
其实前面我们讲过的KY的时候是不是有不同的那个方法呀,重载的那个方法对吧,我只要怎么样就可以得到那个呃,K一定就是对应它的那个string类型呢,是不是只要写一个拉姆达表达式就可以了,就这里边传一个key select对吧,就可以直接写一个拉表达式啊,那大家还记得其实就是是不是要用它的点get URL是不是这样就可以了。
19:02
诶,当然这里边还有另外一种写法,大家还记得吗?是不是用方法引用啊,这里边我们要用的就是那个,呃,我们定义的是阿帕奇log event对吧,Log下边的。Get URL是不是直接传这个就可以了,所以这是我们直接按照URL分组,那在下边是不是就直接开窗就完事了,看window啊,那因为我们现在这个数据量其实整体来讲是比较小的啊,这里边的数据量这个只有我们看一眼。这里测试数据啊,只有只有1万条数据,所以这个时间整个跨度也不大,大家看这个这个到8.05啊,然后上面这个大概也就是呃。呃,这个是就是就是前一天五点十,呃,5月17号这个10.05到后边这个是就5月20号的这个,呃,9.05,晚上9.05,对吧?啊就是这个时间跨度也不是特别的大,所以这里边我们统计的呢,呃,我们就直接统计一个,呃一个小时,或者说统计更小一点,统计十分钟都可以对吧?我们这里边的需求是直接统计十分钟的窗口长度,统计它的那个流量,然后呢,呃,这个更新的频率要高一点,每隔五秒钟就更新一次,那所以接下来我这个当前的窗口就应该按照这个需求的话,是不是就是。
20:35
MINUTES10,然后滑动的步长就是五秒钟对吧?那有了窗口之后,接下来是不是就可以去直接做聚合了,那我们现在聚合是不是还是直接aggregate,给两个参数,一个是增量聚合函数对吧?一个aggregate function,我这个可以叫做配置count a j,然后后边是一个全窗口函数对吧?当前窗口的配count result。
21:07
哎,我就可以把这个先定义在这里啊,这个结果我们可以叫做window a j stream,这就是这一步,然后下边如果我们收集同一窗口的数据,要做排序输出的话,那应该是什么样,是不是直接基于前面的window a j stream,先按照当前的窗口去做一个K。那当前窗口是不是就是那个window呀,啊,那所以这里边我也可以直接写什么,是不是直接用那个方法引用就可以了,现在我要的是page view count。里边的get window and对不对,直接做这个操作啊,然后下边是不是直接process放大招啊,里边去做一个这样的一个处理啊,我把这个叫做top n hot pages,当然了,到底top几我也可以给一个字段对吧,比方说TOP3直接放在这就完事了啊,那这里边我可以把这个最终得到的结果叫做一个。
22:10
Data stream。String对吧,我们还是像前面的那个,呃,Top n一样啊,Top top n商品一样,呃,最后得到一个字符串类型,直接打印在控制台里面输出就完事了,这是我们想要的这个result string啊,这就是这样一个过程,当然这里大家看到,呃,当前我们这个还是还是有问题的,对吧?呃,就是这里边。我们做这个page view count的时候,它它这里边我们前面还没有得到具体的这个结果嘛,这里边本来应该得到结果应该是patri count,对吧,有了这个定义之后,我们这里边就可以直接从里边提取这个字段去定义了。最后当然是做一个打印输出result stream。Print就完事了,对吧,这就是整个这个代码的一个处理结构。
我来说两句