00:00
啊,然后接下来呢,我们再用另外的一种方式,也就是调用table API和flink CQ,用CQ的方式去实现一下当前的实时热门商品统计的需求啊,首先我们还是在这个代码里边新建一个类,这个我就直接叫做hot items with CQ,我先直接把这个创建出来,呃,然后前边的这个流程大家想是不是跟跟我们这里边就完全一样啊,对吧,基本上是完全一样的啊,我这里边就不再用这个卡夫卡数据源了啊,我就还是直接用文本,就是从这个文件里边直接读取就好了。大家看到还是创建流式的执行环境,然后呢,全局并行度先统一设成一设置当前的时间,语义是even time,然后这里边我把这个先。把这个啊,我们先把这个括起来。然后接下来我把这个先放开啊,当前的这个数据类型要对应的引入。
01:07
下面的这个步骤,其实该做的操作还是一步都不能少,转换为po类型,分配时间,戳water对吧?后面的那个具体转换我们就就不做,就不用这个来做了啊,啊,当然如果有同学想要完全写成这个CQ的形式或者table API的形式的话,也可以不基于这个由去做转换,我现在是不是相当于要基于这个流转换成表啊,如果说大家不想用这种方式的话,也可以,呃,对,直接connect对吧,连接那个卡夫卡就可以了,或者你连接那个文件系统file system就可以了,我这里边的话用这种最简单的方式,好接下来这个第四步啊,那大家想是不是我应该要前面我只是创建了流式执行环境,是不是还没创建表的执行环境啊,对吧,创建表执行环境。这里大家要注意我当前表执行环境,现在不是有不同的这个呃,不同的配置吗?那我现在到底是流还是P呢?这个比较简单,肯定是流处理对吧?那到底是用这个老版本planner还是blink版本呢?啊,这个大家要注意,就是后边我们用到的一个操作呢,只有blink版本里边提供了对应的那个函数,所以这里边我们就只能用blink版本了,那blink版本在一点十的时候还不是默认,那得做一个settings的设置对吧?Environment大家还记得那个吗?有一个environment setting,这个environment settings大家看现在没有,这主要是因为对我现在是不是还没有对应的那个table API的那个相关依赖导入啊,所以接下来我需要到这个。
02:46
之前我们的这个table API这个里边把对应的这个planner导入是不是就可以了。这里边只是我们当前这个模块的,所以我就不需要放在这个外边啊,直接给一个这个depends,然后把这个plan引入这里,注意我们要的是那个blink版本的planner,对吧,所以把这个写进来。
03:11
杠blink。好把对应的这个导入。好,现在已经导入了,大家现在看一下我们当前这个hot items里边就应该会多一个,哎,多了一个,呃,当前这个table planner blink,对吧,当前就有了这样的一个依赖啊,然后里边对于我们这个呃,Table的common,还有这个这里边这个API Java API GALA,还有这个桥接器,其实是都有的,对吧?所以这个其实该有的东西都全啊,我们现在就把这个对应这个settings引入,然后是不是我现在要做那个new instance啊,对吧?然后接下来是不是use blink planner,然后接下来呃,In streaming mode,最后再来一个build,是不是这样就完事了,对吧?呃,把把我们当前这个settings创建出来,那后面我们再来stream。
04:06
Stream table environment是不是调一个create方法,然后里边传入的参数是当前的流式执行环境env,以及当前的setting,我把当前这个叫做table env,好,这就是我们对于这个呃表的执行环境的定义啊,我这里边要的是这个用。Blink版本对吧。好,有了前面的这个定义之后,接下来。我们是不是就可以把这个硫先转换成表了,对吧?呃,接下来我们是做这个将硫转换成表,当然我们还要定义对应的那些字段,以及当前的事件时间的那个时间字段是不是也得定义出来,呃,就是row time对吧?我们要做一个定义啊,这里边就是我可以直接得到一个table。
05:01
我管这个叫做data table,直接基于table env调一个from data stream方法,把前面的data stream直接转换过来,对吧?呃,里边我现在要的是,诶,我们现在要的字段需要哪些?呃,当前其实主要需要的字段,这里边大家注意一下啊,我其实可以在这个转换的过程当中,直接就做一个提取筛选,就相当于我可以做一个映射,对吧?因为里边的名称我不是都知道吗?所以有一些我可能是需要的,我就直接保留,如果不需要的是不是我直接在在这里不定义就可以了,哎,所以大家看到这里边不仅仅是只是一个重命名的这个需求啊,我可以就是提取自己想要的字段,比方说你像那个user ID,你说我关心吗?完全不关心对吧?啊,直接滤掉了不要了,那后面item ID这个要对吧,Item ID,然后呃,还需要什么呢?字段呢?Category ID要吗?品类ID。
06:00
也完全不要对吧,这个完全没用啊,那另外是不是还要诶behavior要吗。Behavior要后面我们还有一部那个filter是不是要继它过滤嘛,所以这里边behavior这个字段名称一定要一致啊behavior然后另外还有一个最后time stamp对吧?那这个我们最好给一个别名叫做TS,因为CQ里边是不是time stamp会冲突啊啊,那另外就是说我是不是可以直接把它指定成当前的对RO type,哎,这就是我们基本的一个定义的过程,对吧,直接把这个定义出来。当前这张这张表就有了。好,然后接下来那我们就是要是不是首先做那个分组开窗操作了,对吧?分组开窗,呃,那首先我们先来说一个这个table API的写法,Table API写法,大家想一下这个分组开开窗怎么写啊。得到的还是一个table对吧?啊,这个table我们叫做window a j table,基于前面的data table怎么办?
07:08
是不是首先先做一个filter,这个是一样的,对吧?流程跟我们之前那个操作一模一样啊,那现在的这个流程我们其实就直接写behavior,是不是必须要去等于PV啊,然后接下来是不是要分组了,之前我们是直接K,然后window,现在是要先window再goodbye,对吧?因为window的那个别名是不是也要出现在group分组的字段里,所以接下来window我们要开的是一个。滑动窗口对不对,对吧,滑动窗口,所以滑动窗口是。大家还记得吗?是不是叫slide呀?直接就叫slide对吧?Slide,然后必须调over,指定当前的这个窗口大小,当前是是不是一小时啊,one.hours然后接下来必须掉every,它的滑动不长,前是5.minutes,五分钟,然后必须调on,当前的时间字段是不是TS啊?最后必须调一个S,当前的别名,我随便给一个叫W对吧?
08:15
哎,这就是当前这个滑动窗口的一个定义,然后接下来group by,对吧?Group by by什么是不是item ID,这个不能少,按照item ID分组,另外还需要按照w window分组,对吧?然后最后做一个我现在不用自定义igggate function了吧,诶大家想这个我想做的那个简单操作是不是里面都有现成函数啊,所以这个代码其实会简单一些啊,直接select是不是就够了,最后我们想得到的其实这张表里边啊,Window a表里边是不是需要三个字段。Item ID window end,还有count值,呃,就是我们包装的那个item view count嘛,大家还记得吧,对吧,就是那个port类型啊,只不过现在我是放在这个表里面的三个字段而已,那我现在提取的话,是不是直接select提取就可以了,Item ID,然后后边是a w.and是不是就是它的window and呀,对吧?我可以在as一个window and as window and。
09:17
然后后边再来一个,哎,对item id.count as啊,当然count是那个C课里面关键字对吧,我们直接as CT,这就是我们想要的那个三个字段嘛,这就是聚合结果啊,直接到这一步就直接可以把它搞定了。啊,所以这个过程其实比我们前面好像是简单了一些,对吧,主要就是因为这个聚合函数是不是不用我们自己去操心啊,啊什么东西都能够直接拿到,这个就稍微的会简单一点。但是接下来如果说我们下一步要去做对应的这个,呃,接下来要要要去相当于处理我们当前的这个排序输出的话,大家想一下,我接下来用table API怎么样去实现呢。
10:02
要收集其当前这个数据,呃,同一个K里边的所有的数据,对吧,然后去做对应的那个处理,那这个这这这个怎么去做呢。哎,所以大家会发现就是这里边还是会有一些呃其他的问题啊,就是如果说当前我基于这个table API的话,它其实已经没有给我们指定套喷对应的那些函数那些方法了,所以我们就只只能大家还记得我们之前在那个CQ里边有什么有什么函数可以调吗。是不是有row number,有rank这样的函数可以调啊,那这样的话我是不是比方说啊,我后边去定义一个开窗函数over over的一个函数啊,Over里面是不是我可以order啊,Order by,对吧?可以去做一个排序,排序之后我再去取一下它的那个number,是不是就相当于是它的N啊,然后接下来我提取它排序里边比方说。
11:00
倒序排序,按那个count值从大到小往下排,然后我取里边的扔number要小于等于五,是不是就相当于我提取出来是前五名啊,所以这个思路还是很简单的,但是table API里边是不是不能用room number或者rank之类的那个函数啊,所以大家会发现,诶,这个就局限了,对吧,后边我们是不是必须要去做一个。就是必须得用CQ去实现了,对不对啊,所以这个就是大家看这个分组开窗这一步操作呢,呃,我们就直接用table API啊,这个相对简简单一点,我们就先写到这儿,然后接下来第七步是不是要利用开窗函数,开窗函数,然后哎,我们对。呃,Count值进行排序。并获取,呃,就是number对吧,Number,哎,最终就可以得到得到top盆,所以这个过程的话,我们是不是就只能用CQ去做一个实现了,哎,那基于前面的这个结果,我可以把这个window window a j table做一个注册对吧?然后接下来就可以在这里边直接去做一个提取了啊所以在这啊,我直接先定义一个,呃,这这里边可以有这个就是我我可以先把前面的这个前面我们其实用过这样的方法,就是有这个table env直接可以去create temporary对吧,直接可以把它这个注册出来,但这里边呢,比方说我这个叫AJ啊呃,然后把当前的这个呃。
12:47
前面的这个window a j table直接放在这儿啊,但是这里边可能会有一个问题,就是大家看这里边直接掉的这个方法呀,它没有掉当前那个就是stream table environment那个对应的那个方法,对吧,它是直接调到这个底层的这个table environment这个方法来,所以后边如果要去做判断的时候,它会有问题。
13:10
这里边会会涉及到这样的一个,呃,就是对应的一个一个没没法调用的一个过程啊,那这里边大家可能会想到我再去指定它对应的那个fields,能够让它识别出来吗。但是也不对,这样的话,就是当前这个调用是没有这种方法的,就是如果说你在这个stream table environment里边调啊,大家看后面你如果指定这个fields的话。是可以调到直接调到这里面的方法来的,但是前面它必须要给的是一个data STEM啊,所以这也是现在这个就是table API和这个CQ啊,就来回转换这个不太方便的地方,经常会有各种各样奇怪的这个问题,那所以那这个东西怎么办呢?也可以转,我是不是可以先把这个转成流啊,对吧?那这个就稍微绕一下啊,其实这个过程也还是比较好理解的,我就直接把这个table env,大家知道我直接to a pen stream,然后把前边,因为前面这个窗口聚合是不是只输出了一次啊,所以我to a pen stream就可以对吧?啊,那接下来我把当前的这个window,呃,就是当前的这个window a table要去做一个对应的转换,然后这里边我直接提取它的roll.class对吧?
14:25
呃,当前得到的这个流,比方说我就叫做一个a j stream,然后接下来我就可以把这个a j stream去放到这里来,然后在指定字段这个就可以了,对吧?而且这里调用的就是当前这个流的表执行环境里边的对应的这个方法了啊,所以接下来这个就不会有任何问题啊,那这里边我们定义的字段其实就是item ID,这个都不用再重命名了,对吧,之前我们都命名过了啊window end,另外还有一个CT,对吧,主要要的就是这这几个啊,那这里边在在当前这个环境里边把这个表注册出来之后,接下来的操作是不是直接就是。
15:07
我得到一个最后的result。Table就直接用table env是不是执行query就可以了,好,那大家想一下,接接下来我的这个query怎么写呢?接下来怎么写?哎,大家可能会想到,那我这里边就是select啊,开那个over窗口嘛,对吧,那我就首先CLA,呃,Select,那大家想这个我接下来是不是要把所有的那个字段都拿出来啊,要不然我最后得不到对应的那个item ID和window相关的信息了嘛,所以我取所有的芯,所有的字段,另外我是不是要得到当前的row number啊。对吧,呃,我把这个就是特别的这个函数给大家大写出来啊,我要得到当前的row number,然后row number是要是不是开窗,开窗函数去聚合啊,Over over一个,诶这里面就得写了protect by对吧?Protectition by什么按照什么字段分组呢?Window and,大家注意啊,Window and对不对?前面基于前面这个已经得到的数据,是要按照window and去做分组,然后order by order by。
16:19
哎,是不是基于这个c nd count啊,对吧,另外还要降序排列降序是不是decc对吧?这个就可以了,然后我也不用指定这个呃,范围,我是不是全全局所有的数据都聚合一次就完了,然后最后这里是不是直接还可以给它做一个重命名,得到的这一个我叫做as road number对吧,我可以叫这样的一个名,但是大家想,那接下来这个是from from哪里from a j对吧?那我得到的这个数据,接下来是不是我还应该要提取它里边的这个前几名啊,Top n对吧?所以我接下来比方说要where,这个number是不是要小于等于五啊。
17:03
那大家想我直接在后面直接where这个number小于等于五行吗?那肯定不行对吧,因为where本本来是提前做这个判断去过滤筛选的,而我们这里面是不是聚合得到那个结果表才能够有入number啊,那所以得怎么办?对,所以接下来是不是应该有一个嵌套的子查询啊,所以是select芯,我们最最外边应该是这这样一个过程对吧?然后from对括号。我把这个放在放在下边啊,From,接下来这是我们的这个子查询要做的这个事情,这里边是一个开了一个开窗函数对吧,Over,然后我把这个from a j也放在这儿啊,这里面是我们这个子查询。然后后面是不是还有括号啊,放在这儿对吧,然后再接下来,诶,是不是这就可以去where了,Where当前的row number小于等于五,是不是就是这样的一个实现啊。
18:04
啊,所以这就是关于我们用这个CQ去实现这个功能的过程啊,啊,那当然了,就是最后我们得到的这个result table的话,我们也可以把它做一个转换输出,对吧,让大家看一下当前的这个结果,但是这个结果就没那么好看了啊,我们没有做那个格式化对吧,没有写成那个字符串,我们大概看一眼就行啊,比方说这里面大家想我当前的这个是用这个,呃,就是用to retract stream呢,还是to这个stream呢?啊,大家可以先试一下啊,我们直接这个toend stream,然后把当前的这个result table放在这后边这个肉点class写出来,最后做一个打印输出,呃,最后不要忘记我们env XQ执行起来,里边当前是hot it,当前我是with with CQ的这个job,对吧?我们把这个job名称定义出来,现在我们执行一下,看看效果怎么样。
19:09
大家看这个整个流程跟我们前面是一样的,对吧?呃,就是把这个表的这个转换里边,我们再去做这个开窗一步,然后后边做这个对应的那个排序输出啊,做这个聚合统计输出就完事了,我们可以看一下当前的结果。大家看这里面报错了,报错的原因当然就是only table对吧?要求我们用这个totra stream,为什么这里边就必须要totra stream了呢?大家想一下,这里边我们做这个,呃,Number做这个聚合的时候,是不是相当于里边我们在做聚合的时候,这个结果会不会变啊。我们当前的这个结果是不是会变啊,哎,有同学说,诶之前你不是说那个窗口内做那个聚合的话,最后结果只输出一次吗?这里是只输出一次吗?大家注意窗口是不是我们在前面是在前面定义的呀,到前面我们得到这个窗口聚合结果是不是已经没有窗口了。
20:08
现在已经没有窗口了,只有开窗函数对不对,开窗函数出来之后,那是不是来一条数据就会更新我们最后的那个top n的那个结果啊,TOP5的结果对吧?所以当然这里边是to stream。那所以这里边我们直接执行一下,大家看看这个效果怎么样啊。现在应该能够看到他最后的输出了。好,大家看到这里边就输出了,这里边输出的时候是不是各种true false true false啊,是不是因为它在不停的更新啊,我这里边稍微停一下,大家看一下这在更新什么东西呢?你看这里边当前的这个2285524这个商品,它本来当前它的这个数量是13,然后排第四,然后接下来是不是它这个按他这个13的数量就排第五了呀,对吧?哎就是那为什么他会就是现在就给他排第五了呢?诶那是不是前边对应的就肯定会有当前这个窗口内就会有一个,呃,大家看前面,诶这里边是不是就你看处是不是单独这里边就插入了一条这个二期。
21:17
9675,它的数量是14,所以它排第四,插进来是不是前面的这个13就排第五了呀?啊,所以就是这样的一个更新的过程对吧?啊,所以这个数据如果我们最后要收集起他这个结果的话,这个还挺麻烦的对吧?你必须是那个外部系统得支持这样一个更新操作才可以啊,当然如果我们能够比方说你写到MYSQL里面的话,当然就可以更新最后我那个表单了,对吧,Top n一目了然,最后你再读那张表就可以了啊,那这里可能有同学还是心有心有不甘啊,就觉得这个还是中间我先写了这个table API,后边又写了CQ,总觉得稍微有点膈应,那我能不能全部用CQ搞定呢?诶,这里边给大家写一个纯CQ直线,大家说能不能搞定,当然是可以搞定的,对吧?这个肯定没问题啊,但是前提是是不是我要把一开始的那个还是要先创建一个表出来啊,就是我们当前的那个data data table对吧?啊,但是大家知道前面我们创建的时候呢,又有另外一个问题,就是调不到那个方法,所以说我还是把那个流直接转换过来,是不是就完事了,对吧?所以这里边我直接注册一个data table,然后呢,基于我们一开始的data stream这个流,然后我去直接指定当前的那个字段item ID,然后呃,后边是这个想要的啊,Behavior对吧,Behavior,然后另外还有呃,Time stamp.row time这个as ts,对吧?这个跟前面的那个定义其实一样的啊,我把这个先定义好,然后接下来那就是要去写这样的一个CQ了。
22:59
大家知道这个就是table,我这个把这个叫做result c CQ table,那当然就是table env,直接去执行一个c query里边写的时候,其实大家想到就是里边写的过程是不是跟前面这个差不多啊,对吧,我就直接把这个先copy过来,Copy过来啊,大家想一下这个差别在哪,其实还是select share from,对吧,因为你看这里边我们最终这里边的这个开窗函数是不是并不依赖当前你定义的哪个表啊,对吧,你就是就是这么去定义,只要有对应的这些字段不就完了吗?对吧,只要有CNT对吧?啊,那大家想主要的观点在于是不是这里边from a j,你这个AJ要替换成我这里的一个子查询啊,所以接下来from这里边又得来这么一个操作了,对吧,这样的一个操作啊。
23:53
我把我把这空开,然后大家看就是这里边,但是你如果看这个两个括号不舒服的话,你也可以分开啊,这里边from什么呢?当前就应该是啊,是不是select。
24:06
Item ID对吧?想要选取的不就是这些吗?Item ID,然后后面还有那个window and,呃,Window and对吧?呃,Window and稍微有点麻烦,我们先写那个count吧,Count简单,Count是不是就是count item ID,然后as cnt对吧?当前我们不是要这个CT吗?因为这里的字段顺序没关系,对吧,我把那个window and放后边,Window and现在不好弄,我们先先放后边再说啊,其实大家知道接下来我其实要的是from data table对吧?啊,然后接下来from the table怎么样,是不是where做一个筛选,Where当前的behavior对,等于behavior啊,Behavior等于对PV,然后另外还得要求接下来是不是还要group啊,分组聚合group当前的item ID以及。
25:09
以及一个滑动窗口对不对?当前我定义的滑动窗口应该是是不是hope呀,大家还记得吧,Hope对吧?Hope当前的时间字段TS,然后另外是不是要定义第二个字段是当前的滑动步长,大家还记得吧,先是步长啊,怎么定义是不是interval啊,Interval,然后5MINUTE,然后后边是INTERVAL1对R对吧,这是我当前的这个滑动窗口。然后大家就想起来了,那上面的那个窗口的结束时间怎么办?是不是直接用hope and呀,是不是直接用这个就可以拿出来啊,所以这个其实就是非常简单的这个定义啊,然后这里边给一个as,因为我们后面要用到这个window and嘛,Position by window and对不对,所以把这个提取出来,这里边as window window and对吧,这就是我们整个做这个CQ查询啊,做这个处理的过程啊,当然后边如果说我们想要去做一个打印输出的话。
26:18
必须也得是totra stream对吧?啊,这就是一个完整的处理流程,我们可以运行起来再测试一下,大家看看效果。就大家可能会觉得就是直接上来之后啊,前面我们直接connect连接到这个外部系统,然后后边直直接一条CQ搞定对吧?呃,这种方式可能会觉得更舒服一点,但是呢,这里边就涉及到各种各样的子查询,对吧?大家要把这个逻辑关系要梳理清楚啊,就这里边我可以把这个再稍微改一改,就是这是这个from的这张表对吧?然后这是后边这个就是最外边的这个select from的这张表对不对?这个子查询对吧?然后里边这是这个啊select这里边from的这张表,这个大家要搞清楚一点啊,大家看到了现在这个输出结果是不是跟刚才一模一样啊,也是不停在更新我们top的这个榜单对吧?啊,所以这就是关于这个纯CQ的一个实现。
我来说两句