00:00
接下来我们再来讨论一下进行实时流量统计的另外一部分内容,那我们接下来呢,之前我们做的是基于服务器web server它的那个log来去做这个流量统计,而且我们做的是一个实时的呃,热门页面流量的计,那这个就非常类似于之前的那个呃,Top n啊,就热门商品的统计,那我们想到平常遇到的需求呢,往往不需要去统计这个数据,那我们一般用到的呢,是统计网站的总浏览量对吧,就是隔一段时间去统计一次,比方说一天之内呢总访问量啊,一个小时之内的总访问量啊,这就是我们最为经典的指标统计指标PVUV啊,那接下来呢,我们的这个需求就是给大家来讲讲这个PVUV的实现啊,那首先我们来讲PV,而且我们现在这个数据呢,还是。回归到我们之前的那个买点数据了,对吧,因为如果要是呃,我们有这个买点日志数据的话,用它显然要比之前我们从那个web server里面提取那个数据啊,你看那个URL都不太好使嘛,很多这个你要写复杂的这个正则去做一些资源文件的过滤,那里边有一些呢,可能滤出来之后也不一定就是我们的那个页面的URL啊,所以这个还是比较比较粗糙的一个统计,更精确的统计,基于买点日志。
01:26
啊,那首先就是PV了,PV大家都很熟悉啊,它其实就是pri的简称,就是所谓的整个网站的页面浏览量啊,那它统计的这个原理就是什么呢?只要有一次PV行为,不管是相同用户的还是不同用户的,都算作一次PV的统计,对吧?啊所以简单来讲的话,我们这就是一个count,就是对于这个PV行为所有数据量的一个count啊那当然它可能有一些缺陷,就是我们说的,假如说一个用户他就在那里不停的反复的打开一个界面,那就相当于他可以刷出很高的这个PV值,呃,所以在有些场景下呢,呃,这个PV我们要结合后面讲到的这个UV一起来看,对吧?啊作为这个网站访问量的一个代表,那首先我们先来看这个P怎么去做啊,那PV的做法其实呃,大家会发现,就是说它主要基于的原理啊,还是就是说用户每一次访问页面的时候会发。
02:26
一个请求对吧,就是所以如果说我们没有买点日志的话,你直接在前面我们的那个网络日志里面啊,就网络,呃,Web server啊,Web服务器的那个日志里边去提取这个PV行为去统计也是一样的,对吧,你就统计个数就完事了,呃,那另外如果说现在我们已经有了这个买点日志,比方说我们还是用user behavior这个数据作为数据源,哎,那我们就可以。就选取里边的那个PV行为,对吧,大家还记得里边有那个PV行为,之前我们是拿它来作为商品的热门度了,那现在呢,我们就把它作为当前网站浏览量的访问行为,对吧?来用它来做这个PV的统计啊,那我们现在呢,统计这个是每小时内的网站的PV啊,那所以接下来我们还是在代码里边。
03:17
新建一个,就在当前的模块下新建一个object。这个我们这个。然后里边的具体流程,那其实整体都大同小异了啊,大家想到这里边一开始那肯定还是呃,Stream execution environment,先把执行环境获取到,我们还是把这个影视转换引入进来啊,然后呢,我们还是啊,把这个当前的这个时间特性创建出来,定义出来,一般情况下既然是user behavior嘛,我们当前应该是有这个英文time对吧,直接把它提取出来,然后另外还有就是这个并行度,如果说我们想看的这个更直观一点的话,全局并行度直接设成一啊,这都是完全没有问题的啊,然后后边这个操作大家想到其实跟我们之前那个一样,对吧,先把这个数据全都读出来啊,然后我当前干脆把这个大家知道,就是具体要我们提交这个就是打包的时候啊,最好还是在当前的这个目录下边有对应的这一个资源文件,对吧,所以说我还是把它copy过来吧,尽管说这个文件有点大啊,我们还是把copy过来啊,然后我。
04:33
在这里边引入当前这个文件的时候呢,我就直接就是大家知道就可以写这个相对路径了,对吧?呃,如果要是你你这里边,呃,像前面我们那个直接写绝对路径的方式,其实在就是大家可以看一下我们之回顾一下之前那个耗台这里边啊,这里边其实还是有有很多东西,我们后面是改成那个卡夫卡源了嘛,所以说这个问题不大,你像前面我们直接写这个读取数据源的时候,写这个的话,这种方式其实并不好,对吧,这种方式你直接写死这个本地地盘的什么样的一个目录,那我具体提交到这个执行环境肯定是有问题的,所以这里边我可以做一个操作,就是把当前的呃,就是这个目录啊,我只定义一个,只定义一个这个相对路径就可以了啊,就比方说我定义一个这个,呃,啊,我们定义这样一个。
05:33
Resource,我可以用get class,这里边有一个方法叫做get resource对吧,那就是当前类,然后所在的这个包环境里边,我对应的那个resource都可以直接用这个方法拿出来,然后这里边呢,我直接传一个相对路径就可以,哎,我直接前面加一个这个斜杠,然后我把当前的这个user behavior。Copy它的这个相对路径啊,其实就是他自己的这个文件名了,对吧,我们直接就在这个resources目录下边嘛,所以这个其实是没什么问题的,那后边我们做这个。
06:07
我就把这个直接放大一点啊,后边我们在引入的时候就不要直接写死这里的这一个路径了,而是用这个resource.get pass,把它的路径拿出来读取数据就完事了啊,这样的话会更通用一点啊,这是从文件中读取数据啊,那另外就是说像前面我们曾经定义好的这个样例类对吧?啊,那该该引入还是引入啊,这里边我也可以把这个就是假如说我用到的数据都一样啊,那我可以把它放在一个公共的这个包里边去调用啊,那这里边因为我们真实的场景并不是说就是所有数据啊,你统计的时候来了之后都是都是一样的格式,所以我还是单独在自己这个包里边把它定义一下吧,啊,把它定义出来user behavior,然后另外还有就是做这个转换操作,大家想到也是差不多的,对吧,这个数据读进来之后mapb成user behavior样例类然。
07:07
然后定义当前的这个时间戳升序的,我们直接把它这个升序定义出来就完事了,乘以1000对吧,毫秒数,哎,所以整体的这个前面的流程都非常的类似啊,这这里边你看它,呃,就是看我们要不要引入之前那个hot台it里边的,我这里面自己已经定义了,当然就不需要了,对吧?啊,然后这一步是转换成转换成样例类类型并提取。时间戳和water,然后接下来我们要做的操作,那就是真正的在它基础上去做一个处理啦,处理转换对吧,做这个PV,哎,那我这里边定义一个,比方说我最后要得到一个就是PV streamam,对吧?就最后我要看到当前的这个PV,每个小时输出的那个结果,对吧?啊,整体来讲就是一个小时一个一个窗口,然后有一个输出的统计结果,就在这个流里边转换之后直接输出,那我基于前面的data stream首先要做什么事呢?是不是先得还是先得做一个filter,先filter出所有那个P行为对吧?哎,我那个等于P把这个先拿出来,然后接下来大家想到,哎,那我是不是要去做一个失呢?是不是要做分组呢?
08:36
诶,这里我们会发现好像不需要分组对吧?诶,因为我们当前就是全网站的这个,呃,访问量嘛,你不管是哪个呃,User不管是哪个item,这里边都是一次访问,而且我们是不区分这个相同user和相同item的嘛,所有的只要来一个我就统计加一,诶所以我就发现了,这里面好像不需要不需要这个分组,但是不需,如果不分组的话,后面我们做那个直接开窗,我们不是说那个就得WINDOW2了吗?哎,这个就有有点麻烦,那怎么样做这个操作呢?啊,一个简单的想法是我可以就像之前做world count一样,大家还记得word count对吧?当时word count我就是每来一个数据,我都把它卖成一个二元组,那现在也是一样,我现在每一个数据我把它卖成一个二元组,然后前面的这个K是什么呢?哎,K我随便给,甚至我可以给一个比方说,所有的数据来了之后,它都叫PV,这行不行。
09:35
哎,这也行啊,就相当于大家全分到同一组了,对吧?啊,同一个K,这相当于就是一个我们说的这个雅K对吧?给大家写出来啊,定义一个PV字段,呃,就是字符串作为分组的,呃,这个雅K啊,这个翻译过来啊,大米K啊,所以接下来我们要去做这个开窗操作呢,还是先K对吧?那这里面K当然就是二元组里面的第一个元素嘛,K_一,大家要注意一下,当前所有数据会被分到同一个组,因为我们当前的K一样啊啊,那接下来我们继续就是同样的这个开窗操作了,我想统计每一个每一小时之内的PV值,那当然还是引入。
10:35
这个点直接定义一个时间长度,一小时一个数就表示当前是一个滚动窗口,对吧,一动窗动窗来我们合,哎,那聚合的时候我们可能还是想到啊呃,最后我想要输出一个什么结果呢?可能想要看到一个就是带着当前窗口值口。
11:06
它的那个结束时间对吧,我们叫window end啊,带着窗口信息的一个,呃,当前的一个那个PV数量的一个统计,所以我还是定义一个输出的样例类吧,就是我们定义输入输出啊,定义输出PV统计的样例类,那这个样例类里边,诶大家也会想到,其实这个很简单,我们就叫做PV count,或者你叫PV count view也可以,对吧,像我们之前呃定义的那种模式啊,啊,那这这个样一类,其实我们关心的就是什么呢?也没有什么ID之类的,对吧?我关心的就是每一个window window end,这是一个长整性,然后另外还有一个当前的PV count数量,所以接下来我这里边要做转换,既然是又要用到window信息,那是不是必须得上window function啊,得有窗口函数,对吧?但是呢,我又想要来一个统计一个啊,大家想到这个你PV。
12:06
统计,这不就是来一个加一来一个加一吗?哎,所以这里边我还是用之前的方法aggregate,对吧?前面来一个比方说我这个PV,呃,Count a做一个增量的预聚和统计,后边呢,来一个PV count window result包装成我们想要的样例类类型输出到这儿就结束了,后边不用做任何的排序操作了,对吧,只要把这个输出我们就看到结果了啊,所以这个整体来讲操作还是比较简单的啊PV stream,呃,接下来把它做一个print输出,最后不要忘记还有这个en nv execu,执行起来当前是PV job,好,那后边当然就是关键在于怎么样实现这两个自定义的聚合函数和窗口函数了,对吧?这是自定义语句和函数,大家现在应该是轻车熟路啊,我直接这个PV抗AJ,它需要去实现的。
13:06
是一个re,好,然后后边的类型大家注意啊,就是当前的入数据样例类user behavior对吧?哎,然后中间的状态和输出的结果,因为最后只要一个count值就够了对吧?传递给那个window function,所以这里边我们还是长形长形对吧?里边的实线大家都已经非常熟悉啊,初始值来一个零,然后这里边A每来一个数据的时候直接加1ACCUMULATOR加一,最后输出的给的就是当前统计的结果对吧?A emerge的时候A加B就完事了哦,这就是非踌速的实现了一个预聚合对吧?然后另外还有自定义窗口函数,它是拿到预计和结果之后才做的这个处理,对吧?哎,所以说这里边我们实现这个window function,大家要注意引入的是这个,呃。
14:06
不是下面这个啊,下面这个是Java的interface对吧?我们要的是上面的这个,然后里边的数据类型,它输入的就是预据和函数的输出,哎,当前的这个长整型count值,它的输出是我们定义好的那个样例类TV count,然后接下来还有这个当前K的值,我们这个K的值是什么呢?啊,下划线一这个K不就是PV吗?那那下划线一这还是一个直接定义好的一个函数提取对吧?啊,提取出字段,那是什么就是什么嘛,词讯类型,另外还有这个time window当前的窗口类型定义好之后回顾回顾一下当前正常来讲应该是啊,应该是不报错的对吧?诶,那这里边我们看一下当前这个报错到底是报在了哪里呢?我们大家肯定就是下边这里边啊,你看需要的是string和time window类型对吧,而我们这里边给的是一个PV count window result,哎,那这里边这个类。
15:06
进行不匹配,那我们看一下这里边哪里的这个类型点错了啊,诶这里边我们这里边定义的呃,输出对,输出是这个count,这个没有问题,然后当前的K的类型是string对吧?啊这个window,呃,当前这个window的类型是time window,我们看一下这个影包引对了啊,就这。大家看到里边必须要实现的就是一个apply方法对吧?直接把这个方法实现就可以了啊,那关于我们上面这里报错的这个信息,这主要还是类型不匹配,如果大家要是仔细的观察一下的话,其实会发现我们当前的这个无function这个类型都是都是对的,我们很仔细的看了它当前的这个输入输出啊,还有这个当前的这个K的类型,对吧?我们当前KY这里边是传了一个函数,所以说这里边它本身是string类型吧,我们直接用它作为类型肯定是没有问题的,那这里边出在哪了呢?这个问题主要是出在前边。
16:07
PV抗AJ这里我们这里边用的这个输入参数是什么呢?是一个user behavior对吧?而且当前我们这里用的是这个就是定义好的这个样例类类型,但是大家注意我们这里边其实输入参数是什么,是不是已经把它map成一个二元组了呀?哎,所以这里边就是我们这里边本身在后边做这个操作的时候传进来的数据其实只是一个二元组的类型,根本就不是user behavior啊,所以这里边我们这个类型大家要小心一点啊,经翅有这样的问题,呃,这里边我们传的其实是一个string和一个int这样的一个类型,对吧?啊,如果大家想要把这个后面这个一也改成长整形的话,这个也可以,对吧?因为我们这里边只是定义这个accumulator作为后边的输出,其实前面你那个一还是几都无所谓,我们都是来一个数就加一,来一个数就加一,跟它没关系啊,那所以这里边如果改成二元组类型的话,所有下边这里边value的类型都得改,对吧?哎,这。
17:07
里边把这个改过来,呃,然后大家看到把这个输出之后。我们看一下还有哪里需要改啊。直接把这个重新重新重写一下吧,当前的这个二元组类型,然后这里面大家看到这个应该是0L,这里面还是UM1。这里面最后拿到的是一个accumul,这里是A加B对吧,然后我们来确认一下,如果上边诶,大家看到这里边在报错啊,我们这里面是。这是下面我们这个没有把这个注释给写上是吧,然后我们把这个类型定义好了之后,呃,大家现在再看看一下上面我们这里的问题还出在哪里啊。大家看一下这里报错的这个信息,大家看到这里边expect是什么呢?它是string int,还真的不能定义成长整形对吧?就是刚才我们定义成了长整形,为什么呢?因为这里边你输入的时候,这里边没有指定它是长整形对吧?所以它默认这里边需要的,哎,这这里给的是一个int类型,你如果要是把这里边给成长整形的话,这里边的类型就全匹配了,这就没有问题了啊,所以这个类型转换大家还是要稍微的注意一点啊,有时候写的这个步骤太多太杂了之后,后边这个类型是很容易出问题的,好,然后接下来我们就在这个apply里边去做一个操作,哎,那这里边现在我们主要是要拿到当前的window信息,对吧?那所有的信息其实都已经在这儿了,我用一个out.CFT做一个包装PV count包装成一个样例类就可以了,那当前呢,我要的这个window and,那就是window.get end,然后还有一个count值,那我直接用input,诶,我可以用那个迭代器。
18:58
B,然后next拿出来或者可以怎么样呢?那大家知道这本来就是一个可迭代类型吧,可迭代类型里面本来就有一个head对吧,就是头第一个值,我直接点head把它拿出来也是OK的啊,所以这个其实也有很多这个可替换的这种写法啊,直接这么做就把这当前这个需求就搞定了啊,所以这个其实整体实现起来还是很简单的,我们先来运行一下,看看效果怎么样。
19:27
好运行一下,好运行结束,我们看一下当前的结结果啊,大家看现在就是每隔一小时,大家看这个时间初二啊零,最后的000,那是毫秒加的三个零啊,我们看1600,然后5200,这是不是隔了一个小时啊啊,尽管我们不知道这到底是几点,但我们知道肯定应该是整小时对吧?啊就是整,然后中间隔了3600秒,肯定都是整小时8800啊,就是这样一个小时一个小时输出的,因为前面我们定义了那个全局并行度是一嘛,所以这里边你直接看到的都是按照顺序输出的一个一个窗口,然后每一个窗口里边统计的这个数量是多少呢?那大家看基本上这个是四万多对吧?呃,这个巅峰峰值的话,到后面这几个,到了这个52000多啊,最多的一个52500多啊,那最后一个窗口,大家知道这个数据最后一个窗口时间上可能截截掉了,所以就只有13条数据啊,那整体来讲的话,我们当前的这个真实数据。
20:28
它的PV值啊,每个小时之内访问量大概是四万多5万的一个样子,对吧?啊,所以这个我们大概是有一个了解了。
我来说两句