00:00
对table apiq有了做简单速手,因为其实这部分非常容易理解,那就是只要我们能够把一个data streamam转换成table,转换成表。那接下来。直接去调用对应的API去做转换处理不就完了吗?那所以呃如特别是如果我们对于关系数据和CQ又非常熟的话,那接下来的事情其实就是直接去写CQ就可以了啊,接下来我们来看一看怎么样去在代码里边调用对应的API。首先呢,我们需要去引入相关的依赖,因为我们知道现在使用的是table API,那肯定它是有一些特别的依赖支持需要引入的。这里需要引入的是flink table API Java bridge,然后后边是SC的版本,下边给定的是当前flink的版本,跟flink的版本一致,这里引入的Java bridge呢?这是一个Java版本的桥接器。
01:04
什么叫做桥接器呢?也就是他起一个桥梁的作用,负责上边我们这里的table API和下边data stream之间的转换,也就是说。我们底层本质上来讲还是hittream去进行处理的啊,那当然了,现在我们使用的是table,那中间一定要有一个环节能够把table转换成对应的data,那这个就是通过bridge接器去进行转换的,我们需要把它引入。Link给我们提供了不同语言对应的版本,我们当前当然使用的是Java版本了,如果我们想写的是LA代码的话,也有LA版本的小节器。除了调节器之外,如果说我们还想在本地的IDE里边去运行table API的话啊,那我们知道集成开发环境里边,它跟我们的集群环境不一样,可能有一些包是缺失的,那我们还需要单独引入这样两个包,这里我们看到主要是上面这个flink table planner blink。
02:13
啊,那后边是同样skyla的版本,以及下面给定的跟flink的版本一致,那这里面引入的呢,是一个blink版的计划器,这个计划器其实非常的重要,它是整个table API的核心。主要作用就是提供整个的运行时环境,负责生成当前应用对应的执行计划。我们知道在关系数据库里边,所谓的执行计划或者叫访问计划access pass是非常重要的一环,这就相当于我们前面介绍在流处理flink的运营架构里边生成的graph或者execution graph,而这个都是息息相关的,就是根据我们当前API或者CQ的结构生成出来的这样的一个执行计划。
03:01
所以我们会想到这个per,这个净化器在这里起的作用至关重要。啊,那为什么我们说这个只有在IDE里面运行的时候需要引入呢?那就是因为flink本身它的安装包在。Library对应的那个Li目录下边,其实会自己就自带了blink版本的plan,它直接就放在里边了,所以我们在生产环境里边,集群环境里边直接提交作业的时候,其实是不需要把这个包打入进去的啊,那只有在IDE运行的时候需要单独的做一个引入,那除了它之外呢,另外还有一个就是flink streaming scale啊,那主要是因为在planner里边,Blink版本的planner里边是用到了。用到了scla去做了一些代码的实现啊,那所以我们需要把对应的skyla版的流处理相关依赖要引入进来啊,那这里面我们引入的计划器是blink版本,这就是我们所说的1.9之后合并进来的阿里内部开源出来的那个计化器版本,现在我们推荐使用的是blink版本的planner,当然也就是说如果我们想用早期的flink自带的那个老版本计化器的话,也是可以单独使用的,但是并不推荐这么做,在之后的01:14以后,默认就应该不再能使用老版本的计化器了,就直接统一集成进来的就是blink版本的计化器啊,所以我们现在的话就把这个当成默认的依赖引入就可以了。
04:36
啊,所以首先我们先把这些。需要做一个copy。先把它直接放到po文件里边,对应的依赖做一个引入。引入了相关依赖,那接下来我们就可以直接在代码里边尝试着把流转换成表,然后就可以调用table API和flinkq了啊,那接下来我们就举一个最简单的例。
05:04
我们还是基于之前的这种操作,那就是输入的还是event类型啊,Event里边主要就是有用户用户名,然后还有他访问的URL地址,另外还带了一个当前数据日志数据里边的时间戳,所以这是一个非常简单的用户点击访问的事件,如果说我们说呃,前面我们做过很多比较复杂一点的聚合统计啊,做PVUV各种计算,开窗计算,现在的话我们就。做一个简化,只要做一个基本转换就可以了,就看一看当前的table API和CQ到底怎么写啊,那最简单转换当然就是只提取它的某些字段,比方说我现在就把时间戳排除掉,我只要提取当前的user和URL,我们看一看怎么样用table API和CQ来实现。那接下来我们就需要去新建一个package。
06:02
CHAPTER11啊,在下面我们去创建一个Java,当前我们就是做了一个简单的。Simple。Table example。一个事例。那整体的架构当然跟之前还是几乎完全一样啊,那首先我们需要先把流逝处理的。Execution environment,它的环境创建出来。不是一般性,我们还是把当前的并行度全集设为一方便我们测试啊,那接下来我们还是先把数据读进来,数据的话我们干脆就直接用之前的click就好了,这个是最为简单的啊。那另外如果说我们想要基于事件时间去做考虑的话啊,那我们还是把之前的那个直接做一个copy啊,呃,这里完整的可以再敲一下啊,就water strategy for bonded out of order这里边我们可以直接给一个。
07:02
前面我们可以加上当前对应的类型,那是。后边还需要给一个with time stamp a sign啊,那接下来我们可以new一个zable time stamp sign里边我们提取时间戳的话,当然直接用的就是。element.time这都是我们之前已经做过的所有的操作啊,非常的简单。在这,我们可以直接把当前的流叫做stream。当前是一个事件流。首先这是第一步,就是还是先读取。得到data strip。那下一步当然我们就是需要把data stream转换成表了,但是呢,呃,我们直接去转换其实是不行的,因为当前的这个data stream里边并没有直接转换成表的API,所以我们其实还是要依赖于执行环境,但是我们也知道了,当前执行环境是一个流处理执行环境,它也跟表没关系啊啊,所以为了让方便我们生成表,然后进行表的转换。
08:15
本身table API里边是给我们提供了一个单独的表执行环境的,所以接下来首先我们要做的是。创建一个。B、执行环境。当然了,当前我们的表示性环境其实还是基于之前的流处理环境创建出来的,所以这相当于是一个流式的表示性环境。这个叫做stream table environment,好,那我们调用的时候呢,直接调它的静态方法create,只要把前边留处理执行环境烟传进来就可以了,然后我们可以看到它得到的就是一个stream table environment,好,那接下来我们就得到这样的一个式的表执行环境,我们可以把它叫做table env。
09:07
有了这一个执行环境。接下来那我们就可以将流。想datare。转换成表了。转换成table。这只要去调用tableablev里边的一个方法,我们可以看到from。Data stream,它最后就可以从传入一个data stream调用之后就可以得到一个table啊,所以这个方法就会一目了然的把一个流转换成表,当然我们现在就是要传入strip得到这个table,那就叫做。Even的table,好吧。这是我们得到的这样一个事件的表,那自然就会想到这个表应该长什么样呢?我应一般情况,如果说我们在MYCQ里面去声明定义一张表的话,一开始我们都得诶声明出来当前表里边的每一列的字段,它叫什么名称,是什么类型啊,所有这个表的结构STEM我们得设置好啊,但是现在我们没有做任何的设置,那到底是什么呢?
10:12
啊,这个其实没关系,我们知道当前流里边的数据类型,它是even,是一个po类,Po类里边对于每一个字段都有对应的类型和名称的定义,这不就像表的STEM那个表结构里面声明的是一样的吗?诶,所以我当前就可以直接把类的每一个属性字段拿过来,当成当前表里面列的定义。这样的话,整个这个表呃,就不用做任何的额外的声明就已经生成好了。那接下来我们就可以对这个表进行转换了。我们首先用最为熟悉最为简单的方法来尝试一下啊,就是直接写CQ。进行转换。那这种转换方式的话,呃,其实也非常简单,还是基于table,它有一个方法叫做CQ que啊,那CQ query里边传一个string类型的一个字符串,那是很明显这个字符串就是一条CQ了,就相当于我们是在这个表执行环境里边去执行一条CQ啊,然后我们会发现当前。
11:23
执行一条q query。他得到的又是一个table,又是一个表,诶,所以这其实就是针对一个表,然后进行转换,又得到了一张新的表,在table之间进行转换,这其实就是我们所说的table apili CQ啊,它都都是基于table这个数据类型。那这里面的CQ我们怎么写呢?提取两个字段的话,那我们就可以。直接select user URL。From,那from哪张表呢?这里边我们好像没有表的名称,没关系,前面这里不是已经有even table了吗?我们直接在后边加上even table就可以了。
12:05
这样就把对应的两个字段从表里边提取出来了。啊,那之后如果说我们还想看最后这张表里的数据到底是什么呢?诶直接这张表自然就想到了,这个得到的表,我应该给他一个名称。我们可以把它叫做or table。我们就会想到那一条流可以直接打印输出,那当前一个表能不能打印呢?不能打印,它只只能打印当前的这个表结构STEM啊,这个架构是可以打印的,但是他自己的数据无法打印,诶,那怎么办呢?最简单的一个看到它结果的方法就是。再把它转回去,我们可以直接调用。这个不是直接通过这个table去做调用啊,而是还是调用table env的to data stream方法,之前我们是from data stream,当然有from就有,有来就有回,所以接下来我们就是直接to data stream,再把它转换成一个。
13:08
一个一个直接把它打印出来就可以啊,那这个我们可以加一个名称叫。最后不要忘记DEXE执行起来,这样整个一段简单的CQ的测试程序我们就完成了,那接下来我们可以运行一下,看看得到效果是什么样的。我们这里会源源不断的生成数据,然后把它转换成表里的数据,诶我们看到这里就是一条一条数据都在输出,前面我们看到还有一个比较特殊的符号加I,诶这个是表示什么呢?这其实这个I是英文insert的首字母缩写,也就是表示当前插入了一条数据啊,那至于它当前的这个具体含义到底是什么,我们后边讲到动态表和持续查询的时候还会。
14:03
做详细的说明啊,那现在我们只要能看到对应的每一条数据输出,其实就达到了目的。当然了,这只是最简单的使用了当前,呃,CQ直接写了一条CQ进行了转换啊,那另外就是比方说啊,我们这里边有这个user和URL,那假如说我们想把那个time也加进来,那怎么办呢。如果我们直接这么写,可能会有问题,因为我们知道time本身是CQ里面的一个关键字,哎,那所以如果对于关键字的话,我们可以加上一个反引号。诶,这样的话就可以把对应的所有的信息都提取出来了啊,就是对于关键字而言,加上反引号,就相当于我们不把它当关键字,而是当成一个标志符,这样的话就对应的字段后面的时间戳也可以完整的提取出来。这是关于CQ的写法,那另外还有一种写法,我们比较关心的。
15:01
当然就是可以基于table API直接进行转换。基于table。直接转换。那这种方式的话很明显,那就是要在table的基础上,跟当前的这个环境就没关系了,不要基于环境去调,而是直接基于table去调一些方法,那当前这个table能调什么方法呢?我们就可以想到了,之前写CQ的时候,那是select什么样的字段,然后from这张表啊,那现在的话我们是不是就可以直接让他去select,诶确实我们看到后边是可以直接调用这样一个select方法的。Select里边传入的是一个expression,一个表达式,然后后面又会得到一个table,这当然就是基于table的转换了,那这样一个表达式又是什么样的写法呢?啊,这表达式当然我们就知道相当于类似于CQ里边的某些。
16:03
语法的表达了啊,那所以这里边在table API里边啊flink table API里边给我们的定义是每一个这里的字段要用Dollar符加一个括号,里边用字符串去表式,这样的话得到的就是一个表达式,这Dollar符就是一个表达式,我们需要把这个对应的类。也就是说table API里边的expression下面的这个要引入,我们要提取的是user这个字段。啊,那当然了,另外我们还可以提取URL这个字段,上面我们多加了一个time STEM刚好就可进行对比,那前面呃,我们这里是没有加任何的筛选条件,如果要加的话,那显然后面还应该有where什么什么样的条件啊,那通过这个where我们可以看得更加清楚。下面可以加一个where,那接下来当然就是比如说我们要当前的user等于Alice啊,那这样的话,那就应该这里直接写。
17:03
Dollar。User,诶,那后边怎么样呢?这里的所谓的表达式并不是直接写等于R,是方法调用,因为当前的table这个类它是一个抓瓦类嘛,所以我们当前的所有的表述都是嵌在抓va语法里的。都是方法的调用,所以我们就是E一口easy口,那就是判断它是否相等吧。这里我们给一个Alice。这样的话可以得到另外一个。我们把这个叫做二,上面这个叫做一吧。所以接下来我们可以同样。用类似的方法把RESULT2也转换成流,做一个打印。这是最后一步。转换成流打印输出。接下来我们再来做运行一下,看一看一和二有什么样的区别。
18:06
当然这里最大的区别肯定就是带了这个威尔条件嘛,所以我们会看到。哦,那RESULT1还是每隔一秒就有一个数据生成,而RESULT2呢,那就不一定了,我们仔细看一下的话,会发现RESULT1每一条来了之后,如果是Alice的数据的话,那对应的就会有一个RESULT2输出,如果要不是爱ice的数据的话,诶,那当前RESULT2就没有任何的输出。所以很明显,我们当前不管是直接CQ还是去调用table API进行这样的都可以得到相同的结果,实现相同的需求。当然了,通过对比我们就可以发现,如果我们之前对于这个CQ的写法非常的话,用CQ直接CQ是最为方便的,那使用table API的话,它既抓的语法嵌在一起,稍微的有点扭,还是这种链式用的方式,另外呢,还加上了所谓的这个expression,它的表达式的特别的语法定义,所以这个可能就会相对比较麻烦,用的比较少,一般情况我们后边就直接写CQ,把对应的需求实现就可以了。
我来说两句