00:00
我们已经对flink的table API和CQ有了一个初步的了解,那接下来呢,还是先用一个简单的案例做一个快速上手,看看这个代码到底应该怎么写啊。其实如果我们要是对关系数据库和CQ的用法非常熟悉的话,那这个使用的过程啊,其实是非常简单的啊,整体来讲,如果要使用table API和CQ的话,首先我们就是先把当前的数据流data stream先转换成一张表,一个table,然后呢,呃,接下来就直接针对这张表去调用table API,或者直接写CQ对它进行查询转换就可以了。接下来呢,我们就来用一个案例具体的说一说怎么去写,那想要在代码当中去使用table API和CQ的话,首先得引入相关的依赖啊,我们可以看到啊。使用这个API的时候,要引入的依赖主要就是这里的think table API scale bridge,然后后边跟着当前scla的版本,那下边对应的这个版本呢,是flink,跟flink版本一致01:13点零,那这里我们引入的呢,主要就是一个所谓的bridge啊,这是skyla语言的桥接器,哎,它主要用来干什么呢?呃,因为我们说当前的table API和link CQ啊,是属于上层的API,它主要就是用来。
01:23
连接table API和下层的data streamam API啊,那起到一个桥梁的作用,那按照不同的语言呢,就分为Java版本和SC版本,我们这里用的是的版本,如果我们想要使用Java去进行编程的话,那引入就是flink table API Java brief就可以了。这个是需要引入的,那如果说我们接下来还是在idea里边去进行一个代码开发的话,那就是不是直接提交到集群上面,而是在本地的集成开发环境里面去做开发的话,那还需要引入另外一个依赖,就是这里所谓的flink table planner blink,然后后面同样跟着skyla对应的版本,我们这里是2.12,然后下面跟link的版本一致,同样也是01:13点零,这里添加的呢是一个planner,也就是所谓的计化器。
02:16
这个计划器其实是table API和CQ里边的核心组件,主要就是负责提供一个运行式环境,并且用来做程序执行计划的生成啊,我们知道在CQ里边啊,都有所谓的执行计划,它可以简单的认为就是类似于我们前面介绍过的执行图,作业图这样一类的东西啊,基于我们的代码去生成的这个执行的一个计划,我们这里用到的呢,是新版的blink版本啊,这里所谓的blink版本就是阿里在1.9版本之后啊,开源贡献出来的自己内部的那个flink table API的计划器版本啊,现在默认情况下已经推荐大家使用的就是blink版本了啊,那如果说想要使用老版本的话,那也可以单独的进行引入。
03:07
这里需要多说一句的就是,我们只是在本地的集成开发环境里边想要去运行table API的话,才需要引入这个plan,如果是直接把作业提交到集群环境、生产环境里边去执行的话,其实是不需要单独打包这个依赖的。因为。本身flink默认的安装包里边在library啊,也就是lib lib这个目录下边就已经包含了blink的计划器,Planner已经在里面了,所以我们没有必要单独去引入。另外呢,如果说我们还想去实现一些自定义的数据格式来做序列化,哎,那还可以引入下面这个依赖,那就是flink table common啊,就是通用的一些支持,那同样对应的版本是flink的01:13点。啊,所以现在我们是在idea里边去执行的话,啊,那当然需要去引入的就是。
04:01
一个调节器,Skyla版本的bridge,还有一个blink版本的planner,一个计划器啊,所以接下来我们在代码当中找到泡文件,把对应的依赖先做一个引入。引入之后我们可以在没里边刷新一下,确认一下已经引入了skyla版本的调节器和blink版本的计划器,好然后接下来呢,我们就可以去新建一个测试的object,去做一个视力代码了,做一个快速上手,现在我们是新的一张,所以去new一个package。现在是第11章CHAPTER11。接下来我们新建一个SC的object,同样还是这是table API和CQ的一个简单的测试案例啊,所以我们就叫做simple table example吧。先把它创建出来,没方法先写出来,一开始的过程啊还是一样,我们还是先把流逝的执行环境先创建出来。STEM execution environment get,当前的执行环境还是叫做env,同样啊,我们把上面的这个下划线先做一个引入。
05:07
不是一般性,我们还是先把这个并行度全局的设成一,方便后边测试输出,然后接下来我们要做的事情呢,是要调用table API去进行当前数据的转换处理啊,那在这个过程当中,首先我们应该要把这个流转换成table啊,一个data stream转换成table,那这个过程呢,首先我们应该得有流啊,所以我们还是先读取数据源吧。读取数据源。创建。Data strip。所以这里边我们可以把当前的这个data STEM啊,就叫做,呃,我们应用的这个案例还是使用之前电商场景所有用户的那个点击数据啊,我们还是用之前定义的那个event事件,所以这里边我们直接定义的流可以叫做stream。基于烟V去做一个啊,我们可以直接from elements去定一些测试数据啊,这里当然就是可以去直接new一些相关的event对应的对象了,这里我们引入的是CHAPTER05第五章里边定义出来的样例类啊,Event类型啊,那里边我们可以针对当前,比方说有一个爱丽丝用户。
06:16
然后点击了一个页面,首先点击了home页面。是第一秒钟点击的啊,那后面我们还可以跟进很多很多很多的测试数据,这个我们就不再详细去敲了,我可以直接引入文档当中给定的这些测试数据,把这些copy过来。好,我们先把它放在这里,读取数据源之后,接下来我们要做的那就是需要把它转换成一个表了,哎,那这个转换成表怎么去转换呢?难道说这个data stream它有一个方法直接就可以to table吗?那之前我们也看过data stream里边可以调用的方法啊,没有这样一个方法,那怎么办呢?诶,这就需要我们创建一个当前可以调用table API的执行环境,这个环境呢,叫做表环境。所以接下来我们是。
07:06
创建表环境。那这个表环境啊,当然我们可以理解成就是在之前的流式执行环境STEM execution environment上边又做了一层包装,在它的基础上又创建了一个表对应的执行环境,所以这里边我们去调用的呢,哎,是stream,要去创建一个流式的表执行环境,哎,那需要去调用的是stream table environment,我们看到这样一个对应的对象。这就是调节器里边给我们提供的,然后呢,调用它的create方法,Create的方法我们看到里边可以传一个参数,传一个流失的执行环境进来,哎,那我们就把之前的烟微直接放进来就可以了。得到的这样的一个环境,我们可以把它叫做table in。表执行环境。然后接下来基于这样一个环境就可以。
08:01
将data stream。也就是数据流转换。成表也就是转换成一个table,哎,那所以接下来我们得到的呢,这个就不应该叫。了,而是应该叫table。我们可以看一下怎么样去做这样一个转换,基于当前的表环境去调一个from data啊,很明显就是从一个流,然后转换成一个表啊,那这里面要传入的当然就是前面我们的数据流了,放啊,我们这里可以简单的看一下啊。做一个对应类型的自动判断,得到的这个类型就是所谓的table类型,这就是底层给我们实现的一个接口啊,那当然了,我们得到的就是对应的一个对象啊,那接下来我们所调用的所有的转换计算呢,其实就是如果是table API的话,那就是基于这样一个table的对象去做各种各样的方法调用就可以了啊,所以这个我们知道得到的类型是什么,所以接下来我们要调的啊,整体就叫做table API。
09:07
所以下一步操作我们就是可以直接调用。Table API。进行转换计算。啊,那就是基于even table,我们看接下来就可以调各种各样的转换操作了,哎,可以map,还可以aggregate做聚合,哎,当然了,如果说我们现在啊,要类似于写一个CQ那样的表达的话,比如说我们就来一个最简单的吧,我们就select好,然后提取哪些字段呢?啊,Select写一句这样的CQ啊查询去提取当前的用户名和URL这两个字段,然后呢,还要后面比方说跟一个判断条件啊,要求就筛选这个user等于Alice的所有的数据。啊,那这个其实非常简单啊,如果我们写一条C,那就是select user URL这两个字段,From之前的这个table table,然后接下来where user等于Alice,哎,那如果直接写C就是这样,那如果现在我们要做这个table API的调用方法调用了,那怎么办呢?呃,其实是可以直接调一个select方法。
10:17
我们看select里边要传入的呢,可以是字段名称啊,但是这种方式呢,现在不推荐,更加推荐的是传入一个表达式expression啊,这样一个表达式又应该怎么写呢?啊,在table API里边给我们做了一个语法糖的包装啊,就是一个Dollar符啊,当然了这个Dollar符,这个Dollar啊,需要去做一个import,我们需要import flink table API下边的expressions.dollar把这个做一个引入,然后里边呢,哎,就是对应的字段属性名称了,我们现在需要的是user。后边还可以加入多个参数,因为我们看到当前这个是可变参数啊,后面加了星号,所以URL也可以放在这啊,当然了,我们也可以把user放在后边,URL放在前面,换一下这个顺序完全是可以的,这是做了一个字段的投影映射啊啊,那另外呢,我们说还需要有一个条件的筛选啊,就是where什么啊,那至于from哪张表的话,我们直接调的就是这张表的方法,那from就不用去指定了啊,那接下来还得有一个where,我们看能不能调呢?哎,没有问题,直接可以来一个where里边要传入的,哎,我们看还是一个expression。
11:30
那这个expression的话,同样就是一个Dollar,我们现在要写的就是。User,哎,那它等于什么?注意啊,这里边既然是table API,所以都是对象的一个方法调用,这里边我们所有的表达式也都写成方法调用的形式,哎,那这里边user等于Alice,这个怎么表示呢?那就调一个is equal方法啊,然后里边传入。Alice这样一个字段就可以了,哎,那所以这个跟我们直接写一个C,其实本质上是一样的啊,那当然得到这个结果呢,我们可以把它叫做result table,经过各种各样的转换,我们看啊,基于一个table去调用这里的各种方法,最后得到的,哎,当然还是table啊,啊,我们也可以,你看也可以join distinct,应该还可以filter啊,所有的这些方法调用之后得到的还是table,就是基于table去做各种转换。
12:24
好,那最后得到的这个result table,我们当然还希望把它做一个打印输出了。打印输出,哎,那我们就想了,那之前的那个this stream可以直接调一个print方法,现在有吗?现在没有这样的直接打印输出的方法,只能print scheme schemer,我们知道其实是它的表结构嘛,啊,那其实不是数据的打印,那这怎么办呢?啊,有一个非常简单的方式,那就是我们不要直接去打印这张表,而是再把它转换成转换回data,再转换成流,然后打印输出不就完了吗?哎,所以接下来我们要做的是。
13:02
转换成流打印输出,哎,那这里就需要调用另外一个方法,之前我们把一个data stream,把一个流转换成表,调的是table env的from date three,那接下来又要把表转换成硫,那该调用什么呢?哎,同样还是table en的,接下来是two date three,好,再转换回去啊,一正一反,一个from,一个to,那里边要传的当然就是result table。就是一个table,接下来呢,直接做一个print打印就可以了。所以整个的流程啊,我们看到还是一个流处理的过程,只不过呢,就中间把这个数据流转换成了table,然后再基于这个table调一些它对应的API就可以了,那这些API我们可能就更加的熟悉一点,就像我们做CQ操作里面的那些API一样啊,那最后不要忘记啊,因为是流式执行环境嘛,我们还需要有一个env。Executive执行起来好,接下来我们可以运行一下。
14:02
看看这段代码输出的结果,是不是真的可以筛选出所有数据里边爱丽丝对应的那些访问数据。我们可以看一下。好,我们看到现在输出的果然就只有爱丽丝对应的访问数据,而且输的呢,URL在前面,后边跟着的才是user,有三条爱丽丝的访问事件,所以这里边只输出三个结果。这里面我们看到一个比较奇怪的东西啊,就是在当前的这个数据,一条数据前边还有一个小标啊,那这个小标呢,是一个加号,然后加了一个I,这是什么意思呢。这个I其实是一个缩写,它是。插入那个单词insert的首字母。所以也就是表示我们当前的这条数据呢,是新插入的一条结果数据啊,这样的话,前面加一个加I表示当前它的类型。
15:00
所以整体来看的话,这个过程确实还是非常简单的啊,Table API就是这样调用啊,只要得到了基于一个理由得到了一个table,接下来调这个API就可以,那当然了,这个过程我们看到啊,还是有点尴尬,因为table API呢,看起来它还是内嵌在编程语言里边啊,不管是Java还是scla,内嵌在里边做了一些方法调用,这个就怪怪的啊,有点不伦不类,我们自然就想到了,那能不能直接。写我们熟悉的CQ去做一个查询转换呢,啊,那这种查询转换当然也是可以的,我们可以把它叫做result CQ table。那这样的一个查询转换就不再是上面基于table的一些方法调用了,而是直接去执行一条CQ,这个CQ直接就是一个string了,直接写一条string,就是我们要执行的东西啊,所以这里边我们调的呢,就变成了table en nv表环境的另外一个方法叫做CQ query,我们看到里边要传入的参数就是一个string,就是我们要写的要执行的C,那我们现在要写什么呢?哎,当然就是select,我们现在比方说啊,要的是URL。
16:12
User。呃,From from哪张表呢?前面我们的even table,哎,这里我们要注意啊,Even table其实这里是一个SC里边的table对象。这个对象跟我们这里所直接使用的表还不不太一样啊,所以这里边不能直接写这个table啊,哎,那怎么样去做这样的一个转换呢?啊,其实也简单。直接在后边加上当前的table做一个拼接就可以了,相当于对它做了一个字符串的转化,哎,在这个过程当中呢,就相当于会注册一张跟这个变量名同名的表啊,那有了这个之后,后边我们再补一个。Where,条件啊,比方说我们现在威尔换一个吧,不要是爱丽丝了。我们判断这个user等于。
17:04
Bob吧,把Bob对应的数据也筛选出来。执行完毕之后,诶,同样得到的还是一个table啊,那最后呢,我们要做的操作就跟之前这个是完全类似,我们可以复制一下,然后这里打印result c table啊,那为了区别的话,我们前面这个叫做一,后边这个叫做二,然后接下来执行一下,看看二这个流啊,转换成的data stream打印出来的是不是只有Bob对应的访问数据。哎,我们可以看到啊,果然跟我们之前直接调用table API。输出的结果是非常的类似的啊,可以说是一样啊,只不过就是一这里输出的都是爱丽丝的访问事件,而二呢,输出的都是Bob的访问事件啊,所以本质上来讲,我们看到调用table API和直接写CQ得到的结果啊,起到的效果是完全一样的,那在实际工作当中呢,很显然KAPI就会显得比较尴尬一点啊,那我们更熟悉的就是直接写CQ了,所以在后边我们的讲解过程当中,一般我们就是直接写CQ去实现对应的功能就可以了,那table API呢,只做一个辅助的讲解。
我来说两句