00:00
大家,呃,大家可能会想到这里边卡夫卡这个到底支持什么样的这个模式呢?诶这里边其实我们会会看到啊,给大家稍微的去查一下这个啊卡它底层这个是一个什么样的类呢?其实就一个叫做卡夫卡table SK啊,大家看最后就是卡夫卡table s base对吧,就是这样一个类啊,那么它这是个抽象类啊,它实现的是什么呢?诶大家看它只实现了a pen stream table s肉对吧?所以它也只是实现了追加式的这个流式的输出,如果说我们这里边一定要把那个做过呃,聚合的那个结果输出的话,大家会想到这是不是肯定又会报错啊。对吧?哎,我们可以这里边给大家简单的做个测试啊,在这里边如果说最后我们输出的这个过程啊,这个还是稍微有点麻烦啊,就又得改了,对吧?这里边就不是这个field,这里边我们得要一个,呃,当前是一个CT对吧,一个count,然后这个类型,呃,我们之前那个count应该是长整形对吧,给一个big in。
01:09
然后前边这里边做了这个定义之后啊,呃,我们就得用这个aggregate result这个table了,对吧,用这个啊。呃,这样我直接把这个。Copy过来。然后把这个注掉啊,然后用这个改成aggregate result table,大家看现在我的这个定义是什么呢?哎,就是得到这个count和这个呃,ID和这个count,然后转换成这样的一张表,对吧,然后连接呃,卡夫卡,我输出到think test的这个主题,哎,看看这样处理能不能成功,哎,那我们同样还是把这个,诶把这个代码再运行一下,看看这个效果怎么样。这里边不出意外的报错了啊,我们看一下这个报的错误是什么,又是对吧?A pen stream table think requires that table has only insert changes,所以这里边卡夫卡的这个呃,输出啊,它也只支持就是一个APA追加流式的这个输出,所以说这里边如果我们有这样的需求,要写这个更新之后的数据的话,诶,这个还有问题对吧,你这里边不能直接按照这个去写入,这个是做不到。
02:22
啊,那大家可能就会有问题,那更新模式,难道说这个就只有追加,追加这种模式,实际系统里边只能用这种模式去输出吗?呃,其实当然不是了,这里边卡夫卡和文件系统都是这样的一个特点,是因为大家知道卡夫卡是消息队列嘛,对吧,你如果要他就是之前我们的那个数据都已经输出过去了,然后你又要更改之前的数据,这个这个对于卡夫卡来讲确实是难办了点,对吧?啊,那对于什么样的系统,它就比较好做这样的事情呢?啊,那当然就是类似于数据库这样的,呃,这个这个格式对吧,这样的一些输出,比方说像HP啊,比方说像这个,呃,像这个ES,大家看,接下来我们给大家说一说其他的一些呃输出系统它怎么样去定义啊,那比方说这里边我们看输出到ES怎么去定义呢?呃,这个也是非常简单,我们就把呃这个对应的那个呃依赖已经有了之后大家看啊这里边我可以直接connect到一个new,一个elastic search对吧,这也是table API已经给我们提供好的一个连接器,那么这个连接器里面描述我们传什么呢?啊,版本对吧,版本是六,然后host local host,还有端口9200HTTP啊这个协议对吧,后边哎,给一个index,然后给一个document type对吧,这都是这个常规的一些配置,只要把这些配置好,我们就建立了一个到ES的连接。
03:51
当然这里大家需要注意就是ES只提供了SK操作,你如果把这个连接,然后注册的那个表要当成输入的话,也会报错,对吧,那就只能输出,而且大家要注意啊,输出的时候我们不是想要去。
04:07
就是呃,做一个这个,呃,把那个聚合结果做一个输出嘛,啊,所以这里边ES是给我们提供这种可能的,它给我们提供了不同的输出模式,就是在ES的连接器里边,就是后边你必须要去指定一个对应的UPS mode,对吧,就是呃指定的那个,就是我们那个更新模式update mode,我们这里是指定成。UPS mode就是前面我们说的那个第三种啊,插更新插入的这个模式,他发的消息也只是每一条,每一个操作发一条消息,要不是UPS消息,要不是delete的消息,因为大家知道ES里边是可以去管理key的嘛,对吧,所以这个它完全可以给我们查到,去做一个这个模式的这个写入啊,那如果有了这个模式之后,大家再去直接,呃,大家下去之后可以测试啊,把那个AJJ呃,Result啊,那张表里边的数据往ES里边写,大家会看到这个是可以真正写入了。
05:08
啊,那这里边还有就是这个后面呃,With format啊,那大家知道这个ES我们发送的时候一般用Jason对吧?呃,用Jason作为这个呃格式化的这个格式啊,然后后边这个STEM的话,还是该是什么定义什么,比方说我这里边定义这个ID和count对吧,定义一张表,后边呢,就把AJ得到那个结果直接插入进去就可以了啊大家下来之后可以把这个连接这个ES系统啊,直接做一个测试,看看这个效果。这是这个ES这一部分啊,那对于其他的这些连接器呢,同样也是,你如果要是想要把这个经过聚合之后啊,有更新的这张表,如果要是呃做一个这个写入的话,写入到外部系统的话,就一定得它支持upset模式或者是retra模式,对吧,就在前面你用这种方式定义出来就可以了。
06:01
啊,然后另外再给大家,呃,这是ES的这个输出啊,另外再给大家讲一个比较特殊的是输出到MYSQ啊,为什么说MYSQL比较特殊呢?因为在这个官网上面,大家如果去做这个查询看的话,这个外部系统啊,会看到别的这个系统,比方说这个卡夫卡对吧,大家看这个实现有几种方式啊,写这个Java或者SKY拉代码对吧,或者有这个Python的API的这种写法,另外呢,还可以配置配置文件和写这个DDL,诶那这几种方式又是干什么呢?这个文语言里边的这个调用我们都知道了。这里边的这个DDL指的就是大家还记得之前我们看这个。Env啊呃,做这个env,我们执行那个CQ的时候,大家还记得吧,这个在table API test这里边。我们这个写了一个CQ query,大家还记得这个我直接点的时候啊,我写CQ的时候,除了CQ query还有什么吗?
07:00
还有一个CQ update对吧?哎,那这个CQ update又是又是干什么的啊,CQ update就是可以执行DDLDML对吧,可以执行我们CQ里边的那个各种各样的操作啊,啊,当然了,现在就是这个关于这个DDL执行可能支持的还有限,还不是我们所有的那个CQ操作,这里边全部支全部支持啊,但是未来的这个发展肯定就是功能会越来越齐全啊,那这里边就是对于我们当前对外部系统的这个连接方式呢,哎,除了前面讲到的这个在代码里边,哎,我去做这个。呃,这个table table API直接去点connect对吧,去做连接,然后注册表除了用这种方式之外,那还有一种方式就是我直接写一个DDL,然后直接。table.cq update对吧,执行DDL直接把它注册进来。啊,大家看这个DDL的这个,呃,创建的这个方式呢,也非常简单,就是像我们在MYSQL里边去创建表一样,Create table对吧,该怎么创建那个定义字段,怎么定义,定义好了之后加一个with。
08:12
With后边就全是我们关于这个外部系统连接的配置项,比方说卡夫卡这里对吧?呃,Connector type version对吧?呃,Topic啊,还有这个zoo keepper,还有这个BOP service,就是这些东西啊,都是同样的,所以大家只要知道的之后,呃,一试就就试出来了啊,那另外还有一个木文件,压木文件大家知道就是在就是在我们那个配置文件里面去配了,对吧?啊,那这个是配什么呢?稍微说一下啊,这个不是配我们本身的那个。不是配本身的那个flink com的那个压模文件,这里边需要去配的是这个啊呃。我们到那个工具下边,到卡夫那个link下边。然后呢,我们进入到这个comp文件下边,大家注意看,有一个叫做CQ client defaults对吧,这里面有一个这个配置文件,那那这里边到底又是什,又是些什么东西呢?大家可以进去看一下啊,它就可以给我们配置你默认要连接的。
09:15
这个表对吧,连接外部系统数据库创建的默认创建的表啊,然后当然了还有这个什么用户定义udf函数对吧,还有catalog,还有这个module啊什么的,各种各样的定义都可以在这里边做配置啊,这又是到底是个什么东西呢?这是就是当前我们这个flink的版本呢,给大家提供了一个叫做CQ client的一个客户端。就直接可以跑运行CQ的一个客户端,它的好处是什么呢?你打开之后,就像我们打开这个MYCQL客户端对吧?或者你打开这个have的客户端,直接在那敲那个CQ命令一样,你只要敲一行命令,一执行就相当于通过这个客户端提交了一个flink table API的一个,或者说flink CQ的一个一个drop对吧?一这这样的一个小的这样一个一个命令啊,那大家如果要是对这个感兴趣的话,可以下来试一试,但是现在这个呢,还不是很稳定啊,现在只是一个相当于一个测试可用的版本啊,所以说呃,有些公司可能已经在用它做开发了,呃,但是呢,可能未来变化还会很大,所以这里边只是给大家讲一句啊,呃,就是有这种简单的这种方式,那这个CQ client怎么去启呢?它其实就是在我们那个并目录下边,大家还还记得我们这里边都是那个命令,对吧,除了启动这个flink,呃,这个。
10:40
另一个提交任务啊,或者说启动集群,或者说其亚session啊,还有一个命令叫做CQ client sh对吧?啊,所以我们如果要起它的时候,就是调这个命令,直接把它起起来,它会加载我们的那个CQ client的那个配置文件,压ma文件啊,那你如果在里边按照这个压ma文件配置好了,对应的这个这个默认的这个表的话。
11:04
它启动的时候自动就给我们连接,然后再相当于在当前环境里面就创建好这样的一张表了。现在这种用法相对来讲可能还都是大家做这个演示啊,做测试,实际这个项目当中直接用它来提交可能还比较少啊,所以我们主要还是给大家讲这个代码里边的写法啊,啊,那这个为什么说MYSQL比较特别呢?就是因为大家看到啊,JDBC连接的这种方式呢。没有提供前面我们给大家讲的connect方法。因为大家知道前面我们在做那个data streamam API的时候也讲到了,没有它的连接器嘛,对吧,流处理跟它其实是不是特别匹配的,没有提供这个连接器,所以它没有代码里边直接做转换connect连接的那种方式,而是提供了什么呢?呃,Yamo配置文件以及DDL的这种方式啊,所以这里面给大家推荐的就是你要学会,比方说我要连接一个MYSQL啊,那怎么办呢?我就要学会这个在里边写DDL的这种方式。
12:04
啊,那这里边的写法也非常简单,我们就以这个,呃,就是这个PPT里边的这个事例给大家做一个讲解,啊我可以怎么写呢?呃,大家可以直接create一个table对吧?比方说我这个就叫做gd bc output table,然后呢,这里边就是定义字段对吧?一个ID一个count啊那家知道当前既然我定义了count,那就说明当前往MYCQ里边写数JDPC连接写数以啊可以有这个相当于就是说更新模式的调整了,对吧?如果要是这个字段要更新的话,也可以直接可把它写进去对吧?啊所以这个过程就是相对来讲会比较简单啊啊,那这里边就是我后边这个with啊,With什么呢?这里边给的就是type JD bc,然后URL,哎,这就是MYSQL连接,对吧?把这个写好,如果有这个默认数据库的话,你写上啊,那这里边还有table,到底是哪个table呢?我们当前这个table名字叫做sensor count,那如果你在这里定义的话,当然是在,呃。
13:05
MYQL里面也得把这个表创创建出来,这样才能写,对吧,我们当前只是个连接器嘛,并不是在那边要创建表,对吧?啊,所以后边还有这个对应的driver username password啊,这些都是常规的一些连接配置,定义好之后,后边大家看啊,执行这个in table in.q update,把这个DDL,这就是个字符串嘛,执行这个DDL就可以把这张表创建出来,注意这个创建是在我们当前的这个flink的这个表执行环境里边啊,在catalogck里边给他创建注册进来了,对吧?啊,它并不是说真的在我们那个MYSQL里边创建表。所以这里面就相当于是我们前面connect那个操作,然后创建表的那个过程,那后边真正要写数的时候怎么写呢?啊,那边MY建好表之后,我这里边呃,Ggg get result那个table对吧,直接把它insert into到这张表就完事了。
14:01
这就是这个其他系统的写入方法啊,给大家都说一说,这个方法可能特别特别多,但是大家只要就是掌握它的精髓更新模式,把这个了解清楚,然后别的我们看这个官网介绍,或者看文档怎么用怎么配就可以了。好,这部分先给大家讲到这里。
我来说两句