00:00
啊,我们现在已经知道了怎么样输出数据到文件系统到卡夫卡,我们还讲了怎么样输出到ES啊,大家发现就是说能把更新之后的这种数据啊,就是我们说的做过聚合之后的这种表,如果要想做输出的话,那必须得外部系统支持retra的模式或者支持up模式才可以,对吧?啊,所以之前我们发现文件系统和卡夫卡是不支持的,所以说没有办法写入,而ES支持是可以写入的,那这里边呢,大家自然就想到了对于这种表的操作,那怎么能忘了我们经典的关系型数据库MYCQ呢?对吧?类似于GDBC这种连接的数据库,我们怎么样把当前得到的表直接就写入到MYCQ里面去呢?哎,这个当然也是可以直接去写的啊,对于这个table API和flink CQ而言,之前我们在讲这个stream API的时候,大家发现就是呃,在当前一点十的。
01:00
之后还没有提供MYSQL的一个连接,对吧?GDBC的连接,连接器是没有没有官方直接提供的版本的,那对于这个table API而言,我们发现它天生就应该跟MYSQ这种关系数据库连接很紧密啊,啊,所以说其实还是有一个连接器的,如果说我们现在要把这个数据直接写入到MYSQL,输出到MYSQL的话,那需要大家在这个代码里边啊,引入一个啊,就是对于flink而言,它专门为table API的GDBC连接提供的一个连接器,Flink g d bc,大家要把这个引入,然后接下来我们在代码里面大家看到啊,这个连接器就比较特殊,它里边呢,并没有给我们提供那种大家常见的那个connect script的那种写法,对吧?我们看到这里边我们调用的是另外一种形式,就是直接写了一个DDL。
02:00
哎,所以大家看到对于这个MYSQL而言,也是在这个设计的过程当中,我们就想到啊,你如果要是说想要后边去写CQ的话,那可能我们最熟悉的方式是什么呢?我就连建表的时候也用这个DDL去建表不就完了吗?对吧?我也同样都是用这个CQ里边熟悉的这套方式去做,这样不就是最简单的操作吗?哎,所以对于这个MYSQL而言,大家看它的建表的方式,或者说建立连接的方式是什么呢?直接用我们的建表的d dl create table,想要创建什么表,直接就像我们在MYSQL里面建表一样,把它先创建出来,然后呢,后边要有一个with语句,这个with里边传进来的,大家看这是不是就是我们的那个连接器的各种描述啊,对吧,之前我们是有一个包装好的skyla类啊,一个Java类放在那里,然后我们去把它拗出来,然后去指定那些配置的一些连接连接项,而这里呢。
03:00
所有的东西都放在这个DDL里面直接搞定了,所以这其实也是一种常用的,对于我们这个table API和flink CQ而言啊,呃,就是定义这个外部系统连接的一种方式,那这种方式反而可能大家会觉得更好用是吧?啊,所以其实前面这个输出到ES,输出到卡夫卡也有类似这种表达的啊,只不过就是说可能如果前面我们直接用这种表达稍微有点奇怪,对吧,你写入到这个卡夫卡,然后直接用了这个DDL去去表示,但它是可以的,也是后边就是同样你建立表结构放在这儿对吧,大家看这个连那个skima和那个,呃,我们我们讲的那个格式化工具都啥都不用了啊,因为这里边其实你都定义好这个表的结构是什么了,对吧,到底怎么怎么建立,我这已经都知道了嘛,后面只要去指定我的这个连接器就可以了,连接器都是用with去表示。所以这里边你看到我们后面就传一些参数啊,当前的这个连接器的type是GBC,那当然就有可能有别的对吧?哎,有有可能有卡夫卡,有有这个ES,那然后URL呢?哎,一个JDBC的连接,MYSQL连接上,那后边指定当前的这个表是哪张表?哎,注意啊,这张表跟我们这里的这张表不是一回事,这里的这张表是什么呢?这是真正在MYSQL里边保存着的那张表,对吧?我现在真正要连接外部数据库MYSQL里边的这张表,而这里我定义的这个create table,这是什么表呢?这是在当前我的环境里边执行这个DDL要在当前flink的表执行环境创建的这张表,所以大家不要认为我这个create table是直接在MYSQL里边创建的这张表啊,不是这样的啊,就是我这张表呢,是在表执行环境里边,它对应的外部的那个MYSQL的表是这个sensor count。
04:58
啊,所以之后我如果要做这个,呃,你看你看我最后面如果要去执行它的时候,那就不能table点色into了,对吧,因为这里边我并没有table这个数据类型嘛,我直接这里是注册在当前这个表环境里边的,那怎么办呢?诶大家看我是调用环境的CQ update,当时我们不是有那个环境的CQ query嘛,去执行一条呃,CQ查询对吧,那这里边现在这不是CQ查询了,这是一条这个DD啊,所以这里边我们是直接用这个CQ update update的话可以执行in色的操作,Update操作,另外还可以直接执行DDL,所以我们直接把这个写好之后传进来就完事了啊呃,所以这个其实整个的这个过程也是非常简单的啊呃,就是这里边首先我们是要执行一下这个,呃,当前的这个,呃,DTL这个执行之后,我们相当于是在表环境里边注册这张。
05:58
表对吧?相当于只是把这张表创建出来了,那后边如果要往这张表里边写数的时候,那就得还是调用,比方说之前我们的一个a j j table对吧?哎,我得到了一个a j result,直接做一个insert into写入到这张表,注意写入哪张表呢?环境里边定义的这张表对吧?然后环境里边定义的这张表大家知道它底层转换之后是一个table sink,这个table sink连接到哪里呢?接到外部的sens count这个mysle里面的这张表,所以最终是写入到了这张表里边,你到mysle里边查这张表就可以了,大家要把这个表的关系要理顺啊,这里边主要是涉及到这个问题,所以呃,这种方式有可能以后大家会经常碰到,因为呃,就是如果以后我们风格统一都用这种方式的话,就相当于在代码里边,你就跟操作这个就就直接用CQ写DDL对吧,然后写CQ查询。
06:58
不跟直接操作,我们关系型数据库是一模一样的,全是这种表达对吧?前面建表的时候也是这样,然后后面查询的时候也是用这个CQ去查,那就所有的风格全部统一了,他对于就是DBA和传统的对CQ非常熟悉的这些数据工程师而言,是最友好的一种方式啊,当然了,呃,对于我们这个写代码的大数据工程师而言,大家可能会觉得这个connect的这种方式也比较舒服,对吧?啊,所以这两种方式,呃,底层最基本的方式是是这个connect里边给一个这个描述器,然后指定这个STEM和form啊,那这里面这种就是直接写DDL的方式也可以作为参考啊,那直接写DDL有一个缺点就是就这里面一大段一个一个string,对吧,然后到时候他报错的时候,有可能你这个信息不好去排查是吧,你看他那个报错信息的话,你得找到对应的那个位置才可以,而如果说你在这个代码里边,你集成在这个Java。
07:58
代码里面去调用这个方法的话,那最后我们其实做这个故障的排查就比较容易了,这是关于另外一种用法。
我来说两句