00:00
接下来这一部分给大家讲一讲table API和flink CQ,那table API呢,它其实是流处理和批处理通用的一个关系型API了啊,那这个在大家之前的编程经验里边,其实应该也是比较熟悉的,Spark里边这些呃,Table相关的一些API啊,包括CQ的使用啊,都是比较成熟的,那flink也其实是在一直朝着这个方向在发展,因为如果要总是用那样底层的API去调用,那处理一些比较复杂的场景可能还是比较有必要的,但如果要是处理简单的场景还那么用的话,可能就呃对这个开发人员的这个友好程度就不够,所以现在这个flink也在大力发展这部分的这个高层高级的API啊。好,那首先给大家说一下这部分需要引入的pop依赖,那其实主要就是flink table了,把对应的这个依赖引入就可以,然后接下来。
01:00
哎,我们用一个简单的例子给大家看一看table API到底怎么样来使用。我们过一下这个代码吧,首先啊,这一开始这个都不用说了啊,这个就是在获取这个流处理的执行环境,对吧,然后接下来大家看这里边创建了一个。卡夫卡的consumer对吧?呃,先拿到了一个卡夫卡的consumer,然后接下来是不是创建了一个卡夫卡的S啊,对吧,相当于这里边先创建了我们的这个,呃,流逝的这个S任务啊,先读取数据,流失数据,然后接下来这就跟我们之前熟悉的东西不一样了,大家看这个写法啊。定义了一个table env,这是一个table的执行环境,对吧?Table环境,那么怎么去定义它呢?它调用了table environment,然后点get table environment。
02:00
注意这里边要把env前面我们定义好的那个流式处理的环境,执行环境要传进去,所以大家看这个过程有点像什么呀。就有点像,就我们之前那个做SPA的时候,SPA streaming,对吧,你创建这个SPA streaming contacts的时候,是不是也得基于这个SC去创建啊,对吧,其实是大家可以,当然不是完全一样啊,大家就是可以去用这种思维去类比,所以说基于这个流处理的执行环境,在它基础上在创建这样的一个table environment。有了这个table environment之后,另外大家会想到我们就可以对那个数据做操作了,但是一开始读进来的数据可能还要做一些基本转换,那大家看这个stream.map,这就是转换成我们想要的那个数据类型对不对?大家看转换成一个e commercee log这样的一个电商日志的一个数据类型啊,然后接下来。
03:00
这个操作大家要注意一下,这是做了一个什么操作呢?大家看table env直接点from data stream,这就是从一个data stream里边直接读取出一张表来,对吧,就相当于把一个data stream转换成一张表了啊,就直接用这个这个方法直接就读出来了,所以这个就很简单嘛,你这里边定义好了这个呃,Data stream,然后这里边我直接调用这个table env的点from data stream方法,直接把它读进来就可以对吧。然后接下来有了这个table之后怎么做操作呢?哦,大家一看这个,大家看这里边得到的类型就叫table对吧。然后接下来就基于table做操作了。啊,大家一看这个操作,可能会可能会觉得非常的亲切,可能会泪流满面啊,大家看是不是接下来这就直接点select哪些字段对不对,然后filter什么等于什么where条件对吧,直接写在这里又得到了一个新的table对不对?
04:08
啊,最后如果我们想把这个新的table再转化成硫打印出来,怎么样转化呢?也非常简单,这里边再定义一个流,诶这里边怎么转化呢?table.to a pen string,然后转化成这样的一个string string类型的流,为什么是string string类型呢?哦,那就是当时的这个midd和CH这两个,这两个字段都是,这里边都是string类型,对不对,对吧,选取出来是一个元组,所以说转换过来之后就是一个二元组数据类型的一个data stream。所以大家看其实就是这么简单,最后你把它直接打印输出,不要忘记流处理执行程序的时候,最后要exe cut,对吧?呃,要把这一句要写上去,所以整个这个过程其实大家看,其实就是你知道怎么样创建环境,然后知道怎么样把一个这个流转换成表,然后怎么样把表转换成流,知道这些是不是就可以了呀,然后中间这个表的转换过程,是不是这太简单了对吧?那大家会想这里面你的字段是从哪来的呢?
05:23
样例当然是样例类里边定义好的,对吧,只要是样例类里面定好的字段,你这里边直接写直接s selectt出来就可以了,所以这个大家看是不是根本就没有必要专门去学啊对吧?看一看大概就就知道了啊啊所以这是这个就给大家先大概的了解一下table API怎么用啊,那当然了,对于flink而言,你如果就这么去用这个呃table API的话,显然是不够的啊对吧?啊,当然这里边先给大家讲一些基本的这个就是操作啊,那么大家大家看这个这里边。
06:00
数据类型是t class样例类的话,它可以直接根据它的结构生成一个动态表,对吧?啊,所以那我们生成的方式就是直接把这个流data stream传进来,我们调这个environment table environment的from data stream方法就可以生成一张表,啊,这是生成表的方法或者呢。这里大家注意一下,或者如果我要是不想直接用这里边的这个字段,那怎么怎么弄呢?我可以在后边给他单独命名,那这个命名的顺序就得按照本身样例类里边定义的顺序来,对吧,对应位置一个跟着一个啊,所以大家看可以后面跟上,哎这样比方说mid duu ID这样去给。这里大家稍微注意一下,这个有点特别啊,它标标志这个字段名的时候用的是什么方式呢?是一个单引号,然后后面跟一个这个名字,那就用这种方式表示一个字段名,这个有点奇怪啊啊,这个大家稍微熟,就是了解一下就可以知道他这么用就可以了。
07:06
呃,最后这个动态表如果要转换成流的时候,那怎么转换呢?大家看一个table,直接to a pan string对吧,是什么类型,你把它转换成这样类型的一个,呃,Data stream就可以了,这里边大家要注意一下,就是表转换成流有两种方式,有两种模式吧,一种就是所谓的apad模式。判是什么意思啊,追加对,那追加是表示什么意思呢?追加其实表示的是说如果之前的这个动态表,它只被insert这样的操作更改的时候。那么我们就可以用这个apad模式去做一个追加,也直接把它选取出来就可以了。那如果要是做了其他的一些操作呢,比方说做了这个update呢,做了删除呢,对吧?啊,做了这些操作怎么办呢?那就必须得用另外一种模式叫retract模式,那这个retra模式就是任何情况下都能用啊,那等一下大家再看看这个re retra模式是怎么用的啊好,然后我们再看一个刚才这个例子非常简单了啊,这就是最简单的一些基本操作,这个表转换成流,对吧,流流转换成表,然后表里边可以select filter啊,然后做一些基本的操作字段的选择,那大家会想到我们如果要是用flink去做这个。
08:38
呃,Table API操作的话,显然不能仅仅局限于做个map,做个做个select对吧,做个filter,这显然远远不够,那我们至少是不是还得知道这个窗口聚合怎么去做啊?好,接下来给大家讲一讲这个table API里边的窗口聚合,好,这是一个具体的例子啊,就是说我们要统计这是一个电商项目的背景呢,就是统计十秒钟,那个每十秒钟统计一次,那大家想到这是要开一个窗口了,对不对,开一个滚动窗口了,然后接下来要统计的是啊,就是渠道为appstore的个数。
09:16
好,那大家看一下这个怎么去实现呢?那首先还是创建这个执行环境对吧,然后接下来现在我们是不是要用这个时间语义是even time了,这还是一样啊,Set stream time character,时间语义even time,然后接下来大家看下面还一样这个。创建一个卡夫卡consumer,然后a source对吧?呃,建建立这样一个卡夫卡源,然后接下来stream要map成我们想要的数据类型样例类ecommerlog哦,接下来大家看哦,那当然了,这一步还是得有的,就是我们得知道既然是even time嘛,你得指定到底从哪个字段去提取时间戳对不对,另外还得指定一个延迟对吧?啊,当然这里边我们延迟直接给了零啊,这个就是就相当于就是就是就要实时对吧?啊别的别的都不要,就要实时好然后接下来大家就看这个关键了啊tableable API怎么去做操作啊,一开始大家还可以放松一点啊,还是一样的。
10:24
怎么做操作呢?还是是不是创建table environment啊,对吧,从table environment.get table environment把DV传进去,先拿到环境,然后接下来还是一样,先把data stream转换成table,对吧?那么这个table怎么样去转换呢?还是from data这里啊,大家注意对我这里就把这个所有的字段是不是都重定义了。这里大家要注意一下别的前面的这些字段还都好说啊,这个这个都没什么其他特别的,比较特别的一点是下面有一最后有一个对TS点柔态。
11:09
那么这个点row time代表什么呢?代表什么含义呢?大家想一下这个这个代表什么含义呢?这就是在指定我们当前table表里边的时间字段,就有点像我们前面指定那个在流里边指定它的时间戳一样,对吧,这里边又指定了一回,就是在这里边点real time就表示指定的那个时间字段。那大家会想到这里面为什么叫real time呢?啊,因为我们现在是even time的时间语义,在even time时间语义下面,就必须用这样一个关键字,点real time来指定当前的时间字段。那大家会想到,如果是processing time呢?呃,Processing time processing time就不用单独指定某一个字段了,呃,对,但是后边得跟一个,你自己定义出一个字段,然后点。
12:10
Pro就是processing proc对吧,它的前面的四个字母pro type得得加上那样一个关键,然后面会给大家把这些都列出来啊,这就是这个不同时间语一下指定时间字段的这样的一个方法,然后接下来我们就可以做操作了,我们做的操作是十秒一个滚动窗口,对不对?呃,然后是不是要按照那个渠道去聚合啊。聚合起来起,呃起来之后是不是统计各个聚合在这个各个渠道,在这个窗口当中出现的次数啊,呃,其实就是这样一个简单的操作,所以大家看怎么去做呢?这个table怎么去做,这里边首先要开窗口。这里面就涉及到点window操作了啊,Window操作是点window,然后里边大家看这个,这是个滚动窗口,对吧,滚动窗口怎么开的。
13:04
Tumble over1million。毫秒对吧,然后ON1撇TS,这是什么意思?这句话就句指定到底是以哪个字段作为当前的那个时间的表达,对不对啊,这里边窗口里边还是要指定这个的,必须把它来指定一下,然后后面还有STT,这是什么呢?好,给了一个别名,这是谁的别名呢?注意这是对,这是当前窗口的别名。滚动窗口的边缘。所以然后接下来大家看我把这个为什么窗口一定要有一个这个别名呢。因为对接下来大家想到我如果做聚合的话,我是直接按照按照那个CH,就channel这个渠道直接去聚合就完事了吗?不行,是不是还得按照不同的窗口把它聚合出来啊,所以大家看后面group by是不是有CH和TT啊,啊这两个是要放在这里去出现的。
14:14
然后最后我们想要的可能就是渠道的名字和渠道最后的一个技术啊,这相当于聚合了对吧,所以最后select ch ch.count所以大家看其实就是这样的一个过程。这个遇到窗口的时候稍微有点奇怪对吧?啊,但是就是说你只要熟悉它这种写法的话,呃,其实也不难,就是含义还是比较明确的,那当然如果要是这个东西已经拿到的话,已经得到这样一个result table了,那最后是不是想把它再转换成数据流啊,转换成一个data stream怎么转呢?这里大家注意,如果要是已经做了group by操作之后,那就不是简单的插入操作了。那需要,那那就不能用那个我们之前的pad mode了,那得用什么mode。
15:06
得用所谓的retract这样的一个模式,对不对,所以大家看接下来转换的时候就是,哎,我们用前面得到的这一个表,直接to retract string,然后这里边得到的结果是一个string,一个浪,是不是就是这个结果啊,对吧,这是我们得到的这个这样的一个呃,Data stream,但是大家发现最后得到的data stream其实数据类型不是。他前面还多了一个布尔类型,这又是什么玩意儿呢?哎,这个大家注意一下啊,这里边这个布尔类型的字段其实是表示什么呢?因为这里边它不是相当于可以表示,就是可以有任何的操作嘛,可以是插入对吧,可以是删除,可以是更改,对不对啊,在我们这里的这个处理过程当中,更改它其实是什么样的操作呢。
16:02
是把之前的旧数据标记为删除,然后再插入一条新技新数据对吧,那所以这里边的这个布尔类型其实就能表示什么。如果它是true的话,就表示我们这是新插入的新数据,如果要是false的话,就表示这是被删除的老数据,对吧?啊,所以一般情况我们这个完了之后就需要再做一次筛选对不对。筛选什么呢?是不是要筛选它是处的呀?啊,所以大家看接下来做一个filter对吧?啊,用这个第一个字段去做一个filter,然后最后print出来就可以了啊,这就是我们在这个table API table API里边去做这个开窗和聚合操作的一些过程,好呃,那这里面有几个要点还是给大家稍微注意一下啊,就是首先是这个group by。
17:03
呃,这是前面给大家强调的啊,就是如果使用了group by的话,Table转换为流的时候,那就不能to a pen stream,呃,那个。Stream了,就必须要to rerect stream对吧?啊,就必须要用这种方式,然后另外呢,调用了这种方式之后,他得到的这个数据结构的结果,它是一个二元组,第一个字段是一个布尔类型的标志,如果是true的话,表示它是最新的数据,如果是false就表示是过期的老数据,对吧?啊,这是它的含义,所以一般情况我们做完这个操作之后,要跟一个filter把最新的数据筛出来啊,这是常常规的一些操作啊。然后最后还有就是说,在这个窗口操作当中,大家要注意是不是在这里边必须要有一个on,呃,对应的这个时间字段的这个表达,对吧?柏须要指定这个时间字段基于什么去做这样的一个时间窗口,另外我们这里定义了一个别名,这个别名是不是必须要出现在group by当中啊,对吧,这个东西是必须要出现的一个过程啊,所以这个大家要要考虑一下。
18:18
另外就是关于这个时间窗口的一些一些东西啊,就是说怎么样去指定这个时间字段呢。大家注意,如果要用到了这个时间窗口,要开窗的话,就必须要声明,提前声明,在表里边提前声明时间字段,如果是processing time的话,那么创建的时候直接追加一个字段,你自己给一个名字就可以,然后后边要点pro就用这个关键字,这就表示这是processing time语义下的。我们的时间字段。那假如说是英time的话,那是不是就必须从对应的那个字段里面去抽取了呀,啊就必须匹配上对吧?所以这里面必须是TS跟前面的那个字段要匹配上,后边是点生态,用这个来定义这个even time的字段,时间字段。
19:13
啊,当然了,这个还有就是滚动窗口,滚动窗口怎么表示呢?那就是它tumble over1万点,呃,这个毫秒对吧,然后on我们的这个时间字段,这样就可以表示一个滚动窗口,就是可能大家就是整个这个操作还是比较熟悉的,但是就是说这个具体来讲的话,又有一些小细节,又有点诡异,其实这个也没有太难的,就是大家只要熟悉一下,多看一看它怎么用就可以了。最后还有一个是这个flink circle flink c呢,这一部分大家其实看一下跟table API的调用几乎完全一样。啊,那大家会看到就是就是到哪一步不一样呢?大家看还是这个例子啊,我们创建这个table它的环境对吧,然后只这个转化成table,而且指定了这个时间字段,接下来大家看上面这一步,我们是用这个table API去做的操作,如果用C去做操作的话。
20:13
怎么写呢?直接调table env.CQ query,然后里边写什么,直接给一个字符串对不对,这个字符串里面大家看这这就更熟悉了,对吧,大家看select count ch from from哪个表啊,这个表名我们得放在外面去加,对吧?然后把它加进去,然后再加。From这个表怎么样呢?Group by ch和。和时间窗滚动时间窗口对不对?所以大家看这里边就是把我们原先的那个呃,另外那个字段TT是不是换成了一个时间窗口的表达,滚动窗口的表达怎么表达的呢?是tumble,然后括号里边有它的参数,它的参数是TS,呃,TS指定当前的时间字段,然后后边是它的那个间隔对吧?INTERVAL10SECOND。
21:14
哎,这就是这个flink CQ的一个表达,别的又完全一样了,对不对,这个得到的table是不是跟之前完全一样啊,然后下面还是totra stream把它转换就完了,然后再根据这个它那个第一个字段布尔类型是不是处把它筛选出来,哎,这就是这个table API和flink CQ的一些用法,好,呃,就给大家讲到这里啊。
我来说两句