00:00
我们已经给大家用这个data stream把这个实时热门商品统计耗items做了一个完整的实现啊,那有同学可能就会想到,那假如说工作当中我们就要用到table API,那怎么办呢?诶,当然用table API去实现其实也是可以的,接下来我们就再用table API和fli CQ给大家把这个代码做一个具体的实现,好,那接下来我们首先还是在这个代码下边啊,新建一个object,当前我们这个就叫hot it啊withq啊,那当然这里面我为什么写这个Q呢?因为大家知道如果用table API做一个完整的实现的话,大家还记得之前我们做那个top table API得得得怎么实现呢?就可能需要定义很多状态,对吧?为了保持我们最后的这个呃一致性啊,最后结果的正确,你可能要TOP5的话,我们我们定义五个状态,这个自定义udf的这种实现方式可能。
01:00
能会比较麻烦,所以说呃,就是官网上也有这个推荐的实现方式啊,推荐大家是直接用CQ里边的一些函数去实现啊,这里面我们可以先结合,大家先可以看一下,就是前面我们做聚合的这一步,其实直接去用这个table API去实现是完全没有问题的啊,所以我们可以先做一下这个QAPI的实现啊。啊那这里面首先我们还是该引入的东西先做的这个操作啊,我们先先把它这个呃执行环境先做引入对吧?我们当前这个stream execution environment get execution environment,然后环境这个下划线影视影视转换先引入啊,然后接下来我们还是把当前的这个该配的时间特性配配出来,对吧?Characteristic,我们用视线时间啊,另外还有就是我们的这个并行度啊,就是如果说不影响最后的结果,大家看的清楚一点的话,先全局设成一。
02:00
然后这里边又有另外一个问题啊,我们先把那个后边的这个转换也都转换过来吧,对吧,你像前面这个,呃,从流里边去读取数据,因为我们即使是做这个table API操作最简单的方式其实也是直接把它这个转换成流,对吧?然后再去定义我们对应的那些,呃,Table里面的那个字段定义就可以了,我这里面直接把这个copy过来,然后前面还应该有这个input stream啊input stream我还是直接用这个读取文件好了,这个比较简单一些。因为这个卡夫卡我们已经测过了嘛,现在我们就直接都取文件里面的数据转换成流,然后接下来大家注意是不是还需要去定义tableable环呃表的执行环境,对吧?啊,那如果说我们要用这个table API的话,其实这里边泡文件里边是不是还需要去引入一些东西啊啊,当前我们就只在这一个模块里边给大家用一下这个table API就可以了,所以接下来我们就直接在这个子模块里边引入dependency啊,那当前的这个dependency呢,大家可以还是可以参考,就是我们之前的这个。
03:09
在这个table API和弗link CQ里边,当时我们只要引入一个,大家还记得引入那个planner就够了,对吧?而且这里边大家需要注意啊,我们后边要用到的一些函数就是flink CQ里边用到一些函数,只有blink里边给我们提供了这个版本,所以这里边我们要引入的呢?哎,不仅仅是这个table planner对吧?我这里边要引入的还得是当前的那个blink版本的那个planner啊,所以这里边大家还记得这个ner,就是后边直接给一个blink,对吧?啊,就把这个引入就可以了,另外呢,我们这里边也可以做一个当前版本的一个配置,你像我们之前不是定义了这个属性里边对应的这个版本,我们都是用这个啊,就是property去指定的嘛,所以这里边我们也可以把这个copy过来啊,就这里边我们这个的版本2.12对吧?哎,直接把这个copy过来,另外还有就是flink的版本。
04:08
当前我们用的是一点十点一把这个copy过来,好,然后把这个引入之后,我们可以看到projects,这下边当前应该会多一个这个table planner blink,对吧,有了这个之后,我们接下来就可以用这个table API相关的一些操作了,呃,好,先把这个该引入的都引入。在这儿啊,接下来我们呃,定义定义表执行环境啊,那大家知道这个在定义这个,呃,当前十点一点十点一里边默认还没有把blink作为我们当前的默认执行环境的,对吧?所以我得按用那种就是创建settings的方法去给它再做一个重新的定义,Environment settings,大家还记得这里边我们是要先去呃,New一个instance对吧,然后我要求去use blink,然后stream mode,然后哎,这这里边是一个builder,然后我去builder.build对吧,把它创建出来,然后下边就可以定义当前的table env,那就是基于当前的这个流式处理环境去创建一个stream table environment,它是调它的create方法传这个env。
05:30
以及settings放进去啊,就这是先创建环境,然后后边的操作就非常简单了,我们可以基于那个流去创建一张表,对吧?基于data stream,呃,创建table,好,那接下来我们还是把这个直接叫做呃,Data table吧,那个叫做data stream吧,我们直接table烟,大家还记得这个方式对吧?From data stream,然后里边把data stream传进来,后边我们还可以定义对应的字段啊,那当前其实我们最关心的啊,没那么多需要关心的东西,我们可可能最关心的就是item ID对吧?呃,这里边你既然是要引入这个,呃,我们当前用这个scalela symbol的方式去处理这个表达式的话,得有相应的影视转换,这里边把这个影视转换引入啊,那接下来这里就可以了啊,Item ID这个我们比较关心,另外呢啊,Behavior这个我们是要的啊。
06:30
因为我们后面还还需要针对它那个行为去做过滤嘛,Behavior,然后另外呢,很关键的一个time stamp这个药对吧?Time stamp这里边我可以把它定义成当前的事件时间字段对吧?另外呢,方便我们后续在这个CQ里面调用,大家知道time Sam是一个关键字,所以我把它给一个重命名,叫做这里边我只要这么几个,那我直直接定义这么几个就够了,对吧?哎,后面的我就不要了啊呃,然后这里边除了这些之外,把这个先创建出来这个table之后,其实我们就可以直接调用这个table API去做转换了啊,那首先我们先来就是这个第一种方式啊,Table API table API,呃,进行开窗聚合。
07:30
统计我们先做第一步操作,因为这个调用table API其实是非常简单的啊,大家可以看一下,我们先得到这个HC table,基于前面定义好的data table去做一个转换,那首先第一步不是做filter吗?那就吗,这里边我们这个behavior必须要注意三等号对吧,等于一个PV,然后接下来已经做了这个filter之后,后面其实就是大家还对对照着我们之前的这个流程啊,后面就是K,然后开窗,然后聚合,就这么几步,但是在这个table API里面调用的时候呢,不是先做分组的,诶它是怎么样,它是后边group by是把那个window也要给个别名作为分组标准的,对吧?诶它是先开窗,然后后边再去做分组,所以这里边的顺序啊,稍微变一下,我们这里边是直接window里边当前当然就是一个group window了,是一个滑动窗口,对吧,所以。
08:30
你是slide,然后后边over多长时间一小时对吧?One hours,大家看这个one hours和hours都是一样的啊,一样的含义,单复数都行,然后后边every定义不长,五分钟滑动一次,那么给一个minutes 5minutes,后面呢,还要指定当前的时间字段on ts,最后as给一个别名对吧?啊,给一个W,或者说我们这个滑动的窗口叫SW,这都是可以的,后边就要基于它去做group by分组,分组的时候呢,必须还应该有一个当前的分组的那个K对吧?哎,那当前的分组K当然是item ID了啊,另外还有一个就是当前的这个窗口,作为这个别名,作为这个分组的标准也要传进去,最后做一个聚合,哎,那我们知道这里边做聚合的话,呃,这个不想那么麻烦了啊,因为你如果要是包装那个item view count那个数据类型的话。
09:31
啊,呃,这个稍微有点绕是吧?我这里边就干脆直接掉已有的函数,然后输出对应的字段不就完了吗?诶,所以这里边其实就是直接select这里边的字段就好了,我我最后想要的这个字段是什么呢?Item count里面不就是item ID,这不是第一个吗?对吧?然后不是有那个window and window and怎么去处理呢?哎,大家知道我这里边不是有窗口吗?SW对吧?sw.and直接调它这个and方法,And函数啊,那给他一个别名,这个叫做window and对吧?然后另外还有一个参数是count啊,那这个count怎么算的?哎,这里边你item midd这数据都在这了吗?调用直接调用我们现成的count方法,然后item midd.count as跟一个别零,大家知道count也是CQ里边的关键字函数吗?所以我叫一个CNNT对吧,这样的话就没问题了,所以大家看这个处理过程。
10:31
就是非常非常简单对吧?啊,这个就是你直接这么一定义,然后输出我们CF的时候啊,你假如把这个ATA table转换成流做一个打印输出的话,这就跟我们的那个item view count是一样的,只不过是没有包装成那样一个样例类而已,对吧?啊,其实三个字段是一模一样,而且我们在这张表里边也有定义,它分别叫item ID window and和CT。这是这个过程,呃,然后接下来我们就要做这个,就是按照window and去做分组,然后再去做排序了,但这个稍微有点难办对吧?哎,大家想这个前面这个看起来这个很简单,这个操作比我们之前的那个电SAPI简单了很多,但是后边要做这个top n的选取排序,选取的话我们说那就只有分组的话比较简单,Group by嘛,Group by window and,然后接下来呢,那就需要去自定义一个aggregate function,然后我们说哎,你这个定义,你如果要是选取TOP5的话,你定义五个状态对吧?而且你会发现这个它的扩展性也不好,你像我们之前的这个实现,实现这个过程里边,其实是把这个50作为参数传进去的,对吧?我直接所有的数据来了之后,全全收集起来,做个排序,然后就直接这个top几,我就选取几就完事了,而你这里边呢,哎,为了我们要考虑它这个状态的控制,你这里边是每一个状。
11:57
它都要单独定义出来,那那这种方式很肯定就会很麻烦了啊,所以这里面我们考虑另外一种情形,这种情形是用用CQ去实现的选取好,那当然大家知道这个对于CQ而言,想要做一个排序和提取这个呃,就是前几名的这个操作应该会比较简单对吧?哎,之前大家记得在CQ里边有一个函数叫做rank。
12:31
还有一个函数叫做number,诶大家想到这就非常简单了,我可以直接先做排序,然后就按照它的那个rank或者number去,呃,选这个前几名不就完了吗?呃,对吧,把把对应的这个呃,你当前这个排序,比方说排序的这个序号小于等于五的拿出来,这不就是前五名吗?所以这个处理思路其实非常简单,但是对应的这个操作呢,没有办法在table API里面实现,当时给大家讲那个聚合函数的时候,就说了很多函数table API是不支持的,比方说rank,比方说row number,所以这里边我们就只能用CQ去实现这种简单的方式啊,呃,这种简单方式只能用CQ去实现,Table API的话只能自定义udf了啊,所以这里边我们选择用CQ去实现,我定义一个result cable啊,那当然在这个之前,我们需要先把这个table做一个注册,对吧?啊,前面我这个table env create tempary,比方说这个我就叫做。
13:32
做a table,呃,里边我我可以把这个a table传进来,还可以指定对应的那些字段,对吧?啊,这些字段就是当前的这个item ID。呃,其实我也不需要改什么啊,我我只要要这几个字段就可以对吧,Window end,还有这个CT,其实就是要要这这三个字段,然后接下来我们做转换操作的时候呢,直接就是table enna query query啊。
14:07
直接去写一个query就可以了,那我们这里边怎么去写呢?大家会想到我就去select芯对吧?呃,然后我要这个表里边所有的数据,另外还要什么呢?还要它里边的,呃,就是那个roll number做一个聚合对吧?呃,所以这个可能稍微会麻烦一点啊,这里边我定义一个这个row number,调用这个函数,大家还记得这个函数吧,Row number,然后这个row number怎么样去统计这个就是当前这个,呃,所有数据里边这个roll number呢?那我定义一个over窗口对吧?做一个开窗函数,做一个开窗函数,把当前数据放到所有数据里边去做一个排序,然后提取它当前的那个序号容number本出来,所以这里边我其实是一个over窗口,那这个over窗口后边,哎,这个稍微选取的东西有点多对吧,那我们就还是啊给大家。
15:07
换行来显示吧,这个roll number啊,Over over什么东西呢?哎,这里边就定义啊,Partition by,当前的这个分组的字段应该是window and对吧,按照这个window and去分组,然后呢,哎,还可以定义这个order的by or order by排序按照什么排序呢?CNG,对吧?当前不是按照时间去排序了,我要按照这个当前的数量去排序,然后另外还是倒序排列对吧?因为我排第一的应该是那个呃,数量最大的嘛,所以这里边定一个DEC啊,大家如果要是CQ要写的漂亮一点的话,最好这些还是用那个大写对吧?这个用大写可能会稍微好一点,但是可能大写这个输入的时候稍微麻烦一点啊,我这里边就给大家直接都用小写吧,大家知道这个怎么什么个含义就可以啊,这里边是一个over window啊,然后后边可能还需要给它来一个别名,对吧,比方说我们as。
16:07
就是当前这个number调用这个函数啊,提取出来之后我就管它叫做number好了,然后把这个提取出来之后,大家可能会想到了,那你从这里边提取,那应该select这两个对吧?呃,那那接下来我应该还得去where这个,呃,当当当前的就是首先我得from对吧,Select的这个from at啊那后面我想到我去where当前的这个row number,呃,这个小于等于五,这不就完了吗?就提取出来了吗?诶,但是大家注意不能这样对吧,因为你后面这个开窗函数啊。我们这里边的V条件是在本身做这个分组,做这个聚合之前就要呃提前做的一个筛选判断,而这里边我们这个开窗函数,你是要执行完了之后才能得到中number这个字段,对吧?所以你前面这个VR的时候呢,拿不到这个中number字段,哎哟,那这个就比较麻烦了,这怎么办呢?也很简单,我在外面再包一层对吧,那就是我在外面再来一个这个select芯,然后from from什么呢?哎,整个后边我把它全部包裹起来。
17:21
对吧,把这部分全部包裹起来,然后在里边我去再去S对应的这些东西啊,这个是over函数对吧?然后这里边我当前的这一个去啊,From a table啊,我们定义的是table,把这个提取出来,当前所有的数据,所有的这个item itd window and CT提取出来,还有room number也提取出来,然后再把它where,这里边row number小于等于五对吧,而然后再去做一个所有数据字段的提取,这样的话就得到了我们最终的结果啊,所以这里边可能这个写起来还是稍微有点绕,但是大家要搞清楚我们要的是什么啊,这个CQ也就是直接这样一写就把它实现了。好,我们接下来可以来测试一下这个结果到底怎么样啊啊这里边呃,就把这个result table,我们就直接拖,因为做。
18:21
聚合嘛,我们直接to retra stream对吧,做一个这个呃,这样的一个输出,直接肉类型引入当前这个flink types点肉做一个print输出啊,最后我们在Env.execute,当前这个是hot it CQ对吧?啊这样的一个实现这个job啊,接下来我们运行一下看看效果怎么样。来看,现在已经运行起来了,我们现在已经输出了结果,这里边输出的结果是什么样子的呢?哎,我直接停一下给大家看一眼啊,这当前我们好像这么动态调整的过程当中,根本不知道怎么回事儿是吧?哎,我们先停一下,看看这个当前已经输出的这个效果是什么样的,大家看到这里边输出的这个效果呢,其实整体来讲就是不停的出out false在做这样的一个呃结果的输出,对吧?哎,我们找一个开头啊,比方说这里边我们的这个时间是呃,六小时45啊这这应该是六小时45分的时候啊,我们看一下当前的这个它是从什么,诶这里啊从头开始对吧,40这个之后,然后接下来呢,啊,就是45分的这个窗口里边,我这里边就直接啊,来了一个这个来了一条数据,先是一个处对吧,又来了一条数据,诶第二名也有一个处了,对吧,后面第三名也有处了第四名,呃,这里边不是第四名,后面这里边是什么?呃呃,大家看到这里边。
19:49
其实我们前面这里输出的这个内容啊,是按照我们就是星号先全选取出来,最后给的是那个row number倒数第二个是那个count值对吧?哎,所以这里边最后最后一个字段啊,大看这个这里边的这个1234啊,这个是一个值,然后我们前面这呢,都是就是这这是一个那个排序的那个number,然后前面这个才是当前的那个count值,所以大家看到呃,前面这个45这个数据来了之后啊,呃,这里这个3853076,这里边啊,直接count直接就输出了一个三对吧,这个count值,然后他排第一,然后这个排第一之后呢,这是新加入了进来的一条入数据嘛,那后边我们的这个之前的第一就false了,对吧,然后之前的第一呢就变成第二了,382008这个就变成第二了,那同样之前的第二是不是也要往后瞬移啊?
20:49
对吧?哎,这个之前的第二就变成第三了,然后之前的第三就变成第四了,当然了现在还没有第五,然后后边来了第五之后,哎,这是这个第五名处对吧?直接添加进来,然后如果再来一个新的,假如说它的这个数量是二,大家看他替换的是第二名对吧?哎,所以这这一条是处追加进来,然后呢,之前的第二名false,然后替换成第三,第3FALSE替换成第四,第四替换替换成第五,第五直接boss就相当于删除对吧?大家看这个结果,第五名直接一个bos没有对应的那个处的输出了啊所以这里边这个输出的结果还是还是比较正常的啊啊就是这里边大家如果要是想要去做操作的话,那我们怎么办?你就把得到的这张表里边的结果通通都都拿出来,对吧,塞到一个数据库里边,让它能解析这个tra模式啊啊,能够把这个数据都更新,那大家知道看到的就应该就是。
21:49
前五名的这个count值的对应的这个ID和count值的一个输出,对吧?而且这里边还带着当前的这个,呃,时间,就是哪个窗口输出的这个时间都是有的,我们最后去再去做一个可视化展示就完事了啊这是关于这个franknk CQ和table API结合起来做了这样的一个需求的实现啊,当然有同学可能想,那假如说我这么看的话,上面我觉得你还混合了对吧,有table API,有CQ,我如果纯用CQ去做一个实现行不行呢?啊,当然这个也是可以的啊,啊就是比方说这里边给大家写一个纯CQ实现,就是前面我不是已经注册了这个,呃,就是我不要看这个啊,我要啊,这里面这个我没有注册,所以这里我先去注册一个这个data table。
22:41
同样前面我应该还是要把这个,呃,就是所有的那个数据都已经从那个流里边读出来,对吧?然后创建出来这个table,然后接下来我这里边就是table env,先去注册一下当前的这个table,诶,我们当前的这个table env不要去value了,对吧,已经有了table env去直接create temporary定义当前的这个data table啊,然后我们这里边直接用的就是这这个data table嘛,对吧,拿过来直接用完事了,然后接下来呢,呃,这里面的字段啊,我还是给给稍微的定义一下啊,比方说这里边我要的还是这个item ID,这个定义出来的话,我们对STEM的把握更加的呃明显一点对吧,Behavior这个我们是要做这个filter啊,筛选的时候需要用到的。呃,另外还有一个字段是time Sam,这个非常重要,同样我们也是把它提取出来之后呢。
23:41
给一个这个,呃,这个row对吧,定义一个这个啊sts哦,那当然其实之前我们那个,呃,就是在定义这个,就是在在定义这个data table的时候啊,就是前面我们在做这个转换的时候,其实已经把这个转换成TS了,对吧?所以这里边其实没有time STEM这个这个字段,我们如果这么去定义的话,大家其实可以直接从什么去创建一张表了,我可以直接从那个data stream去创建表了,对吧?或者说你直接从那个data table这里边这个字段,我们这里边想要的其实都已经有了,你直接放在这就行,对吧?呃,直接不需要再做别的定义都是可以的啊,我这里边是从这个data stream去直接做了一个转换,然后接下来呢,就直接写CQL了,呃,这这里给大家定义一个这个result CQ table,同样还是table NBA直接执行这个CQ query,哎,现在的这个执行那得怎么样。
24:41
那其实大家知道外边我们其实想要做的是什么呢?还是这件事儿嘛,对吧,就整个这个流程其实一点都没变,我还是要做这个从已经得到那个a j j table里边去把它所有的这些字段都提取出来,只不过现在呢,就是这个a j table,我们这里边不能直接在这里边调用了,没有注册对吧?那所以就相当于是在这个from里边要再去写一个子查询嘛,不就是这样一个过程吗?哎,所以这里边就是把这个j table做一个替换,那这里边我们select table里边有什么呢?Item ID,对吧,Item ID。
25:23
大家再看一眼,之前我们还还提取了什么,还有这个window and,还有那个count对吧?哎,所以这里边其实就按照我们之前的那个逻辑,把这个一行一行写出来就完了。诶,这里边我们要这个,呃,Com,这个window end啊,那window end怎么写呢?这个稍微有点麻烦,大家还记得之前我们那个window怎么写吗?我先把这个空下啊,在后面给大家from,当前是from select啊,从这开始from from这个data table对吧,从我们已经定义好的这张表里面提取出来,呃,然后另外还得去做一个筛选,呃,这个是等于,哎,刚才那个PV,另外group group大家要注意了,这里边要两个字段对吧,首先是item ID,另外还需要有一个window对吧?哎,这个window我们直接定义的时候,比方说我们这里滚动或者是滑动对吧,我这里给大家举就呃,我们当。
26:23
当前其实需要的是一个滑动窗口,所以这里边应该要的是一个,大家还记得是一个ho对吧?在CQ里面定义的时候应是一个ho,哎,所以把这个定义出来,然后当前的时间字段TS,然后里边呢是先给那个滑动步长,再给窗口长度,哎,所以这个滑动步长是T,大家还记得是给一个字符字符对吧?五然后minute,然后另外后边还是一个时间间隔T1HOUR对吧?这里边的这个必须得是小写,呃呃,不必须得是那个单数对吧?呃,正常情况我们可能写大写会更加的看的更加明明显一点,我这里边直接写小写了啊,这个也没关系,这是我们对于这个窗口的一个定义,Hope这样的一个定义,那上面如果我们要这个window and的话,那怎么办呢?大家还记得是不是直接用这个ho and直接把它提取出来就完事了呀,对吧,或者说你用那个滚动窗口的话,用那个temp。
27:23
And,对吧,做一个提取,另外我们知道还需要有一个count数量,那我们这里边直接调count函数,Count当前的item ids CT,对吧?然后把这个全部选取出来就完事了,这里边我们CT不是在这儿你必须定义成CT吗?对吧?呃,然后这里面还有一个,因为前面我们有那个window end嘛,用到了window end,所以这里面我们得定义出来as window end,对吧?那这样的话我就可以play by window end了。这就是一个纯CQ的一个完整实现,好大家可以下来之后就是试一试,觉得哪种方式好啊,有同学可能就觉得,哎呀,我就完完全去写CQL,这就太熟悉了,太舒服了啊,我就用这种方式,这也是可以的,或者大家觉得呃,就是我用data stream API这种方式觉得就就很好理解,我就把把这个做一个完整实现就完了,后面还可视化,可视化的很好,对吧?呃,能做的操作比较多,或者有同学觉得,诶我就觉得这个table table API这个很好用啊,那你也可以说后边我去做那个自定义的udf,也可以把它结合起来,前半部分窗口聚合用这个,呃,Table API,后边我们做这个top n排序做这用这个CQL实现,这都是可以选择的,我们现在可选的余余地就比较多了。
我来说两句