00:00
好,接下来我们讲第二个大的拈,这一个大拈呢,我们管它叫做实时的流量统计,那么实时流量统计这一部分呢,首先我们要讲的是一个基于服务器log的热门页面流量统计,呃,这个这个主要是基于什么样的一种场景呢?啊,那大家就会想到我们前面分析的数据来源是。呃,U德黑尔主要是说是基于买点日志对吧?啊,但是有时候大家会发现。首先买点日志可能不全,有些数据你就没有买点,没有做收集,另外还有一个是那对于有一些就是电商平台而言,有时候要统计的是什么呢?是热门的一些站点,热门的一些。呃,这个链接热门的一些页面,它的这个浏览量对不对,那统计这些数据的时候,假如说我们就没办法直接获取到,在这个买点日志里面去获取这些信息的话,那我们还有什么手段能够拿到这些信息呢?
01:05
一个对比较常见的一种分析的手段,就是可以直接从web服务器的日志里边把它提取出来。这一部分,呃,那那其实大家会想到也比较简单了,就是直接分析那个服务器的log,然后把我们想要的什么字段拿出来呢?假如这里边我们需要的是统计热门页面的浏览数。那我们是不是就是要把还是要把跟页面相关的那个信息拿出来,那在web服务器日志里边,什么代表一个页面呢?是不是就是它的URL啊,啊,所以在这这个拈的实现里边,其实跟前面热门商品非常的类似,我们其实就是把之前的商品替换成URL去做统计就可以了啊,所以呃,接下来我们这个具体的需求是统计最近十分钟,或者说呃,这个看我们自己定义啊,或者一分钟或者是一个小时之内的访问量最多的N个URL,然后我们每隔五秒,或者说每隔一秒就输出一次,那这就相当于是一个实时刷新的热门URL的一个一个统计,对不对?呃,那我们就要构建一个滑动窗口去做这件事情。
02:24
所以大家看这个过程跟之前就完全一样啊,接下来我们在代码里边直接去做一个实现,相当于是对之前学习的内容做一个复习了啊。去new一个新的模块,当前这个模块叫做呃,Network flow网络流量分析。呃,这个拈里边泡文件,我们需要加入什么新的依赖吗?好像想不到对吧?啊,那没关系,我们先把它放着,如果需要的话,到时候我们再说,然后接下来同样代码还是应该放在southw下面,改个名吧,LA。
03:09
呃,那我们现在resources下面放什么东西呢?对,我们要把那个对应的数据还是放在这里,对应的数据叫做阿帕奇点log。复制过来。大家首先看一眼这个数据结构是什么样子的啊,大概的情况就是哦,最后这是一个URL对不对,然后前边这是呃一个IP,然后后边这两个这个空的字段本来应该是这个user ID和username啊,但是这里边我们就拿不到,拿不到那就算了,对不对?呃,就是如果能拿到的话,我们是不是直接基于那个呃uz ID就直接也可以做驱虫去算那个UV了,对吧?呃,但这里边拿不到,那就没有办法,然后后边。哎,这里面是不是还有这个时间啊,后边这个是时区的那个偏移,所以这两个字段合起来表示我们的时间,然后后边还有就是请求的方法get post对吧,所以这里边常规的字段就是这几个。
04:12
然后接下来我们代码要放在skyla下边去new一个object。带上包,名叫com点硅谷点network flow。Analysis。名字我们就叫network吧,呃,Network吧。Main函数,呃,首先我们还是一般的这种习惯啊,先把想要定义的样衣类先放在外边,呃,首先我们会想到输入数据流应该有一个样例类,对不对?样例类case plus,这个流就叫做直接叫做阿帕奇log event。
05:03
一个日件里面表示的一个事件对不对?日志里边表示的一个事件,这里边的这个事件包含的信息有,首先是一个IP对吧,String,呃,然后后边user ID。UID应该,呃,这里边如果它本身要是那个空的话,那我们可能就还是用一个string去表示,对吧,直接拿string来表达。Event time还有发生的时间。它本身也是一个string,我们这里面是不是希望把它要转成一个,转成一个时间戳一个long类型对不对,所以这里定义的时候,我直接把它定义成浪就好了,到时候包装成样例类的时候,是不是提前就先做一个转换啊啊,这里是大家需要注意的啊,然后接下来还有method的请求的方法啊。最后还有URL也是一个死据啊,这是我们预先定义好的格式,另外我们既然要输出这个实实时的输出top n热门的呃,浏览页面,那么中间是不是也要,如果用我们之前那个思路的话,中间是不是也是先开窗,先统计每一个页面的浏览量,然后再根据不同的窗口去做排序输出套喷啊,那就还是这个思路,那是不是在中间的过程当中,一开始我们做窗口聚合的时候,还是要输出一个带着window and,带着统计出来的那个技术量的这样的一个数据结构,就之前我们叫item will count的那个数据结构,对吧?这里边我们同样定义一个窗口聚合样机类。
06:49
聚合结果两离类。啊,那当然了,这个不叫item view count,我们叫URL view count吧。
07:02
几乎跟之前完全一样,只不过这里不是item ID了,我们对直接就用URL做它这里边的区分,另外还需要有window and。Long类型,另外还有一个count对吧?啊,这个我们也直接给一个浪类型就可以了。好,呃,那在里边的这个主主函数里边的基本流程,那其实完全一样对不对,Env先创建环境STEM execution。Environment get,我们把这个先引入。然后还是,呃,我是不是可以把并行度线设成一,然后要设置对事件时间这个时间特性,对吧?Characteristic even啊,先把它说好,同样后边我们接下来是要读取数据源对不对?它是从哪里去读,对文件里面去读read the text file,阿帕奇log,把这个路径传进来。
08:15
然后接下来是不是先要map成我们对应的那个对样例类啊,我们这里定义一个瑞,先把它切开。这里边是split,根据什么字段去切呢?然后这里大家得看一下具体的这个数据了,这里边是空格去做切分的啊,所以这里边切开之后包装成阿帕奇event log log event。那这首先是data RA 0train,呃,当然train之后是不是就是一个string啊,啊,所以这个就不用管了,然后接下来第二个字段是data ra1user ID还是一个string,那还是不用管,对不对?诶接下来even time even time是不是就得做一个转换了,大家想一想是不是,那么这个东西怎么去转换呢?那是不是我们得定义一个data form matter,呃,Data format对吧?按照我们这里边本身的这个结构定义好,然后做一个转换,转换成我们想要的那个时间戳。
09:28
啊,所以关于这个date的操作,大家应该也比较熟悉对吧?数据定义,呃,数据好像多一个啊,那我们是说这里边会有一个那个user,呃,Username对吧,那这里边反正都是空,我们直接不要这个数据了,这个没关系对不对,大家看这里边是0123456对吧,一共七个字段,七个数据,那这里边可能我们要的就是零一。
10:03
三四也不要,呃,这个时区0000,那我们都不管了,对吧,呃五六对吧,就要这几个字段就可以,所以这里边是只定义了五个字段啊,是这样的啊,所以这里边我们要定义时间转换。呃,这里边simple date format对吧,你有一个simple date format,这里面我们要定义一个什么样的格式呢?看一眼。呃,它的这个写法应该是这是日月年十分秒对吧?呃,然后前面是这个斜杠,后边是冒号,所以这里边就是日day DD对吧,MM年YYYY后后边都是冒号吧,后边都是冒号十分秒,是不是这样的一个格式啊,我们把它先定义好。
11:11
然后接下来定义这个time step,就应该是用simple data format去做一个pass我们的数据,对吧,这里边的数据要传的是data read,这个是几呢?0123对吧,要穿三。呃,当然这里边把它先做一个trip,然后传进来就可以了啊,当然这里边pass之后得到的是一个date数据类型,如果我们想要得到长整形的那个时间戳的话,还需要是不是有一个方法叫get get time啊,这个方法是不是得到的就是当前的那个毫秒数,这个时间初21970年1月1号开始的那个,对吧?啊,所以我们直接调这个get time方法就可以了。
12:06
拿到了这个时间戳之后,接下来就把它做一个包装,第三个字段,Even time,对吧。Time stamp直接传进来。然后第四个字段是method method的话是datarra的十七零一二三四五,哦,就五跟六了是吧,所以这里边是五。String类型data ra6却最后一个URL,大家看这样就把它包装好了,呃,稍微麻烦一点,就是要把这个时间做一个转换,这也是我们从日志里边提取的过程当中比较常见的一个,呃,一个方法啊,经常要这么去做,然后接下来我是不是要去指定数据里边的时间戳和watermark了。哎,这个指定的过程当中,之前我们是直接用了ending time stamps,那我们这里边也是直接ending time stamps就可以了吗?
13:08
这个是不是当时我们说是要看数据的状态啊,对吧,这个是根据数据来的,并不是我们拍脑袋定的,我们看一看吧,这里边的这个数据零三秒43秒,它有乱序吗?诶大家一看这里是不是有乱序啊,而且这个乱的有点大啊,这个几十秒的这个乱序,这个确实有点乱,这个数据是本本来是很乱的一个状态啊,那我们看一看,这里边如果是乱序数据的话,就不能直接asending。就是直接用我们那个生序的那个生成automa的提取时间戳生成automa的那个方法了,对吧?这里边我们要去用assign是不是对time stamps and water marks这个方法,一般的乱序我们就用对bounded,大家还记得那个东西吧,Bonded out of order is就是这个对吧?对,呃,它继承了一个周期性生成water mark的那个方法,对不对?默认的周期是200毫秒,对,如果说想设的话,我们也可以EV里面去set,对吧?啊,这里边我们就直接用默认的就好。
14:22
首先这里边得定义一下到底用哪个字段去提取时间戳,那这个字段是不是element里边的,诶是哪个?对,是even type,这里不是我们之前定义那个time stamp,是不是要用这个样例类里边的字段啊,Even type,诶那后面有同学说还得乘以1000嘛,对吧?这里要不要乘以1000?这里大家要注意,乘不乘1000是我们要判断实际这里边传进来的这个数据。是秒还是毫秒对吧?呃,因为flink本身是在这一点上是很傻的,它只认这是一个长整形数,你是多少,我就把它当成毫秒来处理,对不对?那我们自己得判断,到底这个应该是秒还是毫秒,如果是秒乘以1000。
15:09
毫秒是不是就不要乘1000啊,这里是秒还是毫秒?对,当时大家看到了这里get time是不是返回的是毫秒数啊,对,所以这里边就不需要再乘1000了,哎,这是提取时间戳,还没完。对周期性生成这个乱要处理乱序数据是不是有一个延迟时间啊,这个时间对一个time,那这个给多少呢。还是啊,大家会想到这个它至少是秒级的了,对吧,那大家看一下这个乱是不是要跟大概的这个乱序的程度相关啊,要有这样的一个考量对吧?啊,那所以大家说我们这个例子里边给多少吧?呃,大家可能大概的看一看,这就只能按经验来来考察了,对不对,因为数据比较多的话,这个我们可能不可能一眼看到它里边最大应该给多少,这这你就得按经验或者说呃,先设定一个值,然后后续如果发现那个处理比较多,你可能就是一方面这里边设一个automark最大的延迟,另外是不是还应该把那个延迟数据做一些其他的处理啊,对吧,然后随时的去比对,看看是不是总是有很多迟到的数据没有处理,假如说总是很多的话,那是不是我们就应该调整,对,你假如可以容忍这样的一个延迟输出的话,那我就把这个前面最大的延迟时间再调大一点,对吧?啊,那那如。
16:38
果说你要不能忍的话,要实时性更高,那是不是应该把前面的这一个迟到的时间调小,但是在后面要做对应的那个迟到数据处理啊,这是大家能够想到的这种处理的过程啊呃,那这里边我们就就直接就假如说我们对那个实时性啊,延迟要求没那么高,我们就想把它这个正确性处理好,那大家大概看一下这个得给多少延迟啊,有同学说可能得差不多给到一分钟了,是吧,我们看看。
17:13
哦,大概看的话,这里边50已经来了,这里边后边还有还有11对吧,这是不是至少有有40秒啊,四到前面这个还有57呢,所以这个基本上就50多秒对吧?啊,所以几乎得给到一分钟的延迟,当然如果实际生产过程当中不应该有这么高的延迟,对吧?如果要是这么高延迟,我这里边是不是应该本身调小一点,先输出一个近似的结果,后边再去调整对吧?哎,应该是有这样的一个处理的过程的,那大家会想到这里边比方说呃,我如果要是本身这个延迟不能太大的话,我应该怎么样呢?是不是本身可以这里比方说我给一个一秒或者两秒的延迟,那然后后边可以做什么事情,对后边可以再去窗口那个地方lo lateness去做额外的处理,对吧?如果那个要是还有处理不完的,我是不是还可以再set out再去做单独处理啊对吧?好,这里边我们就这里先时效性先高一点,对吧,要不然你这个本来比方说人家那个划窗就是五秒钟滑一次就要输出结果你是要要等一分钟才能把之前的数据收集起来,这个延迟确实有点太高了。
18:33
然后这里边接下来我们就去做KBYK什么呢。常规操作啊,当前我们K是吗?IP,有些同学说,诶,我是根据IP去做去做KY,我现在是要统计什么的数量,哎,对,是不是要统计UIL的数量啊,所以对应我们上一个例子,热门商品的统计,它是不是要以商品IDK100做分组,然后统计数量啊,那现在我们要统计URL的数量,那以什么分组呢?对,是不是要以UIL分组啊,对吧?你如果以IP分组的话,统计的是同一个IP。
19:16
它发出的请求的数量对不对,这并不代表一个热门,我们热门是同一个URL有多少有多少IP,或者说有多少次那个调用对吧,多少次请求,所以这里边我们其实就直接以这个URL作为K去做处理就可以了,然后是不是开时间窗口啊开对吧,比方说这里边我们的窗口是统计十分钟的,那是MINUTES10。划窗那是TIME1,呃,我们说是几秒钟划一次,五秒钟划一次是吧?对,所以是五对吧。那当然了,后边还需要有什么对,是不是lateness啊对吧,这里边允许,呃,这里面还得是一个time,所以time.SECOND60对,允许60秒的迟到数据去更新它。
20:16
然后接下来。接下来做什么?已经定义的窗口已经有了signner,知道它是属于对要聚合,那我们是不是还是用一个aggregate定义出来,里边是不是还是可以定义自定义的预聚合函数,定义自定义的窗口函数,对吧?来一个处理一个,来一个处理一个,聚合好了之后,最后包装成一个统一的URL count输出就完事了,哎,跟之前那个非常类似啊,所以这里边我还是定义一个count agg,然后你有一个window result啊,那当然有同学说,诶,这里边我们这两个东西是不是可以复用之前的东西呢?啊,当然如果你要是把它抽取出来的话,可以放到一个一个公共类里边去做复用,但这里边这还不能直接复用,为什么呢?
21:10
这里面的逻辑很类似。但是。是不是类数据类型不一样啊,数据类型不一样的话,那你实现的过程当中,相当于你这个类型还是一个对,还是一个参数了,对吧?啊,这个可能就比较麻烦,我们这不同的包嘛,你就不要互相有依赖了,我们就直接在外面去实现吧,比方说这里边我们要实现。Count a j。这里要继承的是一个什么,一个aggre,不,不是继承要实现一个aggregate function这样的一个接口,它里边是要传的类型是input的类型,数据类型,状态,累加器的类型,对吧,就是状态,最后还有输出的类型,注意它的输出其实是对下一个我们那个window function的输入,对吧?所以这里边同样这里边输入,输入是阿帕奇event long event对吧,累加器是不是long类型啊,输出是不是也是long类型,对,其实是这样的一个状态。
22:20
然后我们实现一下。同样是不是还是来一个数累加一次啊,就是一个简单的计数器accumulate加一,然后下面创建的时候,初始值是什么呢?初始值零,然后get result,这时返回它的结果的时候是要什么呢?就是当前的计数器返回对吧?呃,然后最后如果要是这个分区合并的时候,A加B就是这么简单对吧。然后接下来还是啊plus window,他要去实现的是一个window。
23:02
Window,什么R,呃,R window function对吧?是不是就是这样一个东西,呃,这里面它有四个类型要传入一个in,一个out,然后还有一个key,还有一个window的类型,所以这里边它的输入是什么呢?对,输入是前面语句和函数的输出long,它的输出是URL压力类,对吧,You count,然后后边K的类型是什么呢?哎,K的类型这个很好,我们当时是不是直接用这个呃,Select对吧?这个下划线的方式的话,这里边本身那个字段是什么类型,是不是这里边K就是什么类型,URL的类型本身就是string,所以这里边给一个string window的类型是不是time window啊?好,大家看把这个哦,当然这里面还得引入啊。
24:02
把这个一写上面,是不是就不报错了。所以接下来。这里边我们要输出什么呢?当然这里边要必须实现一个apply方法,这里边我们想输出,那得out.PL去输出,包装成一个URL will count里边传什么呢?首先是当前的URLURL在哪里边,是不是就是key啊,对啊,KB就是用URL key bed的嘛,然后接下来是window and window and是不是要从window里面拿啊,window.get and,最后还有一个count count是不是就是在这里的input里面,Input本身是一个迭代器,对吧?Next取出来就可以了啊,这就是我们前面做这个窗口聚合的过程。
我来说两句