00:00
我们现在已经知道怎么样去把数据写入到卡夫卡里面去了,哎,那有些同学可能也也已经发现了啊,我们刚才定义了两个转换操作,一个是简单的转换,一个是聚合转换,诶那这里边我们只把这个简单转换之后的结果这个呃,Result table做了一个写入,对吧?诶,直接写写写入到卡夫卡里了,那有同学可能说我如果要是用这个a j j table去做这个写入,能写进去吗?大家可以下去之后测啊,我这里告诉大家结论不能,为什么不能呢?诶这我们就得看,当时大家还记得那个csv table thinkc对吧?哎,我们这里边给大家看一下卡夫卡table think,它底层到底是实现了什么接口呢?在这里大家看到有这个卡夫卡table think base,然后下面有这个不同版本的table think,那我们知道你这个011版本的最后还是继承自table think对吧?哎,所以它本身实现的是什么呢?看一眼这个是不是很熟悉a pen stream table。
01:00
Think对吧,它本身还是只支持啊PA追加式的这个流的table thinkk,那如果说我们这里边要做一个更改,呃,有这个更新操作,那是不是你必须就得是像我们前面那个打印输出的话,你得用那个tract stream嘛,那这里边它不支持,或者说UPS,呃,Streamam这里也不支持,那当然我们这里就不能做对应这个AGA table的输出了,对吧?啊,那这里大家其实也非常容易理解啊,就是卡夫卡本身是一个消息队列嘛,确实它是可以在查到我们之前就通过偏移量是能够找到我们之前的所有这个写入的数据的,但是呢,你如果给他一条消息,说你去把之前的某一条消息给我更改过来,这个是卡普卡做不到的,对吧?卡夫卡不是用来做这件事的,它其实就是把所有之前你输出的信息都要放在这个队列里边,他就是要保持这个原样的,所以它的这种消息队列的特性啊。
02:00
先入先出这种特性,这本来就是跟我们这里边的只往后面追加的这种流的特性比较像,对不对?哎,所以它支持的模式就只有一种啊,Pad模式,那大家可能就会想啊,那照这么说的话,我们说的那个更多的看起来更好用的那个模式啊模式up模式就真的就没有用武之地吗?哎,当然是不是的啊,之前我们其实在那个啊fair out做测试的时候,之前我们也给提到过一句,就可以直接把它转换成一个table啊,就如果做过更新操作的这个table,我们可以把它转换成一个retract stream,然后去做打印,对吧,直接在控制台是可以看到这个信息的,这是一种处理的方式。然后另外呃,如果说我们想往这个外部系统数据库里边去写,能不能写入呢?当然也是可以的,前提就是你必须外部系统这个数据库,它是一个能够支持。
03:00
模式或者upset模式的这样的一个数据库,比如说下一个我们要讲的这个ES就非常典型对吧?啊,那ES这里面大家看这个整体来讲,这个代码去做实现的时候呢,流程是差不多的,你看到还是connect,对吧?呃,接下来我们就拗一个这个elastic search,然后呢,下面有这个version,有个host,把这个该配的东西配出来就可以了,然后后面大家注意有一个需要注意的地方啊,比较特殊的地方是什么呢?它要配一个in UPS mode,就是对于当前我已经connect连接起来之后啊,要去配置一个当前朝外部系统写入数据的。更新模式,这里面我们用的是UPS mode啊,那那当然就是说明ES是支持UPS mode的,对吧?那就是可以用这个UPS模式去写入数据,那接下来我们给大家把这一部分代码再做一个测试,同样还是在table API下边新建一个object。
04:09
当前这个我们叫ES output test,同样还是把这个没方法先写出来啊,那前面的内容大家想到基本上都差不多对吧?创建环境,呃,当然这里面我们是从卡夫卡去读取数据啊,如果大家不愿意从卡夫卡读取数据,觉得太麻烦的话,因为刚才那个producer我们也已经关了嘛,这里边引入这个环境的时候,大家还是注意一下啊,该把这个下划线影视转换引入还是先引入防止后面出问题,然后呢,我们可以把它改成那个文件输入对吧?我们直接连连接外部的这个文件系统,把它读成这个input table,然后后面这个转换操作,这就一样了,对吧,我们还是把这个简单转换和聚合转换直接定义出来,所以我就直接抄这个文件处理的这一个代码。
05:03
好,那接下来大家看这个前面做的。读取数据啊,然后注册表input table,然后后边做这个转换操作啊,简单转换和这个聚合转换都做完了,那接下来我们就是要输出到ES了,对吧,输出到E,那这里边做这个ES输出的时候也是首先我们要把这个。Table要定义出来,直接在环境里边定义table env,然后connect里边,诶这里边你有一个大家看直接用elastic search,诶这是哪里提供的呢?诶大家看它是在flink tablescript里边,对吧?它其实就是之前我们引入的那个给大家看一眼啊,之前大家引入的ES不是有官方连接器的吗?Link connector yes ES search吗?对吧,这个base里边大家看下面有一个包叫flink table scripts,这就是专门为table API给提供的一个描述器,那所以说这里边我们引入的就是这个啊,Elastic search啊,所以这个描述器,它这个基本上都非常的直白啊,直接就用的是这个名称,然后接下来,首先我们把它创建出来,接下来就定义一些version,对吧,比方说我们要现在要用的这个version是6VERSION6。
06:26
然后后边呃,接下来继续定义一些其他的东西啊,比方说必须要定义的当然有这个host啦,我们我当前是本机,所以是local host,后面继续给这个port,大家看port int对吧,9200,然后第三个参数大家看还有一个呃,就是当前的那个协议,当然都是HTP了啊,直接发h hep请求,这是host的定义,后面呢,诶,必须的肯定还有一个别的配置,我们不说了,须的还得有index对吧,Index这里是必须给的,我们这里边就叫做SENS4吧,诶然后后边呃,还应该有一个这个type对吧?呃,这个ES6的话是需要去指定type的,这里边我们定义的是document type,那这里边我随便给一个吧,比方说我这个叫呃,Temperature对吧,这个无所谓啊,大家知道是什么东西就可以了。呃,然后接下来。
07:20
接下来把这个connect已经定义完成之后,大家看得到的当前得到的这个东西呢,是一个stream table script对吧的流处理的一个表的描述器,那么这个描述器里边呢,可以去调用,哎,大大家知道可以调那个with game嘛,With format对吧,另外还可以指定它当前的更新模式,大家看到这个更新模式就是in a mode in retra mode in upset mode这三个你定义哪个都可以,哎,但是这里边你不能乱定义,为什么呢?你这里边定义了之后,是不是就要求你写入到这个外部系统的时候,它必须得支持才行啊,对吧,你在这里边定义的时候是没问题,就是我这里边随便写哪个都不会报错啊,但是如果这个ES本身它并不支持某种这个更新模式的话,那我们这里边,呃,你定义了之后运行也会报错,所以这里边我们一定要用它支持的,它支持什么呢?当前它支持up的模式。
08:21
Upset mode对吧?啊,那接下来我们就是继续做那个常规的with format和with stemma的操作,那这里面大家要注意就是写入到ES里边的这个格式化方法呢?呃,之前我们说过ES一般都要包装成一个Jason object或者一个哈web对吧?啊,那这里边我们格式化就只能用Jason的方式,就不能用CSV了啊,所以哎,大家看这里边本身没有Jason的一个,你难道是用这个就是Java或者scalela里面的j object吗?不是,它这里面要求必须得就是有这个flink自己的要求,所以说我们又得在当前的这个po文件里边引入相关的依赖,前面我们不是引入了这个flink csv吗?接下来我们要还需要引入一个flink Jason,呃,同样当前的这个版本是一点十点一对吧,把这个一代引入放在这儿,然后接下来我们就可以在代码里边。
09:21
啊,我们把这个前面不必不必要的这个代码就先关掉啊。现在我们是e output里边把这个Jason引入。大家看这个Jason引入之后,现在我们就可以,诶把它定义成当前的这个格式化工具,对吧?啊直接有一个Jason,然后接下来还有一个非常重要的内容,就是要定义当前的结构对吧?表结构STEM,那这里边我们还是你有一个你有一个STEM,呃,那里边的定义其实我们都已经知道了,就就跟前面的那个定义是一样的,我们现在主要想测什么呢?想测这个聚合转换对吧?因为大家知道你假如说是这个简单转换的那个result table,你想用UPS模式去,呃,就是做一个写入的话,当然也可以啊,因为他只有追加的话,那就只有插入的数据嘛,那你插入数据我当然可以涵盖,这个是没问题的,我们重点是想测什么呢?想测它的这个就是最后我们把这个做过聚合之后,有更新操作的这个a j table做一个转换啊,那我们就针对它来做一个定义啊,啊接下来我们field。
10:36
知道前面有一个ID对吧,要给的数据类型是string,然后另外还有一个字段field的是,呃,后面还有一个count,对吧,我们知道这个做转换的时候,最后得到的是一个ID的count嘛,所以后边把这个data types,它是一个big in,对吧,长整形吧,所以是一个big in,把它定义出来,最后做一个create temporary table,这个我们叫做ES output table。
11:07
然后最后我们把a j table做一个insert into,写入ES output table啊,最后哎,还是不要忘记把这个执行起来,当前的环境执行起来,当前是ES output test,这就是完的流程,当然如果我们要的话,那需要先起一个E,呃,同样啊,我们进入到目录下边去把E把ES提起来,我现在用这个ES6啊,用的是6.8.5啊,我直接把它启动好了,呃,然后接下来我再另外起一个。窗口去监控一下当前这个ES里边的状态,对吧?呃,大家知道呃,你可以用这个K巴那或者说用一些其他的东西去做这个查查看,我这里边轻便一点啊,直接就用一个客命令去查看就完了啊,那大家知道这个local host9200,这是当前的这个host name和端口号,那我先看一下当前有有哪些这个indices对吧?有哪些索引,用一个命令先查一下。
12:21
哦,现在应该还没没起起来对吧,我们看哎,Started现在应该提起来了啊,接下来应该输入应该就可以看到内容了,现在没有东西对吧?现在还没有任何的,呃,所以接下来我们来执行一下这个代码,看能不能真正的把当前的数据写入,大家看现在这个已经提起来,我们让这个代码运行去做一个写入,我们现在应该是读取的文件系统,对吧?对读取的文件,那我们可以等待它写入进去就可以了,这个执行的过程大家发现确实好像比跟碎美API的那个代码会慢一些,对吧?因为大家想到这个又做了上层,做了一层包装,有这个,呃,对,对应的这个table API的一堆那个转换操作啊,所以说它肯定是会稍微麻烦一点,好,这个已经执行完了,我们看一眼当前的这个,诶,多了一个,多了一个叫做sensor的index,诶,那接下来我们看一下它的内容吧。
13:22
还是一个客命令敲一下啊,Local host9200,我们现在是要看这个3ENS4的内容,做一个search,我们用这个pretty的方式显示出来,好大家看到这里边,诶一共有这个四条数据对吧?诶为什么是四条数据呢?因为我们当前就是四个三四嘛,三四十三四一,大家看34736对吧?然后除了341这里边大家看count是五之外,输出最后一个统计的信息,大家看阿的模式是不是就做了更新啊对吧?没有说34111 34112好几条都都写在这里,最后我们就是直接按照当前的ID作为K,哎,那有同学可能说阿四的模式,你不是要找那个idn找当前的key吗?这个key是在哪里定义的呢?我们在代码里面并没有定义key啊,哎,但是大家注意了,我们在做转换操作的时候,是不是有group by啊,对吧。
14:22
Group by这里边当然就已经指定K了,所以如果我们基于已经转换过操作啊,做过group by之后的这样的一个表的话,当然可以知道当前的K是什么,所以大家看到这个连接连接器啊,连接ES之后,它其实就告诉了ES我当前的K是ID对吧?啊,所以这里边我们可以呃,ES去做这个更新操作的时候,就把相同ID的内容直接更新覆盖,得到了一个COUNT5,这就是这个输出的结果,这就是所说的这个ES就可以实现我们这个更新的操作,对吧?Upset模式,那当然了,我们说这个apad模式是最简单的,你如果想要用这个apad模式也是可以的,对吧?ES也可以这个in apad mode,但是呢,你如果用apad mode就不能去输出h j table了啊,因为我们说这h table它本身就必须得去有这个更新操作嘛,所以这里边就没有办法去做对应的这个操作啊。
15:22
那这里面大家可能会想到,为什么ES就可以做这个复杂的操作呢?我们这里边再搜一下啊,大家知道底层肯定有一个elastic search,呃,Table think,诶,大家看到这里面有一个elastic search absur。Table think对吧?呃,但底层它是一个table think base,这这我们一想就能想到ES6的这个,它底层是一个table think base,大家看它实现的一个东西就是upsur streamam table seat对吧?呃,就实现了这样的一个,呃,当当前这样的一个更新模式,Upset的模式,那有同学可能说,诶,那他没有实现uppa stream table think,那是不能用那个APA模式吗?哎,当然不是了啊,就是我们这里边,其实你如果要是说实现了upsur streamam的话,呃,这个更新模式的话,你只传递一个追加的这个操作,那当然我可以在后面去插入写入啊,这个完全没有问题对吧?呃,所以说这个其实是它直接可以兼容这个ipad模式,具体来讲的话,大家可以到下边去找到一个,我们有一个这个大家看到,呃,这是这个upset base啊,我们要找到它对应的那个factory,就是它的那个工厂,工厂类啊,这个think factory。
16:39
这个base,然后在这个factory里边,诶大家看到这里边有一个这个方法叫做get validd proper properties对吧,这里边有一个这个script valid,然后看到大家看到这里边他默认传一个new new这个vali的时候,他传的是什么呢?传的是就是support aend,支不支持aend呢处支持对吧?哎,支不支持retract呢?注意不支持,然后支不支持abset支持对吧?所以这里边嗯,你要调用的时候,那就是我可以当前这个ES,可以去in UPS mode,也可以inend mode,但是注意你不能用retra mode对吧,因为当前这个ES底层的这个呃,Think base,呃,就是当前的这个table think base并不支持re模式啊,所以大家就看到了,这三种模式是都有的,你至于能不能用,到底用哪种方式去做写入,去做输出,得看外部。
17:39
系统支不支持对吧,也就是说他能不能解析你当前给的这个消息啊,那有同学可能说,诶,你之前不是说那个reject模式更更容易解析,更容易实现吗?为什么它还不支持呢?哎,那大家就想到了,你既然都支持ABS mode了,那何必还要支持reject呢?因为我们说reject mode它相当于还会两条信息去表示一次更新操作会比较呃复杂一点,对吧,从效率上来讲的话会低下一点,所以说这里边我们只支持up mode就已经够用了,所以接下来大家可以就是按照这种方式去多做一些测试啊。
我来说两句