00:00
准备工作都已经做完了,那接下来自然就是在Java下边要去新建我们真正的处理的那个类,对吧,真正做做实现啊,具体那个实时热门商品统计的那个类啊,然后去做具体的业务流程的实现了,那这里面我还是带上这个包名啊,com.at硅谷点前面我那个叫hot it_analysis当前我这个类这个不在B下边了,我这个直接就叫做hot it好了,Hot it。那首先这里边PSVM,我把这个main方法,主方法先写出来,大家知道这里面我需要有这个throw exception啊。然后接下来是不是首先应该创建环境啊,还是标准流程吧,Get execution environment,我把这个还是定义成env这里啊,首先我们这里面的第一步创建。
01:03
执行环境好,然后接下来当然是针对这个环境要做一些设置了啊,那首先我们还是啊,因为后面方便我们做这个打印输出测试,我就全局的并行度设成一,这个并不影响结果。然后另外是。大家想一下,我是不是可以设置当前的时间语义啊?我现在的时间语义应该用什么呢?数据里边既然是有时间戳,那是不是我们肯定是要统计,那就是肯定基于那个事件时间进行统计操作,对吧?所以这里边我们设置一个事件时间,把这个列好。好,有了基本配置之后,接下来自然就是读取数据了,对吧。读取数据。呃,创建data stream数据流。啊,其实这个大家知道最终得到的啊,直接从文件里边读的话,得到的是不是肯定是一个data stream stream啊,啊,然后这里边我们其实就是我把这个叫做input stream,基于V去read the text file对吧,直接读就完事了啊。
02:15
好,那这里我就直接先把这一个文件路径copy过来。因为大家知道实际生产环境里面不会去读文件的,对吧,肯定是那个卡夫卡啊,我们先从这个文件里面读取出来,我们先把逻辑先捋清楚,然后接下来第三步,那是不是就是把它转换成对应的那个po类型啊,对吧,转换为po。然后另外注意是不是还应该分配时间出automark呀,因为我们是事件时间对吧,尽可能的靠近数据源去分配automark啊,所以这里边我们在这里做分配时间戳和water mark,诶这个我既然已经知道是怎么回事了,大家就可以稍微的快一点,比方说我这个得到数据结结,呃,最后的数据结结构应该是什么样子呢?什么类型呢。
03:11
是不是应该还是string?我最后是不是要转换成poal啊,那当然是定义好的那个user behavior对不对?哎,就是这个类型啊,我把这个叫做data stream,或者大家叫做user behavior stream也可以啊,我们基于input stream,接下来是不是就是直接做一个map转换呀,我这里干脆不用用那个map function了,我直接写lada表达式了啊,每一行读进来之后,接下来是不是首先得到拆分对吧?Spli得到这样一个呃,字符串数组,我这里边就叫做fields吧,所有的那个字段,那是基于line去做一个split csv文件分隔符是逗号啊,所以我先把它分开,然后接下来是不是直接return你一个。有点啊啊,这不就是我们定义好的吗?然后里边的话,那就那就得看具体的了啊,前面我们定义的这个类型是不是。
04:09
哎,这里面大家看到前面都是长整形,然后是inte,然后是string对吧?哎,那所以接下来都是长整形啊,那那我这里边你有一个long,里边是FIELDS0,这是当前的item ID,呃,User ID对吧,第一个是I user ID,第二个是item ID,还是一个用一个长整型FS1。第三个字段是inte。对,所以这里边我们是那个category ID,类别品类ID f2,然后第四个字段是那个行为对吧,Behavior,那它本来就是stream,所以是不是直接拿过来就行了,FIELDS3放在这儿,最后还有一个长整形的时间戳对吧?呃,New,一个长整型FIELD4,大家看就是这样对吧?啊,就这样一步一步啊,把这个对应的字段提取出来,包装成user be behavior这个pole,然后后边是不是要assign time stamp and worldmarks啊。
05:13
哎,那接下来就我们得考察一下了,当前这个数据我到底应该给乱序的处理那个bounded out of orderness呢?还是升序的an time那个呢?哎,这个还是要看具体数据,对吧?实际生产环境里边,大家知道,如果这里边没有做过ETL的话,是不是肯定是乱序啊,啊正常来讲肯定是乱序,但是这里面大家看到已经做过ETL了,那就没准我们看一下啊,1511658000,诶大家觉得这个是一个秒还是毫秒啊,啊有同学说,诶最后三个零,这肯定毫秒啊,啊不是对吧,大家看到我们看它的时间,那你得看这个数到底多少位,到底多大,对不对,你也可以认为它是一个毫秒,它如果是毫秒的话,那是不是就相当于只有这么多秒啊,那就相当于只有这个一百一百多万秒,那是不是相当于有可能只在197几年的一个时间啊,是对吧,现在你真实的场景肯定不可能是这个时间啊,所以一般我们看到如果是。
06:13
十位的话一般呢,就是一个秒数对吧,13位的话是一个毫秒数啊呃,所以这个是一个秒,我们看一下000000,哎,都是同一秒来的啊,后面来了001对吧,一秒之后的啊,哎,002003,哎,大家看是不是升序的呀,哎,所以当前我定义这个我要去拗一个是不是直接用一个a sending time sample就可以了,然后里边只要给一个提取的规则,是不是用这个element,拿出来之后要get time stamp,注意这是一个秒,那么我们对乘以1000对吧,得到一个毫秒数,这就是这一步操作啊,基本的一个转换操作。然后接下来呢,自然就是要做什么了,是不是应该要去做一个分组开窗聚合了,对吧?分组开窗聚合,呃,这里面其实是呃就是得到的应该是那个得到啊得到每个窗口内是不是各个商品各个商品的。
07:26
呃,就是它的那个热门度,也就是浏览量的那个count值,Count值啊,那大家记得我们是要把它包装成一个什么东西来着,这个聚合结果是有类型的,对吧,我们提前就已经定义好了,管它叫做item will count,对吧?啊,这个是已经定义好的啊,所以最终我们这一步得到的就是它,所以。呃,我把这个定义出来,这个可以叫做window聚合的结果,Window a stream,基于前面的data stream。大家想接下来是干什么操作一步一步来,要开窗首先得分组对吧?诶,但是大家不要忘记我们一开始的数据里边,他的那个行为是不是除了PV之外还有别的呀,我们当前按照那个热门度的定义,是不是只要PV行为。
08:13
那这个问题就来了,我是应该先做这个筛选呢,还是先分组开窗之后再去做处理的,当然是先筛选对吧?呃,先筛选的话,后面是不是处理数据就少了呀啊,这个肯定就会更方便啊,所以我先来一个filter。Filter这里边,诶大家知道这里边要写的其实就是一个你有一个filter function去实现就完了,我这里边呢,也简单写,我直接写格拉姆的表达式,大家看一下啊,那我当前的data来了之后,怎么样就要把它滤出来呢?是不是就是要它的那个behavior等于PV就完事了呀?哎,那我们知道一般跟这个字符串比较的时候,为了防止出现那个空指针异常,一般是对先写字符串对吧?啊叫PV,然后我们要equals,当前data.get behavior对吧,要做一个这样的一个处理,所以我写一个注释,当前是过滤PV行为。
09:12
然后接下来,那是不是就可以去分组开窗了呀,对吧,当前就可以,呃,KBYKY按什么分组,Item ID,对,按照商品ID分组,按商品ID分组,然后接下来呢,哎,对,直接太魔window开窗就完了,我们不是要开滑动窗口吗?Time点哎,大家看一小时的滑动那个窗口长度,然后每隔五分钟滑动一次,对吧?哎,所以我们开一个滑动窗口。开划窗。诶,接下来是不是就是一步聚合操作aggregate对吧?而且这里边我们是不是得先定义一个增量聚合函数,后面再跟上一个全装构函数,包装它之前的那个结果,包装成我们想要的这个item count,对吧,要有窗口信息嘛,所以接下来我自定义的这个先声明出来啊,你有一个我管这个叫做item count。
10:17
A。然后后面这个我叫做呃,Window,呃,Item count result对吧?好,我把这两个先定义出来。我把这个对齐啊,呃,我先写到这,最后应该肯定有一步操作是en nv.execute对吧,具体实际生产环境里边,大家注意,里边一般都要给一个job name,要不然你那个提交之后不知道它那个运行起来,你找不到到底是哪一个任务了,对吧?所以一般我们要写一个啊,比方说当前就叫hot items analysis,诶直接把这个写在这。接下来我们要做的当然就是实现自定义的这两个函数类了,对吧?
11:05
好,首先实现自定义,呃,这个聚合函数啊,增量聚合函数,或者有时候管这个叫做预聚合,为什么呢?这相当于我先预先做了一个聚合,最后是不是还要交给全窗口函数去做处理,最后才输出啊,哎,所以有时候管它叫预聚合啊。增量均衡函数,那这里边非常简单,Public static class,然后是item count a。这里边我们要去实现的是一个aggregate function这样一个接口,诶大家注意,当然这里边没有选错的可能因为它默认这里面就没有那个table API的那个选项,对吧?啊大家引包的时候千万不要引错了啊,不要跟之前那个table API的使用混淆,这是大家注意这是flink。当前这个是flink API common functions,并不是table下边的functions,对吧?啊,所以这是统一啊,Flink底层给我们定义好的这样的一个通用的聚合函数,呃,那么这里大家知道它是有类型的aggate function是不是三个类型啊,输入类型,聚合的中间状态那个累加器类型,还有一个输出的结果类型,哎,那现在我们的这个输入类型当然就是对user behavior口类嘛,呃,那然后接下来。
12:27
我的中间聚合,聚合的结果呢,大家想是不是就是直接就是长整形啊,对吧,大家想肯定就是我想得到的就是一个长整形count的值嘛,哎,我聚合的就是这么个玩意儿啊,那最后输出的结果,我倒是想要输出,直接输出这个I view count,但是我能拿到吗?拿不到对吧?因为你得里边得有那个window信息啊,我现在根本拿不到,所以没办法,我是不是直接就把这个长整型的count值输出就完事了呀,对吧?啊,就什么结果直接输出后边要包装那个item view count,那是全窗口函数的事儿了。
13:02
所以接下来我把这个里边的四个方法做一个实现,Create accumulator,创建这个累加器的时候,长整型是不是就是0L啊,然后I的方法,那是不是就是来一个加一啊,所以非常简单,Accumulator加一加一。加一对吧,呃,然后get result的时候也非常简单,当前状态直接返回搞定,那最后墨也一般我们用不到,是不是直接A加B就行了,对吧,直接放在这里就搞定了,这就是这个自定义的增量聚合函数。然后接下来还有就是。自定义全窗口函数,那么我们知道全窗口函数的使用,本来如果单独使用全窗口函数的话,它会把所有的数据都拿到,对吧?哎,那我们这里边其实就不是要做这个事情,而是把增量聚合的结果拿到就行了,所以window item,诶是叫item count result是吧?我把这直接copy一下。
14:06
好,那下边我要去实现的是一个window function这样的一个接口,大家还也还记得window function它的泛型是不是四个啊,Input output,还有当前的key和当前的window类型,对吧?哎,所以接下来我们这里边input。注意它的input是什么,它的input对是前边是不是增量聚合的结果啊对吧,这个要匹配上啊,所以它是一个长整型了,然后接下来它的输出呢?哎,输出是我们要的那个IW com对吧,包装的就是它嘛,最后当前K的类型是什么呢?K的类型,哎大家看这是你用了一个字段名称对吧?所以那是不是temple类型啊,元组类型temple,然后接下来窗口类型time window对吧。大家时刻检查一下这个写完上面是不是类型完全不报错了,完全匹配对吧?这个就对了啊啊,然后接下来我们这里边就做一个具体实现,要实现的是一个apply方法,那我们想要的那个item view count里面三个字段分别从哪拿呢?首先我要的是一个item ID,当前的ID从那拿,诶我当前就是按照ID分的组嘛,是不是就从这个当前的key里边拿就完了对吧?啊,Temple里边拿啊,当然这个稍微的麻烦一点,因为它是个元组,我是不是要get field,因为是一元组,就一个拿出来完事对吧?然后第二个字段是window and,那么它从哪来呢?诶,大家看这里面参数,第二个字段就是window嘛,所以我当然就是window.get end啊,啊,之前我们那个呃,CQL或者table API里面不是也有那个去取它的end的那个方法吗?对吧,那我们现在是一样的啊,只要有window直接get end拿出来就完事了。
15:58
它本身得到的就是一个是不是就是一个长整形啊,当前窗口的结束时间,长整形时间戳好,那最后还有一个当前的count count从哪拿呢?诶对,是不是已经在当前的input的这个数据里了,对吧?诶这里边它是interable,本来是要存全量数据的,所以它是一个可迭代的集合类型,那现在是不是其实里边就一个数啊,所以input直接你既然可迭代类型嘛,我用你的迭代器是不是next拿出来就完事了,这就是我们最后要的东西,那最后输出怎么输出,注意不是return,因为function是不是没有返回结果啊要用对大家看最后一个参数是不是这个collect out呀,对吧,out.collect去做一个输出,这里你有一个item will count,里边一个一个写进去item ID,然后window and count值是不是就是这样啊,这就是我们前面做这个窗口聚合。
16:58
这个过程啊,完整的一个实现,到这一步我们就已经拿到了每个窗口里边每个商品对应的那个count值,对吧,而且我们包装成了一个po类输出。
我来说两句