00:00
我们现在已经知道了,在table API和FNKCQ里边所谓的更新模式是什么东西,大家发现这种更新模式的定义,其实主要就是说我们要想把当前的这个数据要跟要写入到外部系统,跟外部系统交换这个消息类型的时候,哎,那那这个我们就必须得定义出来,你到底要做的这个操作是什么对吧?要要交换的这个数据到底是什么样的一个信息,什么样的一个类型,所以我们发现它分成了这样的三种不同的模式,那这里边追加模式,A判模式,大家发现了,它是最简单的一种方式,对吧?哎,所以说正常来讲,你只要是一个外部系统能够写入数据,它是不是肯定可以在后面追加呀,对吧?你不管是文件也好,还是消息队列也好,还是这个,呃,数据库也好,不管是关系型的还是非关系型的,所有的这个,呃,数据库外部系统,文件系统啊,肯定都是可以直接往后面。
01:00
回家,所以说大家想到这个apad模式,其实就应该是所有系统至少默认的能够实现的一种模式,对吧,你就不管是什么样数据,我这里边留数据一条一条输,输出,我就一条一条在后面叠加嘛,哎,这个其实是完全没有问题的,但是如果说我们在前面因为都做了一些表操作嘛,假如说我们对于这张表里边做的操作,你除了往后追加,还做了比方说插入,呃,插入当然就是追加了啊,还做了比方说删除和更新这样的delete update操作的话,这就涉及到你到底怎么样告诉外部系统,诶,我现在是做了一次update,对吧,我给你传的这条数据是要更改的,更改之前某条数据的,并不是说直接把这个追加到后边就完事了,所以这里边我们看到有两种通知的方式,一种是撤回模式,撤回模式我们说编码的方式呢,是它只传两种信息,一种就是增加。
02:00
一种就是减少撤回对吧?啊,所以说就是如果你要insert插入一条数据的话,我传一个添加I的信息,如果你要是要去撤回删除一条消息的话,我呃一条数据的话,我去发送一条撤回消息,Retra,那如果要是更新的话,那我就把上一条要更新的那一条先做一个撤回,然后再把就是我更新之后的那个新的数据做一个添加,对吧?它是用这样的一个方式实现了这样一个功能,其实这里边大家如果仔细想一想的话,会发现这个撤回模式,这相当于它就是我们这里边真正输出到外部系统的是个什么呀,它就有点儿像还是一个当前处理操作的一个一个流,对吧?哎,就还是相当于我们就是做一个更改操作的流,只不过这个流里边呢,对于更新这种update的这种操作,我们是分成两步。
03:00
来做的就是先撤回之前的那一个操作,然后哎,再做新的这个添加,把这两步结合起来就是一一次更新,所以外部系统在接收的时候,大家发现其实正常来讲的话,我应该就是只要能接收这个追加模式的这个连续不断的数据,我是不是就应该能接受这个撤回模式啊,大家想这个撤回模式它其实就是你最多就是连连着收两条信息,表示一条这个update嘛,对吧?啊,但是呢,对于有一些外部系统,比方说数据库来讲,它不支持车位模式,为什么呢?因为就是说你你要能够支持它的话,你必须得解析我当前的这个消息,对吧,我必须得检检查到哦,你现在是要撤回呃,当前的之前的每一条信息,然后我去查到,查到那个东西的话,我再去改对吧,再把这个新的值再再覆盖上去,那有一些数据库可能会发现,我如果要是直接能实现。
04:00
模式的话,那我就不用retra模式了,对吧,因为我们说up模式的话,更新操作和插入操作都只是一条信息,都是一个up消息,所以说这个就精简了很多,那有些数据库我明明就可以直接用一个up的消息,我直接查那个K对应的那个值不就完了吗?对吧,如果查不到,我做一个插入查到的话直接更改,那这种就避免了这个撤回模式里边这种繁琐的,呃,我们这里边一个操作,然后要发两条消息的这种,这种传递,呃数据的方式就避免了这种情况啊,那这是关于更新模式,那所以之前我们讲的这个写入到文件,输出到文件系统里边的时候,大家会发现啊,当时我们在源码里里边也看到了CSV的table think,它里边只实现了一个uppa的模式,对吧,它只能追加,而且我们仔细一想也也非常明确呀,你像这个这个file system啊,文件系统,你要让他去做更。
05:00
他怎么更新嘛,呃,当然就是有些同学说,那我直接把这个就是他弄成一个这个retra retra的模式,然后我让他把我们那个就相当于带着前面那个true false的那个信息直接就全写到那个文件里边,难道不行吗?哎,当然也可以,但你那样的话就相当于只是把他所有的消息原封不动的给放在那个文件里边了,最后没有起到我们想要做的那个行为的效果,对吧?我们想要的并不是说你把那个除false带着那个信息全全列在那儿就完事了,你要列在那的话,我人去看的时候,不还是得相当于这个人工手动的再把那个信息做一个规并嘛,再做一个整理嘛,那所以我们希望的是你自动文件系统就帮我检测出来哦,我要改哪个,你给我改了,但大家想文件系统能能做到这一点,肯定做不到对吧,所以它不支持别的,像这个retra和up模式哦,那我们就想了那别的一些。
06:00
外部系统能不能支持更多的模式呢?啊,接下来我们就讨论的是卡夫卡对吧?哎,非常经典的跟flink相连接的一个应用叫消息队列卡夫卡,那卡夫卡的这个使用大家看到跟前面我们定义那个输入的时候其实差不多,对吧?你看这里边我们定义的时候还是connect,一个卡夫卡连接一个卡夫卡,然后这里面配置的时候还是version topic,呃,Topic应该不一样,对吧?你输入输出的topic肯定不一样,然后把我们对应的这些keepper和这个book service定义出来,后边呢,还是with format with sche码,对吧?呃,你想要写入的这个数据的格式都定义清楚,然后呢,创建一张表,我们管这个叫做output table,然后之后怎么办呢?哎,这张table就不像我们那个输入源那边,直接我们用环境from这张表读数了,对吧,现在是直接把某一个表音色的into写入到这儿,它就变成一张输出表了啊,所以大家看就是前面我。
07:00
我们讲那个S就是在这个呃,Table API的它这种调用方式里边,确实是不怎么区分S和think,对吧?诶我们这里边其实你看就是都是连接到外部系统嘛,连接上之后,然后你具体它是S还是think,有我们后边的这种转换操作的这个拓扑图,这个DG里边定义好的操作来决定,对吧,你到底是从它里边去读数,还是说得到的结果要写,那接下来我们在代码里边再做一个实现。好,那接下来这个代码里面,我们新建一个新的这个。这个我们当前测一个,呃,这个我们就不要叫卡夫卡output了,我们测一个比较特殊的,之前我们在这个data streamam API讲解的时候也做过,就是卡夫卡进,卡夫卡出,当时我们说做一个数据管道,对吧?哎,就是相当于我们在做这个操作的时候,我们做这个ETL,或者说做其他的操作不经常会用到吗?呃,就是我们先消费这个卡夫卡里边统一的数据,有一部分拿出来,诶我们做完操作或者说呃,这个还想把它再做一个转换,再写回到卡夫卡里边,让下游的部门再去用,那这种情况下,这就是一个啊,就像水管一样,管道一样,所以我们定义一个卡夫卡pipeline test,好把这个先创建出来,然后命方法先写下来,呃,然后接下来我们这个具体里边的一开始的初初始的这些定义,我们就直接抄这个之前这一部分吧,首先还是创建环境,对。
08:45
然后这个连接外部系统这一块就不太一样,我们先把这个写进来。呃,当前还是啊,为了后边我们这个调用方便,影视影视转换该引入的引入,这个也把影视转换引入,大家可能发现了,你这个影视转换不引入,后边我们做这个table API,它的那个表达式啊,呃,Expression去做那个判断的时候,其实都没有办法引他的那些引视方法,对吧?啊这里边你必须把这些引入,呃,然后接下来已经有了这个环境之后,接下来我们是把那个S啊卡夫卡传进来对吧?这里边我们是读取文件,诶这个是卡夫卡,我们把它读进来,直接这里边我们该引的全引入对吧?诶这里边就直接就是第二步啊,从卡夫卡里边去读取数据,然后后边呢,我们就是再去可以定义一些转换操作啊,后边这个该转换的还是转换过来。
09:47
比方说我们这里边,呃,大家看到这个有a table API的这种用法,也有CQ的用法,这个无所谓了啊,我们用哪种都可以啊,后边这个是转换操作查询转换啊,当然这里面有一种转换是这个简单转换,另外我们还说了,还有一种转换是聚合转换,对吧?我把这两个都给大家copy过来。
10:17
这里面3.1简单转换,具体的过程大家都已经清楚了,对吧?这个简单转换的时候,我们其实就是不做聚合嘛,只是做了一些select,大家就知道,就像一个一个投影啊,就像一个map一样,把这两个字段提出来啊,那filter的话就是一个过滤嘛,啊,这个非常简单啊,那另外还有Fla map也是类似于这样的一个操作啊,那我们这里面做这个聚合转换的时候呢,就要先定义K去做分组group by,然后哎,我们这里边调用本身已经给我们提供好的这个聚合函数,去把它做一个聚合,然后c select出来,这就是一个简单的聚合转换,好,那接下来最后我们就得做这个输出了,对吧,输出到卡夫卡,那这个输出到卡夫卡的时候,这个怎么样去?呃,就是连接到一个卡夫卡外部输出的这张表呢,跟前面一模。
11:18
样,我甚至就直接就抄就好了,对吧?所以这个过程大家看前面的内容我们都已经熟悉了之后,后边就会发现代码其实就一大抄,但是我们需要知道里边的原理对吧?啊,这里边有些要改的地方你千万不要,呃,不要忘记啊,这里边我们输出的这个topic肯定跟输入的不一样啊,之前我们这个输出叫think test对吧,我们改一个叫think test这样的一个topic,然后后边的lukeper,还有这个BOO STEM server,这个都都一样对吧?21819092,然后我们的这个格式化工具还是CSV,那么stemma呢?诶,这里边你要看我到底输出什么了,比方说我要输出这个result table,大家发现这里边是一个ID,一个temperature,所以这里边我其实是需要,诶这个就time STEM就不需要了,对吧,只要两个字段一个ID一个temperature就够了啊,当然了,这里面就是我定义这个字段的时候,这是我输出表里面的字段,可以跟前面我。
12:18
输这个输入的时候,这里边定义的那个字段不一样对吧?呃,就是我是我拿的是这个table里边它的那个数据嘛,所以说在这里边我可以对于当前的这个字段做一个重命名,只要这个匹配上第一个是ID,第二个是碳就完事了,然后接下来后边我们把这个改一下,这个叫卡夫卡output table对吧?然后最后大家不要忘记啊,就是这是只是把它创建出来,我们接下来需要把比方说result table去做一个insert into对吧,写入到当前的这个里面来,最后大家不要忘记还有一个启动当前的任务对吧?当前是卡夫卡pipeline test,好,这就是我们完整的一个流程啊,呃,那所以我们可以把这个代码启动一。
13:18
一下大家看一下这个测试的结果怎么样啊,大家看到这里边这个执行报错,哎,他说这个input table没有找到对吧?因为我们这里边大家看到这里边用到了哪一个名字呢?诶,这里边用到了一个卡夫卡input table对吧?那你这里边如果要是我们去读取这个数据,要做简单转换的时候,当然就应该还是得用这个名字卡夫卡夫卡input table对吧?要不然你前面没有定义,没有在环境里边注册input table啊,我们并没有找到,哎,所以把这个要改过来,重新运行一下。但现在我们这个已经提起来了,哎,这里边没有任何输出啊,因为我们现在还是卡夫卡的源,对吧,你这里边没有给这个数据输入,那当然就不会输出了,我这里就一直在等着呢,所以接下来我们看一下这个,把这个夫卡这边要去起一个,因为我们这里边是卡夫卡进,卡夫卡出嘛,所以要去起两个东西啊,就是要去要去起一个,首先我们把这个producer先提起来,R啊,这个brokeer list local host 9092,然后我们当前的这个topic。
14:40
是呃,这个当前生成的这一个,应该是我们这里边作为数据读入的那个topic,对吧,大家还记得这个topic应该给的是看一眼啊。呃,这里边我们这个topic给的是3S,所以这里边TOPIC3ENS先把这个,呃当当前的这个就是啊,先建出来,这里已经有了,然后我们去创建一个,呃,我们还是进入到进入到这个卡夫卡下边来,呃,接下来去定义一个卡夫卡consumer consumer consumer啊然后接下来我们定义BOO,呃,BOO server local host 9092对吧?呃,然后接下来我们这个topic topic当时我们给的是叫think test对吧?啊,就是这样一个topic去做消费,我们看一下现在它的这个状态。
15:53
对,然后我们把这个还是做一个显示吧。
16:06
呃,好,现在我们做一个分屏显示,然后这边输入数据,经过这里边的我们弗林里的table API操作之后,然后最终得到的结果再返回写入到当前的这个另外一个卡夫卡topic来,对吧,完整的一个处理流程,好,接下来我们一条一条数据写入341复制一下放到这儿,诶大家看这里边就输出了一条,对吧?而且这是我们提取出了对应的,呃,就是当前的那个ID和temperature只提取了两个字段过来啊,这是我们知道当前的这个状态啊,然后接下来346,诶这里面没有输出,因为当时我们做了一个filter,对吧,只filter这个341出来,那当然你如果要继续输入这个347的话,肯定也没有,对吧?如果你再输入一个341的话,当然这里边就应该是正翅有一个,诶,这里边没有复制上啊。
17:11
嗯,我这里边好像有点问题,没有复制上,好,接下来我们把这一个复制过来,大家看一眼,这里边三一就又输出了,所以说整个的这个流程依然是我们之前说的卡夫卡进到我们的这个弗林里面来,对吧,然后在卡夫卡出再输出到另外一个topic皮格里,经过ETL转换操作之后得到的结果输出出来,所以这就是完整的一个卡夫卡管道测试。
我来说两句