00:00
现在已经了解了table API里边的更新模式,那其实呃,大家就想到了,那之前我们那个例子啊,一直想要输出一个聚合之后的表,一直没输出成功,那在什么场景下就可以输出成功呢?哎,其实这里面就给大家后面就列出了这个可以应用的场景啊,大家想到是不是就是只要能支持比方说retract模式,或者说这个UPS模式的这个场景,他就可以应该能够更新成功啊,诶,所以这个其实是直观就能想到的啊,比如说前面我们直接输出到这个控制台,这也算一种方式,只不过大家会看到这个控制台输出的话,它是。他没有真正的去做更新。我们是,呃,就是转换成流之后打印输出嘛,它就相当于只是给给大家做一个直观的显示,告诉你哪条数据要撤回了,然后哪条数据要更新了,对吧,让你自己去做判断,那我们自然想到你在那个文件和这个卡夫卡里边,你不能说是这个,你自己去看吧,对吧?啊,你不能这么简单,肯定是要把那条数据更新掉才行啊,他就做不到了,所以文件系统和卡夫卡的这个处理过程里边,它实现的当前的这个接口是不是就只有。
01:18
当前就只有一个a pen stream table s对吧?哎,那这个当然就是搞不定了。啊,那什么地方它可以实现这样的一个一个更新插入呢,这样的一个更新操作呢,诶那比方说ES就可以啊,比方说这里面我们给看看搜一下啊elastic search。Elastic search。Think,哎,大家看有这个elastic search think base对吧,那我们这里边其实要创建的这些东西,其实这个大家看啊,是这个elastic search absur think风呃S呃,Absur table think base对吧,我们要的是table think嘛,这是这个ES的连接器里边为我们的table API提供的一个所谓的这个table s。
02:09
再家看他继承的,通过这个名你就能想到了,它实现的啊,不是继承的实现的接口就是absur stream table s对吧?哎,所以在这个模式下,是不是我们直接去做更新操作就肯定没毛病啊,啊所以大家可以下来之后可以测试一下啊,我们的代码里边的话。呃,我就不给大家再实际做这个测试了,但是我们可以看到代码的这个书写也是差不多的,我们就直接table connect连接,然后连接的时候里边那个描述器用什么呢?哎,就用一个elastic search,那有同学可能想,那这个我需要引入什么东西呢?不用引入什么东西啊,大家看这个elastic search是不是在flink table scripts里边也有这个东西啊,对吧?呃,就在这里边,这本身也是当前大家可以看一眼啊,这就是当前我们之前做那个table stream API的时候引入过ES的flink连接器,对吧?那么在连接器里边是不是就包含了对于table API的这个描述器的支持啊啊,所以你直接引入这个就完事了啊,它就叫elastic search。
03:20
那么接下来啊,要需要做一些基本的配置,Version是六,然后host是local host9200HTP啊对吧,发送这个HTTP请求,接下来index叫做sensor,然后给一个document type叫叫temp,这个叫什么都可以啊,然后下边大家注意,除了这个with format with stemma,指定当前的这个格式之外啊,大家看这里边我用的是这个Jason对吧,指定是Jason格式啊,然后下边是用了这个sIgMa,我这里边是一个ID和count啊,我这里边是只有ID和count,对于我们前边的这一个。这个AG table而言,是不是下面应该还得有一个。这个average temp呀,还得有一个这个平均温度对吧,得有三个字段啊,你的STEM需要定义的要完整一些,那中间呢,大家不要忘记,这里边必须得有一个in ABS mode,大家想一下,这又是要干什么呢?这要指明什么呢?
04:18
啊,其实也非常简单,就是当前如果说啊,我这里边直接去去看到了这个elastic search table elastic search。Table SK啊,Absurd table SK base啊,它里边如果说我们直接用它,然后去创建对应的这些啊,这个format和stemma的时候,它里边默认用的这个模式是什么呢?它其实默认用的是是APA模式啊,因为大家想这个当前它是up upsur的这个,呃,Streamam table s,它能够往往后边直接去做这个追加吗?只用这个追加模式可以吗?当然可以,对不对?大家看这里边有一个布尔类型的值,叫做is pen only对吧?
05:08
呃,就是所所以默认情况下,我们这里边其实就是说只接收这个插入的数据,对吧,但是呢,它是不是可以相当于支持我们接收这个更新数据啊,你可以做这个UPS的数据的一个更新啊,所以呃,这个其实大家自然知道,就是因UPS mode,这相当于就是把这个布尔类型的值变成一个false就完了,对吧?啊,这个其实还是比较简单的一个直观的一个用法,那只要把这个一个改过来,接下来我们还是create一个table,把a j j table就可以直接插入进去了啊,大家就可以看到ES里面的输出了。这个可以下来之后大家再做一个尝试啊呃,那当然除了ES之外别的,哎,比方说像这个。大家想想MYSQ是不是也可以直接这么去做更新操作啊,因为我们自然想到了表嘛,表的这个写入跟MYSQL这才是天生一对,直接就是匹配起来的,对吧?我们说流式处理里边弗林跟卡夫卡这个是天生一对,那你说如果要做表的处理的话,MYQL里边也都是表啊,那你这边定义好了之后,把表直接写入不就完了吗?所以跟MYSQL的连接其实在table API里边做的是非常好的。
06:22
啊,就是大家甚至会发现,就是之前啊,在01:11之前一一点十的时候,我们现在在data three API里边没有直接提供MYSQ的连接器,但是呢,提供了。这里边给我们提供了table API里边提供了MYSQ的连接器啊,所以这里边如果大家想要去呃做这个输出到MYSQL的这个操作的话,那需要先把当前的这个就是MYSQL对应的这个连接器要引入啊,这个就叫做flink JD bc,然后2.12对吧,大家看这是官方连接器啊,直接就是阿帕奇flink给我们提供的。
07:02
啊,它这里边只有table的支持,就在一点十之前,只有table的支持,没有data的支持。所以这里边大家就看到它有些场景确实应用不一样,呃,那这里边给大家提供的这种使用方法呢,跟前面又有所不同,因为大家想MYSQL里边是不是就纯粹就写CQ就搞定了呀。看起来好像就跟之前的这种使用方式,我又可以就是跟这个CQ啊,直接的这个连接更加紧密一些,大家看这里面的这种方式就是。没有用这个table in a的connect方法,然后去创建一张表,这种方法大家看有点像是Java代码的这种方法调用,对吧,就感觉好像更近似于table API的那种调用方式,尽管这还没有这不是table API,因为还没有table嘛,对吧?啊,但是大家看到这个更接近于这个代码调用方式,那我们这里的这种写法呢,直接就是DDL。大家看到了是吧?呃,直接来一个think d dl,这里边写的就是,就像我们在MYSQL里边创建表一样,Create table,然后给一个名字,然后后边指定ID差20 not on,对吧,然后CT,这是我们要把那个count要写进来,然后给一个big in not not not,这是不是就把这张表就定义好了呀?
08:20
然后后边那你不是要跟MYSQL要连接吗?那连接器我们那个connect又表现在哪里呢?大家看是在这这上面的,相当于是我们前面那个with skima对吧,相当于是那些字段的定义啊,然后真正那个connect的定义是在这儿的点with。大家看这里边就是connector.type等于JDBC对吧,后边是这个URL,这是不是就是我们连接MYSQL的那个地址啊,然后继续后边table名字等于sensor count,我这个表叫,然后后边driver定义好,还有username password,直接属性定义在这里就可以了,所以它是用了这种方式直接去做了一个表的创建,跟MYSQL做了一个连接。
09:03
那后边这是一个DDL,它执行的时候,之前我们见过是那个CQ query去执行CQ对吧,那这个也是CQ query吗。啊,这个就不是了,大家注意这个执行的时候是CQ update,对,执行DDL的时候要要要用这个方法啊,执行query的时候是用CQ query。啊,所以后边把这个一执行,这是不是相当于我这个环境里面就有这张表了,连接好了有这张表了,那接下来是不是我可以直接把得到的那个结果数据表插入进去了。而且大家会发现当前的这个MYSQL里边创建出来的表,连接起来之后,是不是肯定是支持对应的这个up模式的呀,对吧,肯定是可以这个做更新操作,肯定可以去写入的啊,所以这是这就完全没问题了,这就关于这个呃更新,我们了解了更新模式之后啊,写入到ES MySQL啊,其他的一些数据库里面的这种方式啊,我们如果做了更新的话,可以写入到这些地方。
我来说两句