00:00
下来给大家讲第二大模块,就是所谓的实时流量统计这一部分,那这一部分呢,我们主要是有这样的几个不同的具体的需求,首先我们是要做一个基于服务器log的热门页面浏览量统计啊,当然这个浏览量统计大家直观想想到的应该是PVUV对吧?那这个我们放到后面去实现这两个需求,那这里边呢,首先给大家提出了一个基于服务器log的热门页面统计,那为什么要做这个事情呢?其实主要就是因为我我们现在做这个数据统计啊,做这个用户行为分析的时候,其实主要依赖的数据来源是什么?都是买点日志啊,就是我们都需要在这个前端或者后台打上这个,呃,买点的这个地方啊,把这个日志全部输出到当前的这个日志系统里边来,然后读取出来放到这个卡夫卡里边,接下来我们flink去消费,这是一个完整的流程,而但是在有一些这个应用场景里边。
01:00
那我们想要的数据可能根本就没有去买点,对吧?这跟业务系统有关,有可能那边开发的时候比较仓促,比较仓糙,有很多地方该收集的这个数据没有收集到,那对于我们大数据部门而言,没有业务系统那边的支持,没有买点日志,那怎么做呢?我们是不是就没有数据可用了呢?也不是,这里面有一个最基本的一定能够收集到的数据,那就是web server的这个服务器日志。啊,就是用户那边,既然我们是网络应用嘛,用户只要连上服务器,只要做过操作,只要产生过数据,那就肯定会有相关的一个日志出现,对吧?啊,它对于服务器的这个访问行为是跑不掉的,所以接下来我们可以用这种行为可以把它提炼出来,作为当前热门度浏览量的一个统计,那当前这个需求呢,又是一个热门页面的浏览量统计,那大家想到这不就跟前面我们讲的这个热门商品很类似嘛,对吧?那之前我们做这个热门商品统计的时候,也是把一个商品的浏览次数作为了一个热门度,那你现在要统计热门页面浏览量,而且还只有这个web server服务器的这个log,那我们不就统计每一个页面不是有URL的访问吗?那我们就统计当前的这个URL在日志里边被访问了多少次。
02:24
访问的次数就是它的热门度,然后我们统计出来之后,按照窗口排序,不就是当前每一个时间段内的热门页面嘛,啊,所以整体的流程跟之前非常的像,所以接下来我们就在代码里边把这一部分再做一个实现啊,那我们还是,呃,当前是一个新的模块了,所以直接去一个module对吧?呃,接下来我们这个。Mo,还是在当前的这一个下边,我们新建一个模块,叫做这个是网络流量相关的啊,呃,Network network network flow,呃,Network network flow,网络流量的一个统计分析,Analysis,好,先把这个模块创建出来,然后接下来在下边我们应该是有具体的几个不同的需求,我们就分成不同的object来做实现就可以了啊,那首先这里边我们会想到就是呃,还是啊,下面这个Java我们先改改一个名,改成这个scale,然后接下来我们想要用这个网络服务器的日志,那就不能用之前的那个user behavior了啊,那接下来我们应该收集一个网络日志啊,这个数据呢,是在这个data里边,大家看到有一个叫做。
03:41
Log,呃,叫做这个阿帕奇点log啊,这就是我们收集到的阿帕奇,呃,Web server对应的一个log日志啊,然后这里面大家看到这就是完全是最初的没有经过ETL之后的这个日志的状态,前面我们说呃对应的这是一个访问的IP,然后后边呢,我们是想要那个user ID和user name,这里面没有没有,那就是都是空了,这里边是一个横线表示,然后后再后边呢,是一个当前访问的时间,哎,这里是一个标准的这样的一个字符串显示的时间啊,我们还要去转成时间戳做一个转换,后面这个0000,这是时区了啊,我们这里边直接用的是零时区的一个数据,呃,如果是这个北京时间呢,可能是要加八对吧?呃,那另外后面还有两个字段,就是当前的请求方式,还有诶后面的这个就是URL对吧,所以我们主要是要按照什么去统计呢,就按照URL啊,看它出现几次去做一个统计就完事了啊。所以整体来。
04:41
来讲这个过程非常的简单,然后接下来我们就在代码里边去看一看啊,到底怎么样去写这个代码,还是skyla,下边去新建一个object,当前这个object我们也是加上包名,com.act硅谷点,当前这个是network flow,呃,然后另外这个analysis对吧,这是一个分析类统计分析类的指标,然后接下来我们当前这个就叫呃,Hot pages吧,对吧?呃,就是当前这个热门页面的一个流量统计对吧?啊,就是当前的这个,我们可以加上这个network flow啊好把把这个先创建出来,然后接下来整体的这个代码流程其实跟之前几乎完全一样,首先我们把对应的样例类定义出来,对吧,定义输入数据样例类。
05:41
啊,那首先这个p class,当前我把这个提取叫做一个阿帕奇,就跟那个PPT里边定义的一样,对吧,叫做阿帕奇log,其实就是一个访问访问数据啊呃,那这里边字段大家看到前面有一个IP对吧?Stream,然后呢,呃,后边是有这个user ID和username,就如果我们对user ID感兴趣的话,我把这个列出来,Username一般就就不管了,对吧?那就不要这个数据了啊,然后后边可以有一个对应的time step,哎,那这个我要一个长整型,那如果样例类里边定义了长整型的话,我们提取的时候就应该要有一个转换的过程,对吧?哎,你就一开始的那个时间是string,我要转换成长整型的时间戳后边,呃,我感兴趣的还有一个就是method当前的这个请求的这个方法对吧?还是这也是一个,最后还有一个URL,同样还是一个。
06:41
当前输入出去的样例类啊,另外当然了,我们中间既然是热门嘛,又是一个套盆,跟之前一样,我们先做开窗的聚合操作,得到聚合结果之后,再按照每一个窗口分组去排列,对吧?定义一个定时器,然后呃,收集到的数据之后,然后去把它做一个排序输出,所以整个的过程跟之前类似,我们这里边呢,定义一个呃窗口聚合结果样例类啊,那之前我们那个叫做item view count,那这里边我们就叫做URL view count吧,对吧,因为我这里边统计的这一个所谓的hots,其实就是统计的是它的这个URL,对吧,或者我叫配置。
07:26
Will count也是一样的,Will count,哎,那这里面我们定义的这个item ID对应之前的那个ID应该是什么呢?那应该就是URL了,就以URL作为页面的一个标志对吧?到到底我们看哪个页面它的一个URL被访问的次数多,本身它是一个string练习,后边还应该有一个window and一样的啊,要有这个window的标志长整型,另外还需要有一个count数量抗整型,先把它定义出来,然后后边这里边同样还是先定义这个流式处理环境SK stream execution environment get出来,然后当前我们当然环境里边的这个啊,我可以先把这个全局病都设成一,不影响正确性对吧,方便测试,然后呢,时间属性characteristic,还是时点时间对吧,因为这里边本身就带了这个时间戳嘛,如果说这个数据里面没有时间戳,那没办法,我只。
08:26
能用这个处理时间了,如果带了时间戳的话,我们做统计的时候,你分时间段去统计热门,当然是根据这个用户点击的时间来来统计啊,你如果这肯定要用这个我们当前运行的时间的话,这个热门时效性就已经没有了,对吧?所以肯定是用事件时间好,所以接下来我们把这个环境定义好了之后,接下来就是读取数据转换成样例类并呃这个提取时间戳和water,就生成watermark对吧?时间戳是提取出来的,Watermark是生成出来的。好那接下来我们就看这个处理的流程,还是先定义一个data stream,诶这里我直接从文件里面一下路径,呃,这个这个我们这个对吧,过来。
09:27
哦呃,我们前面这个还是先把它叫成这个input stream啊,后边这里边我们这个转换之后的这个叫stream啊,基于这个input stream,这是个stream类型的数据流做一个map转换,哎,这个操作其实类似啊,跟之前差不多对吧?哎,当前我们的这个data输入进来之后,哎,那应该是我要先把它拆分成一个啊瑞一个数组对吧?啊那就是每一个当前的data都可以做一个这个拆分,那当前我用的是什么来做拆分呢?哎,这个得是空格对吧?哎,当前这应该是空格去分割的,所以用空格分割,然后得到的这个数据呢,最后要保存成一个样例类阿帕奇log even,然后这里面有一个稍微麻烦一点的地方是本身的那个时间戳,这里边我们要提取出的这个时间字段,还要得做一个转换,对吧,那这个转换怎么转呢,就用。
10:26
Java里面不是有那个,呃,Date format那个类嘛,对吧,我们直接定义一个simple date format,把它的形式定义好,然后去做一个pass转换就可以了啊,所以这里边我们这样啊,对事件时间进行转换,得到时间戳好,那我先首先定义一个simple date date format对吧?拗一个出来simple form里边,这里边我们定义这里看格式了啊,我们看这里面的格式是什么呢?呃,这个格式啊,一五年的一个数据啊,大家看这个应该是日月年十分秒对吧?啊,前面是这个斜杠分割,后面是冒号分割,所以这里边我们定义的也是日月年,所以是DDMM,然后YYYY4位对吧,然后冒号分割,HHMMSS这样。
11:26
哪个式格式化,先定义好,然后接下来我定义一个当前的这个time stamp就等于啊,就或者我直接叫TS吧,TS就等于,哎,当前的这个simple data format,我去做一个pass操作,对吧,要pass哪个数据呢?当前要的是。应该是data里边的啊,就是我们已经那个split过了对吧?要这个array里边的第几项,我们得注意一下,看一下啊,空格分割之后,这个横线也算一个对吧?所以这是0123,现在要的是三,然后这个临时区,那我们就不考虑了,对吧?不考虑这个呃时区的偏移量了,所以这里边我们要这个ARRAY3把它拿出来得到当前pass出来之后,这个得到的是一个date类型对吧?那如果说我们要那个最后的那个时间戳的话,还得调它的get time方法,返回一个长整形的这个时间戳,对吧?而且大家注意啊,这个得到的就已经是1970年1月1号零点开始的一个毫秒数,所以这里边我们真的就是一个好秒数了,不像之前我们那个拿到的是一个秒对吧,这里已经是一个好秒述了,好啊,然后接下来做一个包装吧,里边的这个数据啊,首先这个array。
12:48
零这个是当前的第一个字段是那个IP对吧?然后后边还有这个第二个字段是UID,呃,但是这个UUID稍微麻烦一点,就是我们本来觉得uz ID应该是一个是个长整型,但这里边呢,我们考虑它这个有空的这种情况,所以我直接把它变成字符串了,对吧?就后面你如果假如有用的话,用到的话,你再去做转换,再去做判断,如果是横线的话啊,那相当于是空,如果是呃数字的话,我们再去把它转换成长景系,好当然当前的这个需求大家发现跟uz ID没关系,对吧?我关心的就像上一个需求,我们那个item做这个热门商品统计的时候,也是跟user user没关系啊,相当于只是统计那个item ID嘛,这里我们相当于只统计这个URL对吧?啊,所以这个其实当时不定义也是可以的,这里既然定义了,我把它提取出来吧,这个A瑞一对吧,然后接下来还有第第三个字段,这里边要的就是time step了。
13:48
我这里边就直接把转换好的这个TS放在这儿就完事了,对吧?然后下一个字段,第四个字段和第五个字段是message和URL,这里大家要看清楚啊,当前这个是三对吧,下面这个是四,所以后面是五和六,那这里边我们要定义的就是ARRAY5,这是method,然后ARRA6,这是最后的URL啊,就稍微仔细一点,把它做一个转换得到就可以了啊,当然这里面我们也是看到都只空了一格,对吧?所以这里边我不需要做那个tri啊,去掉空格的那种操作,如果有别的情况的话,那你做一些别的调整就OK。好,那接下来我们就是分配时间抽auto,对吧,之前我们是直接用了那个升序的分配方式,那现在到底用哪个呢?还是要看数据了。
14:37
们来看一当前这个数据的这个状态到底是怎么样的啊,那大家看这个零三秒,43秒,47秒,12秒,哎,那说明现在这个数据是乱的对吧?而且后面还来了零七秒,哎,那接下来我们既然是乱序的话,现在就应该给的是一个aign time Sam water marks,里边要给一个bonded auto waterness time sta stamp extra对吧?这长串,大家只要记得最前面的这几个单词就可以了,然后它自动给我们补全弹出来就完事,然后这里边有一个extra time Sam提取时间戳的方法,怎么提取呢?从数据里边提吗?我们当前不是已经有一个叫做time s的一个字段了吗?直接提取,哎,那有同学说后面还有那个乘1000呢,要不要乘1000?哎,大家注意你要不要乘1000,关键看当前的这个字段是秒还是毫秒,对吧,本身我们给的那个时间看起来是一个秒,精度到秒为止,但是呢,我们经过这样的一个。
15:38
这个simple date format转换之后,Pass之后在get time已经拿到的是毫秒值了,也就是后面如果是秒数的话,后面已经有000了,那你这个时候就不要乘1000了,对吧?啊,所以我们是根据那个真实的数据要转换成毫秒作为一个标准来给这个时间戳字段的啊,大家不要就是觉得,哎呀,他到底是什么时候成,什么时候不成呢?你关键看它是秒还是毫秒,然后还有一个地方,这里还报错,就是我们要传入一个延迟的时间,对吧?那这里边这个延迟时间到底该给多少呢?哎,这个就不好说,那我们说的是要给当前数据里边的,就是如果你想达到这个一个完美的处理,就是所有这个乱序数据都不出错的话,我们说就要给一个最大乱序程度,对吧,什么叫最大乱序程度呢?哎,就是我们说的,只要这个本来时间小的到了后边,时间大的在前边他们俩这。
16:38
这种就叫乱序,哎,那么它俩之间的那个时间差,这个就叫做乱序程度对吧?我们找一个最大的那个乱序程度,这里面就可以当成我们当前的这个,呃,就是延迟的这个时间,哎,那这里面我们这个最大乱值程度多少呢?哎,我们大概的看一看吧,因为这个数据尽管不太多,这也得有得有1万行数据对吧,大家看到有有1万条数据啊啊,那所以我不可能完整的全看一遍,或者说你就只能是跑一个程序去去检测了,对吧?啊,那如果说是实际应用场景下,你还不知道未来的这个数据到底乱到什么程度啊,所以说这个都没有办法特别的准,那我就大概的先,呃,就是大概的估估计一下,对吧,拍脑袋给一个值吧,那这里边我们大概看一下啊,47后边有12,哎,那这个乱序程度已经有35了,对吧,一减嘛,35,但是你看四后面还有零七,那你说47后面就相当于还有零七嘛,他俩也是乱序对不对,呃,也是乱序,那一减的话,这个已经到40了啊。
17:38
甚至你后面再看的话,还有57 57后边之后来了还有11对吧,哎这啊,甚至还有零零,所以这里边的这个乱序程度已经到了50多秒啊,所以就按照我们这里边看前十几条数据的话,我都可以有一个估计,大概这个整体的乱序程度在50多秒一分钟左右了,对吧?啊,那所以这里边,呃,如果你要是说,假如说我现在已经看过了,最大乱序程度就是一分钟,那这里边我就直接给一个time,引入这个time window in time.time对吧,直接给一个一分钟直接放在这。
18:17
啊啊,那那这样的就能处理我们当前的这个乱序程度,然后接下来就是基于它去做一些转换操作了,对吧?哎,这里边我们就是呃,进行开窗聚合,开窗聚合以及排序输出,我这个就不给大做这个分开的这个状态啊,还是给大家分开吧,因为这个分开可能看的会稍微的明显一点啊,我们还是定义一个a stream,然后呢,就基于前面的stream去做啊,首先我先做filter对吧,比方说我现在这个filter标准是什么呢?我只要所有的那个get请求对吧?呃,这里边因为所有的这个请求多了嘛,我们这里边可能需要的只是那个get啊,那这里边我定义出来就是当前的这个method必须得等于get。
19:17
呃,你如果要是说post对吧,那提交表单,或者说如果要有这个delete啊这些的请求的话,那显然就不应该是我们这里边访问流量的这个需求了啊啊,那然后接下来filter之后,那还是KBY对吧,按照字段去做分组,按照哪个字段去做分组呢?哎,大家想到肯定是URL嘛,啊然后接下来那分组之后就开窗,Time time window对吧?那这里边我们要统计,你就看它的需求到底是统计多长时间范围内的数据,然后再看要有多多快的这个输出频率,对吧?啊,这里边因为我们的数据比较少,所以大家看到当前我们的这个需求呢,是最近十分钟的数据,然后我们来,呃,就是统计它这个访问量最多的top n,然后每隔五秒大家看这个输出频率就很高了,对吧,每隔五秒就输出一次,那如果要是这么定义的话,这里边这个还是滑动窗口吗?它。
20:17
主卧输出的这个频率高一点而已,它的本身的这个时间窗口的长度是十分钟,然后后边呢,滑动的这个距离是五秒钟,SECONDS5对吧,把它定义出来,然后接下来呢,同样还是aggregate,先做一个豫句和后边再去做一个啊window的那个合并window之后包装成幺零类的那个操作,对吧?啊,大家看这个流程就都一样嘛,那浴具盒我们这个就叫做配置,呃,Count a j对吧,做一个聚合,然后后面那个我们就叫will count window result,诶,把它定义出来,这是我们自定义的两个函数啊,那接下来我们做一个实现啊,这个跟之前几乎完全一样,我们几乎可以直接照抄CE一个aggregate方式。
21:14
方式啊,注意引入的时候要这个啊,就是我们flink common里边的这个ATE方式对吧?实现一个Java interface,然后里边的数据类型呢?诶,那我们这里边有些同学可能说,诶,我直接用之前定义好那个count a行吗?之前不是已经定义好了吗?我放在一个那个公共的包里边,对吧?我直接引用这这个难道不不简单一些吗?你每次都要重新定义吗?哎,确实还真是的,因为后边的类型不一样,对吧?你当前的输入数据类型是不是已经变成我们当前的这个叫做阿帕奇event啊,阿帕奇event log这样的一个样例类类型了,对吧?啊,你后边的这个状态的话,当然还是长整形统计一个count值嘛,那这个还是不变啊,我们把它定义出来,同样里边的过程创建的时候0L对吧,每来一条数据accumul就加一啊,然后最后得到什么呢?最后得到一个accumul啊,这里边要合并的话A加B对吧,这过程完全一样,所以。
22:14
大家感觉好像是可以合并啊,但是因为它的这个发型不一样,你还是得专门做一个实现的啊啊,所以这个就看大家的需求了啊,你如果觉得这个也很简单,快速的这么敲一下就就搞定了,那其实也没问题啊,然后接下来我们把后面的那个window function也来做一个实现啊,呃,Page count result这里面实现的是一个window function。哎,注意这里边啊,我们引入的时候呢,大家注意这个window必须是stream API下面的下的window,它本身是一个s sc tra对吧?这个大家不要引错,然后里边的这个类型呢?呃,这就是我们说的输入输出,还有那个key的类型,还有呃,Window的类型对吧?当前的输入就是前面预计和结果的一个输出,然后接下来呢,它的输出又是什么呢?就是我们的样例类的那个类型,对吧?配review count的这个类型,最后还有这个K的类型,K的类型跟大家发现了,你如果这里边直接用这个URL这样字符串去定义的话,这里边得到的还是一个那个抓va的元组类型,对吧?哎,之前我们做那个热门商品统计的时候,大家可能就觉得深受其害,为什么呢?后面这个提取当前这个item ID贼麻烦了,对吧,我觉得这个本来就是当前K就是item ID,结果提取的时候呢,还绕了半天,那有没有更。
23:39
简单的方式呢,哎,其实是有的啊,大家记得之前K的时候,你可以有不同的选择呀,你可以传一个这个字段名称,也可以传一个位置啊,这个传位置的话,我们说本身它这个语义表示啊,这个代码的可读性比较差,而且它得到的结果也是一个抓va元组,对吧?啊这个是没办法的,那什么情况下我可以它本身那个K是什么类型就返回什么类型呢?哎,那就是我们说的,你自己去定义一个K的这个选择器,定义一个函数得到的这个类型不就变成K了吗?大家看这个,呃,K的string的类型就变成K了K了,对吧,这就是你是什么类型就就是什么类型啊,这个也比较好理解,就是我们前面说的,它本身是可以传多个这个字段作为K的,那当然要包成一个元组了,你现在是已经定义死了,就这一个,或者说就这呃一组对吧,你可以定义这个元组类型,那在这里边,当然这个类型就已经确定了啊,所以这里边我们可以把它上面做一个调整那。
24:39
的后边调用方便,这里边我们直接把这个URL用下划线的这种方式传入,可读性也很强,对吧?啊,因为本来是这个样例类嘛,怎么写都是可以的,那这里边大家看我的类型,就不要再去写Java元组了,而是直接写URL的类型string,然后另外还有这个time window对吧,窗口的类型写出来,大家看这个一写完之后,如果类型匹配的话,上边这里就不会再报错了。
25:06
啊,然后接下来我们就是要实现一个apply方法,这个apply方法里边也不需要做什么操作,我就是要包装成一个page view count做一个输出,那这个输出的时候大家注意是不是直接返回对吧?Apply方法并没有返回值,我要用out.collect这个方法去做一个返回,里边包一个配置u count,那里边呢,第一个是当前的URL,类似于ID对吧?URL不就是当前的key吗?哎,所以这里面URL直接把P放在这完事了,对吧?哎,大家看这里它本身就是词string类型嘛,你直接放在这一点问题都没有对不对?哎,所以这个就使用起来就方便多了啊,然后后边是那个window end window and,那直接用这个window.get end拿到对吧?第三个参数input里边不是有我们那个当前迭代器,第一个值拿到,就是当前的那个值嘛,哎,所以直接包在这儿就搞定了,这就是我们这个前面做聚合的一个过程那。
我来说两句