00:00
接下来我们看一下在整个这个项目里边数据源是什么样的啊,我们这里边主要典型的数据源有两个来源,一个就是大家看这个买点日志对不对,用户行为数据买点日志我把它保存成了一个叫做user behavior.csv一个CSV文件啊,那这里边的这个数据结构主要是什么样子呢?大家看啊,这就是一条数据的一个例子。那这个主要就是,呃,大家其实能想到这肯定就是有一个user ID,然后有一个对,有一个item ID对吧,有一个商品ID,然后后边啊,当然后边这个是他的那个类别,分分类的那个类别ID啊,然后接下来还有对,这到底是一个什么行为,对不对啊,你既然是用户行为嘛,到底是什么,这里边呃,收集到的这个买点数据呢,就是只有这么几种行为,就是一个是PV啊,那大家知道PV就是page view就是。
01:02
浏览,那其实就应该是点击了一下对吧,类似于点击这样的操作,呃,然后还有就是说,呃,FA favorite,那就相当于是喜欢收藏这样的一个操作,对不对?呃,然后另外就还有就是这个buy,就是像这个购买这样的一些操作,就是这里边我们涉及到的买点数据可能相对来讲不是那么的多,所以后边有一些模块呢,我们会用一些其他的数据,当然就是说那些数据可能有不同的文件,我们也是写在CSV里边,正常来讲应该都从这一个买点日志里边去取,对吧?啊,大家就就姑且认为我们剩下的那些是已经做了ETL的,然后从从别的那个日志文件里面提取出来的文件啊。啊,那当然了,最后不肯定不会少的一个是对,还要有一个实验戳啊,所以大家看这个其实并不是我们一开始看到的那个日志里边写的东西,这应该是已经做过ETL了,对吧?啊,是已经做过一些预处理操作之后的东西。另外我们还可以用什么来做数据处理呢?我们还可以拿到另外一个日志,就是web服务器的日志啊,比方说这里边我们用一个阿帕奇的web服务器。
02:15
拿到他的日志是一个阿帕奇点log啊,那这里边他的这个日志长什么样呢?对,大家就会看到这里边有一个IP对吧?啊,后边的这两项没有啊,应该是这个user ID和username,这里边没有的话,我们不管它就好了,然后还有什么对大家看这是正常的那个一开始收集到日志的那个状态对不对啊,这里边的日志真的就是时间,而且后面还跟着。这个加0000是什么?对,时区对不对啊,这个是时区啊,这里边0000当然就不用去考虑了啊,所以大家会想到如果我们要用这个数据的话,还得怎么样啊,格式化对,还得格式化,把这个是不是转换成想要我们能操作的那个时间戳对不对,然后才能去指定医文的time,好,然后后边是get,这是不是访问方法啊,对吧,请求的方法后边是那个URL啊,所以如果我们要统计那个热门页面,热门这个热门网站的话,是不是可以直接根据后面URL去做一个统计啊啊,当然这里边就是可能涉及到各种各样的过滤条件啊,这个后面给大家做一个具体的实现。
03:27
好,呃,所以大家看,其实主要的内容我们都从买点日志里边来,对吧?啊,数据就是这样,接下来我们看一下这个大概数据结构是什么样吧,这个就比较简单了,我们肯定要定义样例类,定义这样的数据结构,比方说这个用户行为user behavior,我们定义出来就是啊,大家看啊,User ID item ID,呃,Category ID对吧,类别的ID,还有behavior类型,那这个就变成一个string了,对不对啊,别的大部分是long,这个是一个int啊,然后time step啊,这个是最关键的一个啊,这个behavior和time这两个比较关键。
04:04
那同样这里边这个web日志里边,我们可以包装成一个,比方说叫阿帕奇log event啊,那这里边包括ID user IP user ID,还有这个even time,还有这个呃,请求的方法对吧,Method,还有URL,所以这个就是我们到时候可能前两个项目里边要用到的这个基本的样例类啊好,然后接下来我们看一看具体的这些拈,首先我们给大家讲一讲这个实时的热门商品统计。那么大家首先想一想,我们现在的需求是什么样呢?简单来讲,其实这个热门商品统计,正常来讲的话,一般都是近期的一个热门商品统计,对吧?啊,一般不会去统计这个历史,当然大家如果想要去统计历史的话,更简单对不对?用流处理的话,历史数据这个更简单啊,呃,就是你不需要去开窗了,对吧?如果要是说我们现在是要统计近期的话,那自然大家就会想到这就类似于一个开窗的需求了,这里边我们的需求是统计近一个小时内的热门商品,每五分钟更新啊,当然这个五分钟大家看这个好像实时就有点不够实时,对不对?当然这个就是这只是一个事例啊,我们是按这个频率来更新,如果我们愿意把这个调的更小,实时性更强,随时刷新的话,当然可以把这个调的更小啊,我们这里边以这个为例子啊,然后这个热门度用什么来表示呢?你既然是要统计这个热门商品。
05:38
啊啊,热门度这里边我们就用浏览的那个次数,那就是我们买点日志里边的那个PV行为来表示,对不对啊,PV的那个行为越多的商品就代表他这个越热门,这是我们的一个需求,那怎么去解决这样一个问题呢?解决这样一个需求。那其实很简单啊,你既然是要统计近一个小时之内的,然后五分钟更新一次,那是不是统计多长时间内的,那就以这个时间去开一个窗对吧?那然后每五分钟更新一次,那就按这个时间对滑动一下不就完了吗?所以我们要的其实就是一个。
06:20
长长度是一个小时,滑动距离是五分钟的一个滑动时间窗口,对吧,然后到时候我们要用什么样的时间语义。对,肯定是even time对不对?用事件时间语,因为我们这里边,因为我们这里边都已经是保存在文件里边的日志了,那更得用事件时间对不对?呃,跟我们现在处理的时间完全不一样了,所以肯定是even type,另外就是我们首先从所有的用户行为里边要去过滤出PV行为,然后进行统计就可以了,对吧?啊,这是我们呃,简单的一个思路,当然了,按照这个统计出来的是不是还要做排序啊?
07:02
最后排序,然后top n输出对吧,这是我们要做的最后的输出结果,好,那整体我们先看一下一个简单的实现过程是什么样的呢?在flink里边主体的这个操作是不是都是data stream呀。在这个过程当中,一开始先拿到我们的呃,这个输入的数据,然后可能我们要做一些基本的map操作,包装成样例类,对吧,有点behavior,然后接下来干什么。是不是首先要做啊,当然就是一开始可能我们要做filter对吧?啊,这个过滤我们就不说了,前面已经提到,然后比较重要的是接下来要做。是不是要做K拜啊,做一个分区对不对,那大家想一想,这里边我们K是按照什么做K拜。我们统计统计热门的商品,每个时间对按照ID,谁的ID user ID还是商品ID,对按照商品的ID去做一个分组,对不对?哎,然后接下来是不是每一组里边可以统计出一个数据啊,对吧?在这个时间啊,所以接下来先分组,大家看本来每一个商品不同的颜色对吧?在一个流里边分组之后,各处理各的,那蓝色就是蓝色,黄色就是黄色。
08:26
那接下来是不是在他基础上就可以开时间窗口啊,啊,所以大家知道做了这个KBY操作之后,Data stream变成了一个k stream k stream基础上可以开时间窗口,对不对?呃,点window time window啊,直接就可以得到一个window的,所以大家看接下来这个数据就可以有这个一个一个的窗口,把数据划分到一个一个要统计的这个桶里边了,对吧?然后接下来是不是就可以聚合了啊,基于这个window stream去做一个聚合,聚合出来的结果是不是就又变成了一个data stream。
09:04
这在这里大家要注意一下啊,聚合出来这个结果跟我们之前的那个数据结构是一样的吗?不一样。对,可以不一样,对不对?哎,这里边我们是还想要之前的那个user behavior,还想要那样的一个数据结构吗?还是说我们这里边其实想要的数据结构了,对,显然这里我们输出的应该要有一些别的内容,我们想输出什么呢?显然是要对在这个时间窗口里面统计出来点击的那个次数,要把这个count数量要要输出对不对。另外这里边大家觉得还需要有什么?因为后面我们是不是还得排序呢,对还得排序,那有看的可以排序了,那是所有的这些一起排序吗。大家想一想,所有的输出的所有的数据一起排序吗?这里所有输出的数据,那可是所有的窗口聚合出来的结果啊,不同的窗口你要一起排序吗?显然不是,对吧,我们要排序的是不是就是一个时间窗口里边所有的数据做一个排序啊,哎,所以大家注意,这里边的数据相当于我们是不是还得加进什么信息,对窗口的信息,所以大家看这里边。
10:30
这里边这一个,呃,这个画的这个图也是本来的这个数据里边,是不是外面还把这个窗口的信息也带进来了,对吧,加了一个红框啊,这边就是加了一个绿框对吧?啊,这边加了一个紫框,所以大家会想到接下来我们是不是要针对这些同样窗口的这个数据要做排序,这个才有意义啊啊所以后续还要做其他的一些操作啊,大家可以先思考一下后续我们怎么做。好,我们现在就先把前面的这一部分先实现一下,因为到这一部为止,其实这是我们比较熟悉的,就是一个开窗,然后做聚合的过程,对不对?好,然后每一步啊,首先第一步啊,前面的那个filter我们没说了啊,已经过了,然后接下来是不是KBY啊,KBY是不是要针对item ID做一个分组,做一个KBY,所以接下来大家看我这里边就主要的信息,要的是这个时间戳啊,就是10.01 10.03对吧,把这些数据大家看到,根据本来在数据流里边一视同仁,都是同样的数据,KY之后各处理各对吧?啊就已经分开了。
11:40
然后接下来是不是开时间窗口啊,哎,这里设置的时间窗口是不是time window,然后两个参数,一个是大小是一个小时60分,滑动距离是五分钟,对啊,所以现在就按照这样的一个时间窗口,把不同的数据分发到了不同的窗口里边去,这里大家需要注意的是时间窗口的特点是。
12:05
左臂右开,前臂右后后开对吧?啊,这个大家一定要确,就是确定一下,那么这里边假如我有一个数据是10:10的数据。五分钟一个窗口,它应该分发到哪个窗口里面去呢?哎,大家大家就会发现,首先既然是这个前避后开嘛,左避后开嘛,十点十分到11:10一个小时的这个窗口,是不是它应该属于啊,包括这个数据对吧?哎,那它还属于别的窗口吗?对,前边左移啊,那相当于滑动五五分钟对不对?10.05~11.05是不是也包括这个数据啊,十点到11点是不是也包括这个数据啊。啊,那大家想如果再往前滑的话,还有没有,其实还有对不对,19:55~10:55是不是也包括它,哎,所以大家会想到是不是同一份数据会被分发到不同的窗口里面去啊,可以是多个窗口,大家想一下应该是同时分发到多少个窗口里面去啊对,大家已经想到了,前面我们在做这个,呃,之前那个窗口,呃,Start点怎么样去确定的时候,讲解的过程当中,大家已经发现它的窗口个数是不是就是整个的窗口大小除以对滑动距离那60分钟除以五分钟,当然是12个窗口,对吧?啊,大家如果愿意较真的话,可以把那个窗口一个一个写出来啊,看看是不是12个啊好。
13:43
然后接下来是不是就可以做聚合了,哎,这里这个聚合呢,我们用了一个点aggregate aggregate显然是更加普遍的一种聚合函数,对不对?哎,这里大家看一下这个聚合稍微有点奇怪,它里边传了两个参数啊,当然这里边大家一看这个形式,这这又变成我自定义的一个一个这个函数类了,对不对,那这自定义的两个函数类到底是什么东西呢?
14:12
前边这个叫。聚合函数对,就是定义了一个窗口的聚合规则,后边的这个呢。这个是一个window function,就是我们的所谓的那个窗口函数定义输出结果对不对?呃,到底最后窗窗口关闭的时候输出什么结果,所以大家看为什么我们要分两个呢。为什么要分两个?我我能不能直接在后面直接给一个那个window function直接处理完了得了,哎,其实也是可以的,当然了,Aggregate本身这个它是要做聚合的,对不对啊,它必须后面得实现那个聚合函数啊,那大家会想到我直接一个比方说一个点啊,之前没给大家详细讲有一个点apply方法对吧?Apply方法的话,后边就是直接传一个window function。
15:06
或者我直接放大招,直接点process可以不可以?也是可以的,对吧,直接后边是不是就处理它就行了,诶那这里为什么要先定义一个聚合规则呢。在这里它其实略有不同啊,它的操作略有不同,当时给大家讲到就是后边这个窗口的行为,窗口的这个聚合的时候,其实是讲过有两种窗口函数的,对不对,一种叫。一种是增量聚合函数,另外一种叫对叫呃,全窗口函数,对不对,或者大家理解就叫全量函数对吧?啊,都可以,就是full full window function,那它们俩的区别在于增量的是来一条就聚合一条,来一条就聚合一条,只不过是不输出对吧?是不是相当于前面的数据就都不保存了,就直接把是不是只存一个状态就可以了呀,对吧,只把状状态留存下来就可以了,而全窗口函数的意义是所有的数据都存下来,最后一起处理对不对啊,所以这里边我们为什么要用aggregate加一个这个聚合函数呢?就是说是不是用这个聚合函数就可以来一条就聚合一下,来一条就聚合一下对不对啊,然后接下来我们到时候最后直接输出一个状态就可以了,那大家注意啊,这里边我们去聚合的时候。
16:37
这个状态是想要统计什么呢?商品是不是就是商品个数啊,其实就是那个count值,就就是一个计数器对吧?呃,就是一个计数,那大家会想到最后我们那个window function式,最后想要输出的数据结构就是一个数吗?不是,我们是不是还得甚至我还有window相关的信息也要塞进去,对不对?哎,我要包很多信息塞进去,所以我再单独定义一个window function,是不是要定义一个想要的输出数据结构啊,啊比方说这里面我又包装成了一个样例类,这个叫item view count对吧?啊,就是每一个商品我看到的它的那个在这个窗口内的统计出来的数量数值,那这这个亮例类包含什么字段呢?
17:26
首先哪个商品啊,商品ID对吧?另外还有对窗口的信息,窗口的信息用什么表示呢?啊,其实这个就是只要我们确定窗口的起始或者是窗口的结束,是不是都可以确定这个窗口啊,这边我们只要一个就可以了,我这里面用了一个窗口window and对吧?啊,用了一个它的结束的这个时间点,然后另外还要有一个有一个商品的数量对吧,那就是被被浏览被点击的这个数量count,那这个count值是不是就应该是。
18:00
前边聚合好的那个状态啊,哎,对,这样的话,它定义的这两个函数,它们的功能就非常的明确了,前边就是相当于是做了一个预聚合,对不对啊,预先做一个聚合,然后后边是不是把最后聚合出来的结果包装在我们想要的那个输出结果里边,一下子输出啊,这是这样一个过程,所以最后得到的结果应该是个什么呢?之前是我们那个window stream对吧,Window stream最后得到的就是一个对data stream,它的类型是item will count啊跟前面我们的数据类型就不一样了,对吧?呃,好,然后接下来给大家先简单的大家看一眼这两个,这两个函数类到底长什么样啊,我想先把这个代码先贴出来。哎呀,这个看起来有点复杂对不对啊,其实这就是一个,就是到时候这些方法大家会想到这是不是都都是override对吧?都相当于是我们必须要实现的这个接口对吧?里边的方法,所以大家看这主要是干什么呢?其实是不是就是有一个计数器啊ACC accumulator就是一个累加器计数器对不对,那这个东西是不是就是来一条数据就加一,来一条数据就加一啊。
19:17
做一个count对吧?啊,其实就干这么一条事儿,对,就是一个累加器,所以这里边我们就定义了这样的一个累加规则啊,就是来一条数据就加一就好了,那么它实现的是一个什么呢?呃,它是要去实现一个Java的interface,一个叫做aggregate function这样一个函数类的接口,对吧?那所以它extends aggregate方式,呃,把这个要写进来。然后接下来大家再看一看后面那个window function又是什么样子呢?啊,大家看这个window function这个又有点不一样,Window function它是它是继承了一个scla的tra,这个代码是又变成了这个scla代码了啊,但是不管怎么样吧,我相当于还是要extend这样一个,呃,这个window function对不对?实现这个window function这个接口,大家可以认为这个treat就是接口嘛,对吧?那么这里边要去实现一个什么方法呢?要去override一个apply方法。
20:17
这个apply里边就定义了我们要输出什么数据类型,大家看我们输出的时候用了什么去输出。是不是collector.collect呀,这里的这个collector就是这里其实是把这个改了个名字叫collector了,其实本来是不是应该就是那个out呀,大家还记得熟悉的这个位置对不对?最后一个参数out,然后后边是collect是一个输出的类型,对不对啊,这其实就是那个out,所以我们直接out.CFT把要输出的数据包装成item will count输出就可以了,这就是我们要做的事情。好,这是我们前面做的这个窗口聚合,窗口聚合完还没完。
21:02
大家会想到接下来我们得到的这个数据是一个什么样子呢?哎,其实是这样的一个数据结构,对不对啊,当然这里边我没有用window and直接为了直观我把这个window写出来了,对吧?大家看前面的这个数据做了聚合之后,可能会统计出来在十点到11点这个区间内。一号商品是不是count是四啊,然后二号商品是count是三对不对,所以这其实是输出的两条数据对不对。大家知道我做KBY之后,是不是相当于每一个K都会输出一个,针对这个窗口输出一条数据啊,然后下一个窗口10.05~11.05,它是三,它是二,是不是也会输出两条数据啊,诶,所以大家会发现到这个时候,其实所有的数据它是不是已经不分ID,也不分窗口,都是同样的这样的一个数据结构啊。那现在我们怎么去做排序呢?对,大家会想到是不是接下来还得分组啊,因为我真正要排序的并不是所有的全部去排序,而是对相同的窗口去排序才有意义,对不对?所以接下来我想要做的其实是。
22:23
按照window and截取出来属于同样window and的数据,然后再按照count大小去排序,最后把这个item ID显示出来,这就是我们热门的商品,对不对?哎,所以大家想到接下来我们要做的是这个操作。哎,所以我们再做一次K,这次k by k by window and,现在我们得到的是不是就变成了,哎,所有的窗口里边得到的那个聚合数据是一是一组对吧?诶,另外一个窗口的数据又是一组,按这个做划分了。
23:01
然后在这一组数据内部是不是再去做排序输出啊,啊,这接下来可能就不是聚合了,是一个排序了啊,那这个排序又怎么做呢?如果我们想不到直接的这种调用的方法的话,我们直接上大招,什么大招对process function process function,大家会想到我是不是能把这一组里边的所有数据来一条就存一条,所有的全存起来,然后是不是直接把它存到一个数组里边做排序就可以了,排完序之后输出是不是就完事了?诶,这就是我们的整体思路,好,所以这是不是就涉及到状态编程了。我们接下来定义一个process function,把得到的每一组里边的数据是不是都存到一个状态里边啊,那现在我们要的状态是什么状态呢?之前我们讲状态编程的时候用的都是value state,那现在我们能直接存一个value吗?随时更新吗?不行,因为我们要存一组数据,对吧?当前窗口内的数据都得存下来,所以我是不是保存成一个列表啊,啊,所以这里是一个list state,好。
24:08
啊,所以最终我们用一个K的process function把它来搞定啊,这里边我们就是用这个window and作为K,然后哎,是不是把这个分组的这这一组数据里边全保存到list state,我们在process方式里边就可以读出来做排序对吧?最后输出,这就是我们最后要做的事情啊,所以大家可以看到,当然process方式大家回忆一下它是不是有生命周期啊,对吧?有open,这里边我们可以去获取一些初始的状态,后边处理每个元素的时候,是不是要调用process element问题啊啊,最后我们是不是还可以定义定时器啊,因为大家会想到我们在收集它的那个数据的时候,是不是这个有可能有延迟啊,啊,大家想对吧,有可能有延迟,所以对于这个东西,我正常来讲应该还要给一个延迟啊,定时触发,那所以这里的延迟怎么做呢。
25:06
大家会想到之前我不是已经有water mark了吗?当然water是一直传递的,对吧?Water mark是一直朝下传递的,但是大家会想到前边这里边。我们拿到的这个数据,这里边是不是已经没有时间戳了呀,这里已经没有时间戳了,对吧?那如果说这里边我们聚合到的数据有乱序的话,是不是也有可能会影响到我们后面的一个输出结果啊。哎,所以我们可能要去给一个延迟,稍微延迟一点,等这个数据全部收集到了,然后再去触发这个操作,所以这里边可能还要给一个定时器,对不对啊,定义一个timer好啊,最后整个的流程其实就是这样。大家可以看到啊,那就是open的时候创建一个这个状态对不对,然后我们每来一个元素添加到这个list state里边,最后是不是我们定义一个,比方说定义一个这个100毫秒的延迟,对吧?所以这里边就加100,然后触发时间就是等到100毫秒之后去出发,出发的时候干什么呢?是不是这里边的这个数据就都已都都已经来齐了呀,我就直接把这个list state里边的数都拿出来排序输出是不是就完事了。
26:24
这就是我们的一个整体的处理流程。
我来说两句