00:00
那我们现在已经知道怎么样从呃外部系统里边去读取数据了,其实就是用这个的方法啊,那其实我们也可以就是融会贯通啊,举一反三,大家可以想到,如果要是连接,比方说像我们之前的ES对吧,连接这个MYSQ其实也也是类似的一种方式,当然就看呃flink到底给给我们有没有提供对应的连接器对吧?如果有那个我们说那个连接器描述器的话,那对应的就可以做,那如果要是没有的情况下怎么办呢?诶后面我们讲这个就是往外部系统写入啊,到时候给可以给大家提一句,那个里边呢,又会有一些其他的方法,那接下来我们先来看下一步操作,那就是。接下来我们要做查询转换了transform这一步,对吧?啊,那我们前面已经讲过table API它其实是集成在就是skyla和Java语言里边内嵌着的这样一套查询API,它基于的呢,就是代表表的这个table类,对吧?所以大家要注意就是它跟我们前面讲到的那个注册好的表还略有不同,它是相当于我们在这个Java和skyla语言里边定义出来的类的对象,对吧?啊所以接下来呢,我们首先要从你要想操作table API,那首先得得到table这样的一个对象,这样类型的一个对象,所以说我需要怎么样先从利用这个tableable env表执行环境调它的from方法,从已经注册好的这个catalog啊这个环境里边去拿到一张表的实这个对象,对吧,把它转换成一个table的实力啊,那接下来呢,基于这个table你就可以。
01:42
调各种各样的转换操作了啊,Select filter前面我们也讲过对吧?啊,可以做各种方法调用,形成链式的调用结构,这是一种风格,就是table API的写法,那与之对应的呢,另外还有一种写法就是直接写CQ对吧?哎,我们说这个flink的CQ集成,那其实它是基于呃,阿帕奇的t set,它实现了CQL标准,所以说我们可以直接定义一个string这个字符串,就表示一个直接就是写一条这个常规的CQ语句就可以了,然后呢,要去查询的时候,调用tableable en nv的CQ query方式啊,那当然了,呃,大家又会想到还有后前面我们那个看API的时候看到还有SQL update对吧,还有其他的一些方法,所以就是如果说我们不是查询操作,是一些其他的,比方说插入啊,其他的一些这个呃,DDL包括啊,呃,要要去执行一个DDL的时候,都有对应的基于环境的一些执行的方式啊,所以说。
02:42
我们比较常用的这个表的转换当然就是一个查询了,所以大家看到得到的也是一个新的table,对吧?啊在这里边比较方便的一点就是只要前面我们注册好的table在这里就可以直接用,不需要再去转换了,对吧?因为这个CQ本身执行的时候就是在环境里边执行的嘛,它能够找到当前注册好的表,这是关于查询转换的操作,那接下来我们在代码里边还是把这一部分来做一个练习。
03:15
同样还是在当前的这个代码里边,我们继续往后写吧,呃,这里是前面是讲了这个读取数据,接下来第三部分查询换,那查询转换的话,首先3.1,这里我们是用就是table API使用API啊,那这里边其实有不同的操作啊,首先这里面给大家写一个就还是跟我们之前那个一样啊,我们定义一个叫做result table对吧,就基于之前我们这里边呃定义出来的,比方说这个input table吧,卡夫卡那边我们就不再去弄它了啊,都是一样的,那这里边我们就基于之前的input table,那那要操作它之前,我们知道必须先得做个转换,对吧,因为现在还没有table嘛,所以我先定义一个input table啊,那有同学说这个你必须得叫一样的名嘛,啊,这没准对吧,这是我。
04:16
我们在scla代码里边的一个变量名,你叫它什么都可以,诶,我们叫一个sensor table,这也是完全没问题的啊,就像PPT里边写的sens table一样啊,那我可以从当前env里边from from方法里边直接传一个当前注册好的input table放在这儿,然后接下来这就是一个table了,对吧,我们就基于这个table去调table API,比方说select啊,ID temperature,对吧,这是完全可以去啊做这样的一个方式的,那之前我们的调用方式大家看这里边有两种传参方式,一种是传一个string,那大家知道就前面我们直接ID temperature这么写对吧,Temperature,那其实还有另外一种方式是传什么呢?传一个表达式,那这个表达式怎么写呢?哎,这里边呢,呃,在这个弗link的table API里边,它有利用了这个scla语言里边本身。
05:16
的一个symbol的形式,哎,大家之前应该也接触过对吧,就是前面有一个有一个单引号作为前前缀,然后呢,后边直接加一个字段名称啊,这表示一个symbol类型,对于这个table API而言呢,它会把它解析成当前我们表里边的某一个字段啊,所以用这种方式,大家看它是用到了一些影视转换的对吧,下面有一个下划线啊,所以我如果用到了这样的一个特性的话,它的写起来之后表达可能会更加的就是可读性更强一点,对吧,我一看这就是select这两个字段table temperature,然后同样后边,哎,如果说我要去filter当前啊,等这个3ID等于三四十一的啊,那同样也可以ID,哎,注意这里边就不能用这个直接用这个等号了,因为直接用等号的话,你这相当于要判断一个什么,判断一个scla的symbol去等于一个string对吧,这个。
06:16
这显然是不对的,所以说这里边呢,它要比较值的这个方法是一个三等号啊,你这里边如果掉进去的话,就会发现这是它影式的这个表达式操作符里边的一个一个方法,三等号表示我们判断当前的这个字段里边的值是不是等于当前的这个字符串啊,所以这个大家就是熟悉一下就好啊。呃,当前必须要给大家说一下的是,呃,01:11里边啊,1.1.9,一点十都是沿用了这样的方法,这也是之前版本里边,呃,就是老版本里边也都是有这样的一个用法的,但是从一点十一开始呢,呃,官网上推荐的这个写法是用一个Dollar符,然后把我们当前的这个字段名这个引起来这样的一个方式去表示啊,但是就是在之前的版本里边是不支持这种方式的,对吧,这这就看他的这个表达习惯了,大家稍微了解一下就好啊,这个其实并也并不是特别重要,就看他自己本身怎么定。
07:16
同意嘛,可能这就是因为我们前面用这个scla里边的这种symbol的形式,觉得有点歧义啊,所以就做了一个调整,其实你如果熟悉的话,这种写法也挺简单的,那这是直接基于这个table API,我们做了这样的一个转换操作,对吧?然后接下来我们再给大家写一个的实现,那么CQ的实现的话,我这里面定义一个还是table对吧?啊,那这个其实大家都很熟悉了啊,直接就是table,因为去调用它的CQ query方法里边就可以直接写一个,写一个这个,呃,各种各样的一个一个一个CQL语句,查询语句,对吧,你想写什么都可以,那这里边我们之前是直接一顺着往后写了,那再给大家推荐一个更加常见的写法,就是用什么呢?用这个,呃,模板字符串对吧?啊,用这里边可以去除掉它前后的这个。
08:16
呃,我我我我们前后留下的那个空行,我们用多行的这个字符串,直接把它做一个做一个呃定义,这样的话你就可以换行写了嘛,Select,比方说ID temperature temperature,对吧?啊,然后后边我们就可以from,呃,From这个input table,这这是已经注册好的,然后后边where ID等于三一对吧?哎,这里边我们最好还是把它写成单引号,因为就是在这个引号里边,对吧?啊,这个把这个写好了之后,这就可以做这个查询操作了啊这个得到的结果这又是一个table,那后面如果我们想要做这样的处理的话,那就再来一个result。
09:10
呃,Table to append STEM对吧?哎,这后面就都一样,我们直接把这个啊,当然这这里边得到的就不是三元组了,原始是三元组,现在少了一个,那就是二元组嘛,String和double就是这样了啊,那包括上面这个input table,我们也不用再去做转换了啊,我们直接就用这个result table直接给大家测一下就可以了,对吧?这里边result table to append stream后边同样还是一个二元组的输出,这里边这个是result,下边这个是cql result对吧?啊,这个我们知道就好啊,上面这个就不出了,我们来一下看看这个效果。已经运行完毕,看一下结果啊,大家看到不出意外肯定还是这个CQ和这个result都是一个二元组的输出对吧?啊,当然这里边输出的这个先后顺序可能会有一些一些变化啊,那这里边主要就是说本身这就是我们说的啊,如果说是这里边做了这个不同的转换操作的话,我基于同一个表做了不同的转换操作,然后再呃,后面再把它这个转成留在输出这个到后边就就不一定谁先谁后了,对吧?但至少我们可以看到,因为当前的那个并行度是一嘛,上面我们是定义了这个全局的,这个并行度是一的,所以CQ里边它处理的时候,这里的数据都是按照我们本身定义好的这个,呃,就就是当前的这个顺序来一个一个输出的,对吧,啊,所以这个还是没有什么问题的,好呃,那接下来我们就给大家把这一部分,大家可以下来之后再把这个。
10:57
一部分好好练习一下啊,这就是查询转换操作就讲解完毕了。
我来说两句