00:00
接下来我们就具体在代码当中把这个项目做一个实现,首先我们还是先去新建一个项目。呃,这里边这个group ID com.at硅谷,呃,这个artifac ID,我们叫user behavior,用户行为分析analysis。啊,我们就直接在这个window里面打开就好。呃,大家首先会想到在这个项目里边,里边其实是会有很多个拈,对不对啊,所以我们整个的这个想法不会把它直接在这个user behavior,就是外层的这个项目里边,直接在这个S命下面去写,而是在下面应该先干什么,对,先先建这个子拈对不对,所以相当于我们这里是建了一个这个父子项目,所以接下来就是你有一个module,对吧。
01:01
嗯。这里我们第一个模块应该是做实时的热门商品统计,所以这个我叫做hot items。Item。是。好,把这个先创建出来。呃,大家看这个一创建出来之后,这里边这个副项目的po文件里边就多了这么几行,对吧,大家看已经添加了一个模块,然后另外它的这个packaging变成了po,所以大家知道这个副项目是不是这个S目录就可以删掉了,没什么用对吧?所以我们把它直接删了就好了。然后接下来我们真正的代码是要放到呃,就是当前这个模块实时热门统计的代码要放到source main下面,啊,大家又看到了,这里边其实是呃,这个对是Java对吧,所以我直接看着不舒服,改一个名吧。
02:07
好,然后接下来我们是不是先得做一些这个一开始的一些准备,对吧?首先大家看到这个泡文件我们还没详细的去写呢,这泡文件应该要引入一些依赖,我们这个项目需要引入什么依赖呢?首先我们这个项目是一个对flink写的项目,所以肯定要引入flink相关的依赖,对不对?呃,另外还还得引入什么呢。哦,常规来讲的话,大家可能别的好像想不到什么比特别特殊的了,那直观来讲的话,一般在应用场景下面,大家会想是不是卡夫卡应该是必要的啊,就一般情况,我们这个消费数据读取数据,我们这里边可能是等一下直接从文件里面去读取了,真实的应用场景,卡夫卡一般是少不了的,对吧?啊,另外就是我们的输出也可能是需要去think到卡夫卡里面去的啊,所以基本上这两部分还是一般我们要把它直接放在副模的POM文文件里边,放在外面的含义是什么呢?
03:18
对,是不是所有的子拈不用自己再定义再声明了,对不对,哎,直接默认他们就全部会引入这些这些依赖,好,那现在我就把这个直接从文档里边给大家做一个copy,大家看一下文档里边首先写的是什么,不是depend dependency啊,当然这是这个一般大家做项目管理的时候的一个一个规范了,一般在po文件里边是不是先要定义对版本信息,那你如果不做版本控制的话,那有可能,呃,大家有可能很多人都在改这个po文件,对吧?呃,你这个人引入这这个依赖,呃,其他人在别的模块可能也引入了类似的依赖,但是呢,版本不一样,那到时候这个冲突就很麻烦了,所以这里边我们肯定是要在副项目的po文件里边,把全局的这个版本全部定义好,这里边我们的这个flink版本用的是1.7.2,然后下面还定义了scla的b re version,对吧,Sla版本2.1。
04:19
幺幺,然后还有一个卡夫卡的版本是2.2,所以我先把这个properties copy过来。好,接下来就应该是dependency了,对吧?啊,那大家知道在这个里边flink既然用到了flink,那是不是flink scalela和flink stream sc这两个肯定都要引入啊,哎,大家应该见过这个这个方法对吧,这样的一个表示。Dollar里边大括号。然后这个s skyla byary version,这是不是就把我们前面定义好的版本直接放在这里了啊,当然对应下边这个flink的版本也是直接用这种方式引入,而不要直接写死,所以我们是不是要改的话,上面一改下面就全改了,呃,版本升级的话直接一下就全改了,所以这里是flink skyla和flink streaming skyla,然后下面是不是还有卡夫卡啊啊,所以大家看这里边我要引入,首先是本身卡夫卡的客户端要有,对吧,卡夫卡要有,另外还有一个是flink卡夫卡link跟卡夫卡的连接器connect。
05:25
啊,那当然了,这里边对应的那个tla版本也得跟在后边啊,下边大家看这里是用的flink版本对不对?连接器的话是得指定flink的版本,而这里边是卡夫卡客户端的版本,好,这是我们要引入所有的这个dependency,先把它copy进来。另外下面我们还得还得引入什么呀。啊,下面是不是还得有这个插件啊,对吧?做build环节的一些插件啊,这个我们就跟之前在做这个理论学习的过程当中引入的一样,还是引入一个打包插件skyla plug,对吧?呃呃,不是,这个是我们的那个编译插件啊,就是要把这个skyla代码编译成class文件skyla plug in,另外还有一个打包插件ma assembly plug in,对吧?啊,这部分跟之前的内容是一样的,所以我们就直接copy就好了。
06:23
啊,所以这部分大家看,其实直接抄也是没什么问题的,但是你得清楚,把这个都理的非常的顺的话,那直接copy过来一点问题没有,好,现在这个把它引入之后,我们刷新一下,在这里会看到user behavior下边dependency引入了,对吧?另外这个插件这里边对应的这个大家看ma assembly plug in也已经引入了,这里边这个ma sky ma plug in也引入了那对应的子模块。是不是也有了,呃,对吧,直接在这个副拈里面定义好的依赖和插件子拈直接就可以用了,它都会去引入啊,那大家想一想,这个子模块的po文件里边还要制定一些什么东西吗?
07:10
呃,这个这个子模块我们做实时的热门商品统计,好像不需要什么了,对吧?呃,好像没有用到什么特殊的东西,所以这里边我们就不需要做任何操作了,当然这里面大家能看到这个变化,它定义的时候是不是上面有一个parent标签啊?呃,定义了它的负项目是什么?好,所以接下来我们就真正的可以在里边去写代码了,Skyla下面去新建一个object对吧?呃,这里边,诶在这个新在写这个代码之前,大家会发现还有一个准备工作我们还没做呢,什么东西还需要做啊。我们现在是不是还没数据呢?对数据一般我们会放在resources下边,好,那这里我是把这个数据都放在了这个data下边啊,我们这里面要用的是这个user behavior c,呃,点CSV对吧,所以我们在这里把它直接复制到这个目录下边。
08:13
好,我们看一眼这个数据吧,啊,这个数据就是量其实还还可以,但是其实也还没有达到我们真正大数据处理的这个量级,大概这也就是几十万条,40多万条,不到50万条对吧?呃,所以这个其实还好啦,呃,我们还是给大家就是把这一个整个的过程讲清楚就可以,大家看一下这个字段表示什么含义呢?就是我们前面给大家讲到的user ID,然后对哪个商品item ID对吧?呃,另外这里还有一个类别类目的,对category ID,最后还有这个到底是什么行为,PV,还有一个时间戳对吧?这里边大家看是不是大部分都是PV啊,因为大家知道肯定是那个点击浏览,那这个行为最多嘛,呃,然后大家看哦,还看到还有BY对吧,对,这个购买这个还有还有cart cart就相当于是,呃,购物车操作了啊,就相当于这个啊,另外还有什么,还有费费这个相当于是喜欢对吧?对。
09:13
呃,相当于是打了一个like这个标签,所以这里边它其实就这么几种简单的行为,做了一个买点日志的收集,那大家可以看看到,就是我们可能基于它确实是能够分析的指标确实也会比较少啊,但是大家可以去自己定义一些数据,去把自己想要分析的数据都提取出来啊,好,这是这部分,然后我们接下来就真正的可以去创建scla的object,然后去写代码了。com加上包名at硅谷点,这个叫hot items。呃,我们加上这个analysis吧,类名我们就叫hot,然后。
10:06
啊,大家会想到在这个main函数里边,我们接下来其实就是flink流式处理程序的那个标标准流程,一个一个往下走就可以了,对不对,对吧,要有这个执行环境,要有这个,呃,Source要有transform,要有think,其实就是这个流程,呃,在这个之前,我们先定义一个样例类吧,对吧?呃,做做这个数据分析,我们养成这个好习惯,先定义。输入数据的样例类,那这里边我们这个case class。就叫做user behavior好了。啊,大家还记得里面的字段user ID啊,这个我们给一个long类型,然后是item ID也是long对吧?啊最然后是有一个类别ID category ID,这个我们给个in好了,其实这个我们如果不分类的热门统计的话,或者不做分类的这个指标查询的话,这个好像没啥用,对吧?呃,在这个代码里边可能没啥用,然后后边是一个behavior,这个behavior是不是,它应该是什么类型。
11:25
对,它当然就是一个string对吧?啊,就是by cart啊,或者说这个PV这样的一个string,最后还有一个时间戳time STEM,好,那这个是不是一个long类型啊,好,先把它定义好,另外大家一定还记得这个我们在讲那个PPT的时候给大家讲过,后边我们输出的过程当中,其实不是最后的输出,是中间对做那一个统计结果窗口聚合结果的时候,是不是要输出一种数据类型啊啊,就是包含了它的那个window的window信息,Window and,另外还有一个对当前这个呃,Item这个商品的个数对吧,统计出来的个数,所以我们再定义一个窗口聚合结果量力类。
12:25
呃,这个我们叫做item will count。好,那当然这里边首先是不是要有item ID啊,这个是一个long类型,然后后边需要有一个window and window and应该是什么类型,Window and本质还应该是一个时间戳,对时间戳,所以是long对吧?呃,最后还有一个count,对这count我们不知道多少,就直接给一个浪好了,好,这是我们一开始先定义好的样例类,然后接下来我们就可以按部就班写这个流程了,对吧?第一步创建执行环境DNA。
13:18
Execution environment,然后get environment啊,这个大家养成习惯的话,知道它上面一般情况我们后面要做那个转换的时候,需要把那个引入对吧?啊,影视转换的那个条件引入,然后接下来啊,当然这里面这个如果要是防止我们输出的时候出现这个混乱的情况,我们可以直接把全局并毒先设成一不影响结果正确对吧?好,接下来就是S。呃,毒毒诶。嗯,什么情况。
14:03
读取数据。呃,这里我们可以定义一个,比方说我就叫data stream啊,因为现在我们从哪里去读取,对,是从这个文件里面读取,所以是read text file这里边这个路径,用这个user behavior的路径就可以了。这里大家写自己的路径啊,好,然后接下来做什么操作呢?读进来之后是不是得先把它转换成我们想要的那个user behavior那类型啊,对,所以这里边啊做一个map操作,当然这里边每一个数据读进来是不是先要做一个切分啊,切成一个data RA,它应该等于data split用什么呢?对逗号分割。
15:01
然后接下来包装成user behavior,是不是data a read0对,就是啊,当然我们一般养成习惯就是说去掉空格对吧,先tri一下,然后对,是不是要出long,本身是一个long类型的user ID,同样data ra1 trim之后to long,这个是item ID,然后data ra2。Trim之后这个是category ID啊,我们好像是int是吧,所以to int,然后三这个是PV啊,不是PV就是behavior对吧?那是不是直接tri就可以了,本来就是string。最后一个data array4,我们的时间戳train之后要涂了,对,这样就把这个转换成了我们想要的这个,呃,数据类型user behavior,对吧?啊,然后接下来大家记得我们现在应该要用的这个时间语义应该是什么呢?对,其实现在我们要用的时间应该是even time对不对?因为这里边是有时间戳的,但是flink并不可能,你这里边指定了一个字段叫time STEM,他就以为你这里面有时间戳了,他并不知道,对吧?而且我们现在是不是还没有设置时间语义啊。
16:31
现在我们要设时间语义,为什么对even time set time character。这里边time characteristic,选下面这个啊,Even time对吧,先把它设置好,设置完了还没完,我现在是不是需要,需要怎么样,是不是得从数据里边提取某个字段,指定成这个是我们的时间戳对吧?啊,另外就是说还可以去指定这个water mark,诶那这里面我们要去看一下了,因为这个到底怎么指定water mark,我们说过是不是跟这个数据本身有关系啊,呃,大家看一下这个数据。
17:17
诶,这个数据的时间戳,诶大家看这好像都一样对吧,这个数据量还挺大。诶,接下来是001,终于变了002003,诶大家看它是不是完全是一个已经排好序的升序数据啊,哎,那这个简单了,我我们怎么写就可以直接可以as sign,对升序那个叫做asending time stamps,直接调这个就就OK对吧?那这里边是不是要传,呃,就是提取我们对应的那个字段就可以了,现在提取哪个字段time STEM对吧?直接把它提取出来就可以。那大家可能要问了,这个time STEM就可以这样就完了吗?之前我们好像总要乘1000的呀,这个乘不乘1000是不是还是说要看这里边这个数到底跟我们想要表达的真实时间是什么样的,对吧?
18:12
呃,这个一五几打头,大家大家其实知道啊,这个状态是什么样子啊。这个状态应该是一个秒的状态,对不对,所以这里我其实应该还是要乘以1000啊,1000L对吧,一个长整形啊呃,这是我们的基本的这样的一个。一个处理流程啊,当然后边应该有think对不对,呃,这里边think我们就是直接。直接在这个控制台输出好了,对吧,控制台输出。呃,那当然这里边我们可以先写一个data print啊,最后print不一定是他,注意不要忘记最后对要去执行。
19:02
这里边我们叫hot items drop,好,这就是我们的呃,主体的处理流程,然后接下来我们就得往里边填这个具体的操作过程了,对吧,大家还记得这个我们的具体操作过程是什么样子吗?过滤对,是不是先要做过滤啊,对吧?所以这里边。但是这里边就不应该是三啊,接下来做处理对不对。Transport处理数据,首先我们应该是要先把它做一个filter,为什么呢?因为我们当前统计热门的商品的话,其实我们想看的就是PV最多的那个,呃,比较多的就是热门的,对吧?那至于别的我们是不是没有涵盖在内啊啊,当然这个看大家自己的定义啊,如果说我们认为别的也代表它的热门度的话,也可以加进来,甚至还可以给不同的权重,对不对啊,大家可以想到啊,就是可以设置一些别的一些规则,这里边我们就简单处理,就只有PV才算这里边热门的指标,这里我可以定义一个。
20:18
呃,就是process。Stream,它就应该是data stream接下来要做处理了,对吧。先做一个filter,呃,这个filter的时候我们用什么做filter呢?是不是就是它的behavior需要是不是等于PV就可以了,哎,这就是我们要做的这个筛选啊,Behavior等于PV,然后下一步下一步直接开窗吗?我们当前的这个对现在还是一个data stream data stream不能要开窗的话,那就是WINDOW2了,对吧?那我们一般情况都是对先做一个KBY做一个分组,那KBY要根据哪个字段做KBY呢?啊,我们当然是item ID对不对?
21:13
根据item ID进行一个K,然后接下来做什么?啊,这边大家看我们可以基于这个kid stream,可以做各种各样的操作,可以这个Fla map,对吧,可以filter,可以process,我们这里边是想要先开一个窗口,对吧,基于k stream创建一个window的stream,这里边的窗口是一个time window,对,那当然我们要创建的是一个滑窗,长度是对,长度是一个小时,所以time这里边time的引入啊。Window time.time这里边我们引入HOURS1,然后接下来对它的步长移动滑动步长是五分钟对吧?好像我们需求定义的是五分钟,这个大家可以按照自己的要求去改啊,然后接下来是不是就可以做聚合了,对吧?诶大家还记得我们这个聚合是当时是用了一个什么算子去做聚合吗?
22:16
对,那不是直接sum对吧?呃,Sum的话,那你只能是sum某一个字段,我们是不光要做聚合,聚合完了之后还要输出成一个特殊的数据结构,对不对,还要加上window and的那个信息,所以这里边我们用的是aggregate这个sense,而这个aggregate里边我们是不是大家看这个aggregate具体实现啊。它有不同的这种实现方法,最简单是不是直接传一个aggregate function就可以了,最简单是这样啊,然后还可以传什么呢?可以传两个参数,一个是一个。就是预聚合的一个aggregate function对不对,然后还有一个是window function,我们要的是不是这种实现啊,就是预聚合先来一个处理一个聚合,一个状态出来,然后后边是不是用window方式再输出一个,最后我们想要的包装好的那个结果啊,啊,所以这是我们想要做的这个事情,接下来当然这里我们要自定义了啊,New一个,我们就把它叫做count a j啊,Count的一个聚合,然后new一个。
23:24
Window result。这是我们自定义的两个方式,一个是aggregate function,一个是window function。
我来说两句