00:00
然后接下来我们就可以真正的开始敲代码了啊,那在这个呃,具体敲main方法里边的具体逻辑之前,大家会想到,诶,我们不是要用到有对应的那个样例类的那个包装吗?首先我们输入的数据就是一个样例类对吧?大家还记得这里边我们定义定义输入样例类,输入数据样例类我们把它叫做呃,Case class啊,我们直接就把它叫做user behavior,这里边都已经是逗号分割的就是五个字段,大家看第一个我们当时说是user ID item ID对吧?呃,这个用户ID,商品ID后面还有一个商品的类别ID后面呢,就是行为对吧,你到底做了一个浏览还是购买,还是这个加入购物车,呃,类似的这样的一些行为,最后是一个时间戳,所以接下来我们直接在里边把它定义好,首先是user ID长整型。
01:00
哎,然后后边是一个item ID也是长整形对吧?哎,再接下来是个类别ID,这个叫categoryy ID啊,这个我们就跟PPT上分析的一样啊,把它定义成一个int类型,或者I定义成长整型都可以啊,然后接下来还有这个操作的类型,对吧?Behavior一个string类型,最后还有一个时间戳。Time step长整型啊,所以这就是我们输入数据最后要包装成这个样子啊,然后接下来大家还记得我们中间有一个那个经过聚合之后转换得到的那个包含了,呃,每一个商品ID,包含了它在当前窗口内统计出来的count的数量,另外还有一个当前窗口的呃,那个结束时间对吧?Window end包含这三个字段的一个样类类型,我们管它叫做item view count对吧?哎,我们把这个也定义出来,当然你如果不专门定义,就一个三元组也行是吧,我们这里边定义出来的话,就是看的会更更明显一点啊,就是可代码的可读性更好一点,所以这里边定义定义窗口聚合结果样例类case class,呃,这里边写一个item will count里边的字段啊,那我们也都已经知道了啊。
02:32
Item ID,这是当前的核心字段对吧?呃,这是它的K啊长整型,然后另外我们要一个window的信息,Window end,那window end我们用什么表示呢?哎,就用时间戳表示完了呗,对吧?大家看这个时间戳本来就是长整形,那我这里边也用一个长整形,那另外最后还有一个当前的count值,哎,Count值大家如果要是担心这个数量很大的话,那我也用一个长整性,对吧?但是一般情况int也也足够了,对吧?大家知道这个,呃,这个正负20亿啊,这个基本上也是差不多够了啊,那如果说你想要给一个长整形没问题,这样的话,我们里边的三个字段就全是长整形的,好,那接下来那可能还需要有一个输出的样一类,对吧?那当前我们怎么样去输出呢?呃,大家知道输出的其实是一个类似于每一个窗口有一个排行榜对吧?呃,然后就是里边,呃,这个排名这个count第一的,呃这个商品是哪一个商品,然后呃,排名这个第二的是是哪一个,那大家可以想到。
03:32
啊,我直接输出这个item welcome就可以对吧?诶,我直接把这个输出就可以,另外如果说我还想让给大家有一个更加好的直观的一个可视化的视觉效果的话,那我直接把它包装成一个string,然后再控制台打印输出啊,就直接我写出汉字来对吧?当前的这个窗口时间是多少,然后呢?呃,里边我包装好排名第一的是哪一个,然后它抗值是多少,排名第二的商品是哪一个,然后看的值是多少啊这样的话,这个大家看就好像是我们有另外一个程序已经监控到这里边的这个输出结果,输出了一个可视化的结果一样,对吧?啊,给大家看看这个效果啊,当然这个可视化显得会比较简单好,接下来我们还是整体的流程再过一遍吧,首先要创建一个流处理的执行环境,Execution environment,直接get execution environment啊,大家记得要把这个下划线影视转换引入,对吧?
04:32
呃,然后接下来我们来设置对应的一些东西,那首先这个,呃,大家看到,首先这个并行度啊,我这里边如果说要是不影响后边我们这个输出结果的话,因为我们在控制台打印输出嘛,希望按照顺序来来输出正常的这个结果,特别是我们读取这个文件的时候啊,如果是真实场景下,大家知道你前后差五分钟的那个窗口,那肯定不可能出现乱序,但是如果说我们这里面读数据的话,这就没准了,对吧?你这个文件里面读数据这个很快,说不准五分钟的窗口会出现乱序,所以为了显示的更合理一点,我这里边把这个并行度,全局并行度设成一,不影响结果正确性,对吧?啊,大家知道含义就可以。然后另外还有一个重要要设置的就是时时间语义,当前的这个时间特性应该是设置什么呢?诶大家看到既然这里我们这个用户行为是有时间戳的嘛,然后我要统计当前的这个热门商品,那当然不是说我在什么时候做计算,那统计这个时间段内。
05:32
的热门,我要统计的是用户点击那个行为发生时候那个时间段的热门对吧?所以当然应该用事件时间啊,所以这里边我们定义这个time characteristic,然后选取这个事件时间对吧?哎,我们定义事件时间语义好先放在这里啊,前面的环境相关的操作先放在这里,然后接下来哎,那就要从文件中读取数据并啊大家想是不是要转换成样例类对吧?并转换成样例类好,那所以接下来诶,我们就直接定义一个data stream啊,我先定义一个这个input stream吧,就先从这个文件里边去做一个读取对吧?Env,直接read text file,因为当前是这个文本文件嘛,我直接。
06:32
Copy一下,当然大家也可以把这个呃,当前的这个目录定义成一个字符串单独传入对吧?我直接copy pass,直接把它粘过来,然后接下来定义一个data stream,那就是基于input stream去做一个转换,那大家想是不是直接做一个map就可以了啊里边我这里边的每一行数据,每一行这个data对吧,来了之后呢,都要去做一个,哎,都要去做一个切分处理,按照逗号分割,然后拆开之后每一个字段提取出来,包装成样一类,对吧?哎,那那这里边我就定义一个AR ray就等于data.split跟我们之前那那个做呃workcom的时候按照空格分割是类似的,我们现在是按照逗号分割对吧?呃,然后下面这个user behavior包装起来一个样例类。
07:32
那我要第一个字段,呃,其实就是五个字段对应填入对吧?第一个字段是长整型的user ID,那当然是要直接to long对吧?呃,因为这里边这个CSV它也没有空格什么的,我们也不用做tri之类的操作了啊呃,直接做这样一个转换就完了,然后后边ARA1还是长整型对吧?Item ID,第三个字段ARA2啊,那这个是一个int,我们定义的是一个category ID类类别,ID是一个int类型啊,所以把它定义出来,然后第四个字段,哎,那不用拖了,它本身就是string对吧?我们当时不是定义的是这个behavior吗?本身就是string,最后还有一个长整型的时间戳除long长整形对吧?
08:23
啊,瑞四涂了,哎,这就是我们完整的一个包装转换成样粒类的一个过程,对吧?然后另外还需要有一个什么呢?是不是既然是事件时间,我是不是还应该要指定当前的时间戳和watermark呀,对吧?并提取提取时间戳生成watermark,哎,就这几步,大家就就养成一个常规习惯就就可以了,对吧?一上来之后这个配置环境,环境里面做一些设置,然后读取数据源,然后map成样例类,然后配置当前的这个时间戳和watermark,然后这里边有两种方法,一种大家还记得一般化的assign,呃,Time sams and watermarks,我们说如果要是乱序数据的话,里边传一个邦ED的out aboutness time Sam extra,对吧,那个一长串那个那个那个类啊,然后。
09:23
如果要是升序数据已经要是就是已经排好序的话,是不是直接用升序的这个A3ING time sims就可以啊对吧,不用专门去在外面指定那个automark延迟啊,那这里边我们到底用哪个呢?用哪个看数据嘛,对吧?我们看一下数据到底是什么样子,这里边的数据我们看啊,15116580001658000,这都是一样,大家看都是同一个时间的,这个时间戳数据对吧,都是这样,哎,然后后面大家看到了001对吧?啊,这是下一秒的数据,002003004,诶,但直观这么一看的话,发现他做ETL之后的这个数据其实都已经是排好序的数据了,对吧?哎,那所以这里边我们的这个操作就简单很多了啊,这个都不需要这个处理断序数据啊,直接assign,这个assending time sams直接放在这儿对吧,那里边我得至少得指定到底那个时间戳字段是哪个呀?哎,我用一个提取器下划线点time Sam提取出来。
10:23
而且我知道这里边本身这个应该是一个秒数对吧?十位是一个秒数,那我这里毫秒数的话要乘以1000,这就是前面做的这个操作对吧?先拿到当前的这个状态啊,然后我把后边这个先写出来啊,最后大家知道肯定要执行起来对吧?Env fqte执行起来hot items,然后接下来的下一步操作做什么呢?哎,就是我们说的啊,按照流程来,大家还记得之前我们讲到的这个处理的流程吗?一开始在这里啊,一开始应该是先,呃,先先根据那个PV行为先做一个filter对吧?应该是先做一个过滤,然后后面就是哎,KBY做分组开窗,哎,开这个滑动窗口,然后做聚合,而且当时我们说聚合的时候要去给一个aggregate里边传两个参数,对吧?呃,就是一个预聚合的增量聚合函数。然后。
11:23
还有一个是全窗口函数啊,接下来我们就一步一步做这个操作啊,就按照这个流程来做一个实现,呃,那我们这是要得到得到窗口聚合结果啊,那首先这里边我们定义一个a j stream对吧,因为得到的还是一个data stream嘛啊然后大家可能如果想要知道这里边本身的这个数据类型的话,应该我们知道这里边读取出来之后,本身它是data stream stream对吧?然后经过转换之后,这里是一个样例类类型data stream。
12:02
User behavior,然后下边这里边得到的是不是就应该是一个data stream item will count呀,对吧?哎,所以大家看这个转换的过程啊,都是每一步我们都是一个样一类类型啊,这样的话你看起来就非常的直观,知道到底做了哪些操作,哎,所以我接下来基于这个data stream,首先先做一个future对吧?哎,我要的是当前的行为behavior,必须必须得是PV,哎,把它先定义出来啊,过滤过滤PV行为,然后接下来啊,那当然就是K败了啊对吧?按照当前的这个item ID,大家注意啊,按照商品ID去做一个分组,这里边按照商品ID分组,然后接下来开窗对吧?哎,我们要的是一个。
13:03
Time window嘛,Time window里边既然是一个滑动窗口,那当然就是说,呃,就是前面是那个滑动窗口的那个长度对吧?Time我们这里边引入的是window in time.time我们要的是统计一小时之内的,所以给一个HOURSS1,另外呢,每五分钟统计一次,当然就是MINUTES5,诶这里边设置滑动窗口进行进行统计,诶最后这一步最关键,接下来就是要做最后的聚合操作了,我们定义的是一个aggregate对吧?啊就是要做一个增量聚合,那大家看到啊,这里边再给大家解释一下aggregate,当时其实我们在源码里面是看过的啊,本身aggregate有很多种,呃,重写的方式对吧?呃,有这个,这个很多很多种重载的方式啊,参数可以重载。
14:03
最简单的方式是什么呢?当时给大家说的是最简单的方式啊,最简单的方式就是里边直接传一个参数,一个aggregate function式,当时跟大家说这个aggregate function就是一个典型的增量聚合函数,对吧?啊,然后这个增量聚合函数里边它的特点是什么呢?哎,你看它要传的这个参数,给的这个参数类型有三个,Input ACC output,哎,也就是说它的输入输出数据类型可以不一样,对吧?Input output分别是不同的类型,那这个ACC又是什么呢?哎,我们不是说聚合函数里边有一个accumulator吗?它就是中间的聚合状态对吧?啊,这个中间的agg agree aggregategate的state,这个中间聚合状态它的类型放在这儿,它们这三个类型都可以不一样啊,这就是我们所说的这个aggregate function啊,它是一个增量聚合函数,我们调aggregate的时候,最简单的就是传一个这个,但是这里边我们说到就一个这个aggregate function,这不。
15:03
屏啊,因为在里边你会发现它这个实现的过程里边拿不到当前window的信息,对吧?因为这个agate function本身我们说它就是一个呃,就是flink API common functions下边的一个一个function,这本来就是一个比较,呃,比较底层啊,通用化的一个一个增量聚合函数,所以现在我要拿到当前的window信息,该怎么拿呢?诶,我们这里边给大家看到它有另外其他的传参方式,可以怎么传呢?大家看可以传两个参数,一个叫做pre aggregator,一个预聚合器,预聚合函数对吧?这里是一个aate function,这里做增量聚合,然后后边呢,后边是一个window function,这个window function当时给大家说过,它本身是一个全窗口函数,对吧?哎,那它传这两个函数的传,传这两个参数的含义是什么呢?表示我要把前面每一个数据来了之后,先调这个F。
16:03
不是get function做增量聚合,做预聚合,你看他管这个叫做预聚合对吧?啊,那预聚合完成之后呢,最后窗口要触发的时候,不是要调这个window function吗?我在调这个window function,哎,那大家可能想到window function不是全窗口函数,你要存所有数据吗?在这里边定义了,就是在我们当前这个aggregate的第二个参数位置定义的这个window function的,其实它并不会保存所有数据了,哎,你也可以认为它是一个全窗口函数,对吧?它只有在窗口要关闭的时候才调用一次,但是现在它就没有保存所有窗口里边的数据了,因为所有数据都已经处理完了,得到了一个预聚和结果,对吧?它这里边呢,只要拿到你看它的这个数据类型,Input output,还有是key的类型,还有一个window的类型,所以它的input是什么呢?它的这里边它的input就是当前预聚合的结果的输出,就是当前这个V,大家看到没有。
17:03
对吧,这个这里边输出结果是V,它这里边的输入就是V,然后呢,他在做转换,拿到window的信息啊,结合做转换得到一个数据,数据类型叫做R啊,所以最终我们是做了这样的一种转换啊,那接下来我们可以看一看,就是里边到底要传什么参数啊,当然就是传两个自定义的函数了,对吧,函数类啊,那我们又一个当前这个我就叫做count a j对吧,自定义一个这个东西,然后呢,后面那个window function,我就直接自定义一个啊叫呃,这个就是当前的这个counter view,就item item it view window result对吧,把这个定义出来,这就是我们呃,自定义这两个类,就可以实现这样一个增量聚合,又结合window的信息输出一个item view count这个需求。啊,然后我们当然就是要去真正的实现这两个东西了,那这里边自定义语句和函数,那就是一个aggregate function式,对吧?我们说的这个增量聚合函数啊,Class count hg,然后它需要实现的是一个aggregate function,诶这里大家要注意,就是本身aggregate function,大家还记得之前在那个table API里边也有一个对吧,那个是flink table,呃,Table API下边的一个function,这里面我们要用到的呢,是flink API common functions下面的这个这个interface aggregate function,然后里边传的这个类型呢,是input,还有ACC,就中间聚合的类型,状态的类型,还有输出output类型,那现在输入的类型是什么呢?样例类嘛,User behavior,对吧?先把这个先写到这,然后中间我要聚合的一个,呃。
19:00
当前的这一个中间状态是什么呢?哎,其实大家想到我当前调用啊,每一个窗口里边要去做这个统计的时候,既然我已经是按照item ID做分组了,那当前的这个商品我最后想要聚合一个什么状态,不就是那个count值吗?哎,所以这里边我的这个当前的这个状态就是一个长整形的count数,对吧?啊,就把它放在这儿,然后还有输出,输出是什么呢?诶有同学说输出呢,肯定是这个样例类啊,Item view count,但是我们说在这儿你拿不到那个window信息,对吧,你包装不起来,所以呢,我这里边的输出,输出是要给传递到下面那个window function里边去的,那我传给它什么呢?不用传别的,是不是?只要把这个count值传过去就完了。哎,所以这里边就是直接把这个状态传传过去就完事啊,那所以这里边我们看一下具体的实现,大家看这个接口里边必须要去重写的方法都已经列在这儿了,就这四个方法对吧。
20:00
啊,哎,那首先我们看跟table API里边有点像,首先要创建一个accumulator,这个accumulator就是我们当前的要聚合的那个状态,对吧?做聚合的状态,那首先一上来之后,这个状态应该是几呢?当然是零了,对吧?呃,你我们的状态就是那个countt值嘛,大家看啊,就是聚合状态就是当前商品的count值对吧?哎,那一上来当然是零了,然后呢?诶,类似于之前我们那个accumulate方法,这里边重写了一个at的方法,这个at的方法就是每来一个数据会掉一次这里的at方法,那怎么办呢?Countt加E不就完了吗?哎,所以大家看注意这里边我们不需要改变这个accumulator,而是怎么样呢?它会直接返回一个长整形,这个长整形就表示返回的更改之后的状态,所以大家看这个写法就非常舒服了,对吧?哎,我不需要去专门定义这个accumulator,必须得是一个。
21:00
可更改可更改的一个参数啊,单独再去定义一个类的实例什么的,不需要这么做了,那我直接返回accumulator之前的数加一是不是就完事了呀?啊,这个就比较简单啊,所以大家看就是每来一条数据调用一次at count't值加一对吧?哎,就这么简单,然后后面还有一个get result,最后你得到的这个result是什么样子呢?诶,那当然就是还是accumulator这个状态,直接返回给那个window function去做操作就完事了啊啊就是这样的一个过程,那后面还有一个墨墨,其实这里边没什么用,它主要是用在哪里呢?用在session window里边去做那个窗口合并的时候用的啊,那即使没什么用,大家这里边也可以定义一下,对吧?那你怎么定义呢?怎么定义这个末认方法呢?哎,它两个状态A和B,大家来看都是长整形对吧,状态怎么合并A加B呗,啊这个非常简单对吧,这样就。
22:00
把它定义好啊,这就是这个自定义的预句和函数的过程,那另外我们还要自定义一个啊,这个窗口函数对吧?Window function,哎,这里边的这个自定义的这个呃,窗口函数啊,我们叫做item view,呃,Window result啊,其实应该是item will count window result对吧?啊,这个大家知道什么意思就可以了啊,然后我必须要去实现的是一个,大家看是一个window function,注意我要引入的是上面这个,就是大家点进去会看到这是一个scale的,呃,SC的一个treat对吧?呃,也是类似于一个接口,但这是SC代码啊,跟前面这个Java的interface大家不要搞混啊,这里边我们这个浴具盒的这个,就前面这个areate function,这是一个底层的Java代码,Java interface,而现在的这个,呃,这个window function的话,它是一个。
23:00
Scale的treat对吧?然后里边的类型大家看到了input output key和window对吧?Input那是什么?就是这里的输出嘛,对吧,长整型long,诶,这里啊,长整型,然后它的输出那当然就是我们的样例类了,Item will count对吧?啊,那后面还有K的类型,K的类型这里稍微会麻烦一点点啊,那为什么为什么麻烦呢?这里面大家看到我们定义的这个K是什么呢?Item will come item ID对吧?那大家说这这没什么难的呀,你直接item itd不就是那个长整形吗?你这里边给一个长整形就完了嘛,哎,大家看我这里边定义这个长整形,然后后面window我知道是time window对吧,我直接把这写出来,Time window写到这儿,然后大家看到这上面是报错的,为什么报错呢?这俩这俩类型不匹配,因为当前KBY之后,大家注意啊,KBY之后得到的这个k stream这里。
24:00
它的那个K的类型是什么呢?是Java temple对吧?哎,是一个元组Java元组类型,而你这里边如果要是说哎,就是直接定义这里边K的类型是长整形的话,这当然就不匹配不匹配了,所以这里我必须要定义成一个圆组类型,大家看啊,这个temple啊,然后引入的是大家注意是flink API Java temple.temple对吧?把Java元组的这个类型引入,现在大家再看上面是不是就完全不报错了,哎,这个就完全一致了啊,所以到下边呢,我们最终要实现的就是一个apply方法,这个apply方法其实也非常简单,就是要拿到当前的item ID啊,另外我先都列出来啊,还有当前的这个window end对吧,最后还有当前的啊,就是一个count值,其实就是这这三样东西,然后最后呢,包装成一个item will count输出对吧?啊,其实就是这样的一个过程那。
25:00
咱们一个一个来看吧,首先item will count这个怎么去拿呢?哎,大家想这个item ID啊,Item ID怎么去拿呢?不就是key吗?啊,但是这里面K它是一个抓va元组类型呀,那怎么办呢?诶,这个元组类型大家知道它本身还是一个抽象类对吧?那它具体的实现是什么呢?哎,我们现在知道里边其实就一个元素嘛,就一个item ID嘛,所以它是一个TEMP1类型对吧,一元组类型,所以我这里边要去写一个大家知道那个对应的这个类型转换的时候,对吧?As instance of这里边把它指定出来,然后里边要去确定的就是什么,里边就应该是一个TEMPLE1对吧,但这里边大家注意啊,因为scale里边本身也是有TEMPLE1类型的,你这里边不能用这个scalela里边的这个TEMP1类型,对吧,你看这里边默认引入的话,它是scalela里边的类型,那怎么样呢?我在上边把Java。
26:00
下边的这个一的这个类型引入对吧?然后接下来这里边你如果点进去看的话,这就到了这个下边了,对吧?诶这个就这个就没有毛病了,这就是我们Java元组里边T类型的一个实现一元组类型哦,那所以它本身还有泛型,我们当前是这个长整型对吧?把它拿出来,然后另外你这拿出来之后,它是一个他姆一的类型了,那里边那个长整型值到底是啥呢?诶他姆一里边有一个属性就叫做F0对吧?这个F0就是我们对应的那个元素的那个类型,哎,所以这里边我调的是它的F0,这样把它拿出来啊,稍微有点绕是吧?然后window and呢,就用window.get and把它拿到,然后下边的count,那我们就直接,呃,那那用什么count是不是在这里input里啊,而且input大家知道window嘛,Window function吧,我们说它本身是一要把这个数据。
27:00
全中函数,把数据要全存下来,放在这个input里的,所以是一个特类型,那现在我们知道其实里边就有一个数嘛,就之前我们拿到的那个聚合结果长整形的那个count值嘛,哎,所以直接input,它是个,呃,这个inter类型,那我就直接用它这个你也可以用迭代器,对吧?然后直接点next拿到不就完了吗?好,所以接下来最后再输出,注意输出的时候本身apply方法没有返回值,输出的时候用什么呢?Collect对吧?哎,这里边这个collect叫out,那所以我们这里边的用法还是out.collect然后包装成一个item will,把我们前面前面写好的item ID window and还有count包装进来,这样就写完了。这就是得到了一个窗口聚合的结果。
我来说两句