00:00
再来我们继续考察一下flink里边调用了table API和flink CQ之后,像外部系统写入数据的这个过程,之前我们给大家讲的这个例子里边是直接输出到了文件,那么这里大家会发现,如果说本身我这里边是一个没有做像聚合这样操作啊,就是数据没有更新,只是不停插入的这种状态的话,它其实可以直接写入到文件里面。但是如果说我是做过更新操作的话,那大家想是不是就没有办法写进去了,因为当时我们想到,呃,把它转换成流的时候,他也要求必须要转换成所谓的撤回流,Retract stream嘛,然后里边的那个数据呢,是一条更新数据,其实代我们这里边输出的应该是两条数据,一个是前一条数据旧数据的撤回,另外一个是新数据的插入,所以大家会发现之前那个流里边你如果是直接往后追加的话,他当然就搞不定,那这里边如果要往文件写入的话,他是不是也搞不定啊。
01:04
对吧,你来一个这样的一个想要更新的这个操作啊,我这里边直接是没办法之前的那个数据写入就写入了嘛,我现在是没没办法再去更新的,这就是流失处理里边这个就是可能大家会觉得不太方便的一个地方啊,所以这里面就是你入到把什么数据写入到什么地方去,这其实也是要有考量的。那前面我们只是讲了一个写入到,呃,当当前的这个文件系统啊,那大家想如果说我我想写入到其他的系统可以吗。当然也是没问题的是吧?呃,像之前我们说过,弗link跟卡夫卡本身就是天生是一对嘛,那你说之前我们说的那个弗link到卡夫卡端到端啊,连接起来保证状态一致性啊,我现在最常见的状态,那不就是从卡夫卡读取数据进来之后,经过flink的处理转换,最后再写入到卡夫卡里面去吗?诶,之前我们讲的那个卡夫卡管道对吧?呃,就相当于是这个数据管道一样的啊,那现在如果我们用table API能不能直接做这样的一个连接呢?
02:10
也是可以的啊,所以接下来我们在代码里面试一下,还是新建一个class测试table test4,接下来我们把这个就叫做卡夫卡pipeline。把这个先放在这儿,然后前边的内容那基本上就大同小异了啊,大家想到是不是还是环境先创建好对吧,有流逝的执行环境,还有基于它创建出来的表环境。呃,最后是执行起来execute,那中间的话涉及到。要要去读取数据了,对吧,大家想一下,之前我们这个是直接读取文件的啊,当时是只写到2.1,其实大家知道,呃,我们后面也没写更多是吧?诶那假如说我们想要从这个卡夫卡里面读取数据,怎么读呢?
03:05
整个架构其实一样的是吧,是不是也是tablev connect,只不过这里边就不要传file system,而要传一个卡夫卡对应的那个连接连接器描述描述器啊啊,所以接下来我们在这里边啊,直接。连接卡夫卡。卡夫卡读取数据,这里面我们做的就是table env,然后直接connect,那这里边要拗一个,哎,那大家会想到那就是跟卡夫卡相关了,大家看它名字就叫卡夫卡对吧?诶,它它是不是就是table script下面的,诶所以我们把这个引入,大家看到它本身就是一个connectorscript对吧?我们要的不就是这个连接器描述器吗?啊,所以接下来有了这个卡夫卡,呃,那那自然我们想到你光这么拗一个卡夫卡没完,相关的那些配置是不是还得写好啊,所以接下来。
04:05
卡卡里边大家看是不是可以有property啊,我这里边就不要,呃,就是单独的一个property,一个property往里边写了啊呃,这里边最最基本的其实要的是这个,大家看version和topic对吧,这两个是首先要写进去的,那现在现在我们的版本version是0.11啊,然后topic啊,就按我们之前想的那个SS对吧,直接给一个这样的topic啊,那另外的话还有一些这个属性确实还得配置进去啊property,比方说这里边必须要的,大家可能知道这个BOO STEM server对吧?啊,那么在当前这个API里面,它还必须指定当前的这个zoo keepper.connect指定这样一个属性。然后当前这个Lu kper大家知道是local host2181对吧?哎,那下面还有这个boottrap。
05:06
接下来是local host。9092,这就是我们做的这个基本的配置嘛,啊其实像其他的那些,呃,那有同学可能说,那那之前我们不是还有一那个,呃,就是序列化反序列化的那个那个stemma嘛,对吧,那个东西我们怎么定义呢?诶大家想起来后边我这个连接之后,是不是还有那个format和STEM呀,那个format是不是就可以指定我当前既然它格式化吗?那是不是就相当于序列化法,序列化的时候那个格式我也确定了呀,你是Jason就Jason是CSV就CSV对吧?诶当前的这个卡夫卡里边Jason和CSV,另外还有这个A啊,这些格式都是完全支持的,所以这里边我既然已经引入了这个CSV嘛,那我在这儿就直接new一个CSV就可以了,注意必须是新版的,旧版的不支持对吧。
06:05
然后接下来with STEM啊,那这里边我们读取进来的数据,其实跟之前的那个定义应该是一样的,对吧,还是你有一个scheme。这里边是不是得到的这个结果也是这样的呀。三个字段对吧?我们想解析出来的不还是这样的数据吗?ID time time STEM,还有temperature,对吧?就是这样的三个字段,我把它直接copy过来吧。Data types引入啊,这就是我们当前这个连接卡夫卡读取数据的一个过程啊,啊,那当然了,做到这儿最后还有一步是create temporary table给一个名称对吧?啊,比方说当前我们这个叫input table,这就是整个的定义的这个过程,连接卡夫卡读取数据啊,那当然这里面其实我们说是读取数据啊,其实这是不是只是连接到卡夫卡创建了一张表啊。
07:01
这里面并没有体现出读取,那读取的话得怎么读呢?啊对,接下来我们就是还是得做转换了,对吧,这里边。简单转换,所以我这里边定义一个就是基于这个table table env去做一个from,然后把当前的input table要读出来,我把它创建命名成叫sensor table。接下来这样才可以用这张表对吧?啊,这是呃,那大家看这个这个一个from,这就相当于已经指明了我这里是不是就是一个table sauce啊,就是一个圆对吧?啊,那接下来我再看这个继续的转换的话,那就是sensor table,比方说我们还是啊select,就像之前一样,把对应的这个数据直接拿出来,这个我就不详细写了,跟之前应该是类似的是吧?啊,只要我把这个注册好了之后啊,简单转换直接这么做,另外还可以做这个聚合转换,我直接全copy过来吧。
08:13
呃,所以这里面其实是做这个中间的查询转换操作啊,查询转换。哦,前面我们少少把那个给删掉了啊,这个之前我们那个是叫sensor table啊,定义一个table,然后叫sensit。基于tableable env做一个from,把当前,把当前的这张表先读取出来input table,然后那当然下边这里边转换处理也要以这个sensor table为准,对吧,这里边的h j table也是sensor table为准,这就是我们这个做处理的过程吧,呃,那C口的写法的话都一样,我这里边就不重复写了,最后这是有了S有了穿,接下来是不是应该输出了,我们还想卡夫卡进卡夫卡出,那是不是还要建立一个到卡夫卡的连接啊啊,所以接下来我们是还是。
09:14
建建立卡夫卡连接输出到不同的主题下面,对吧,Topic下啊,那所以这里边的这个连接大家想是不是跟这个又是大同小异啊,你像前面我们从文件读,然后写入到文件,现在是从这个卡夫卡读写入到卡不卡,那那这里面我还是直接把它copy过来就完了嘛。建立连接的过程就这样对吧?卡夫卡版本这些topic,注意现在要改,之前我们不是叫think think test吗?把这个改一下,然后这里面的zoo keepper和booktop service肯定都是一样的啊,下面同样还是用一个CSV文件格式去做一个写入,那这里面的ski呢?这大家就要注意,你到底看要把哪个表要往里写了,对吧?
10:06
大家注意一下,不同的表往里写,是不是这里边的这个表结构sig码是不一样的呀,哎,所以我们的这个类型都是在这里面sche码里面定义好的啊,哎,那比方说像我们直接要写这个result table的话,那是不是就直接诶这样就只要有这个string类型的ID和这个double类型的temperature就可以了,只要有这两个字段就可以了,那我把这里改成一个。Output table,然后哎,这还这只是注册啊,怎么样才正式写入呢?对,Result table insert into对吧,直接插入进去,这里面是一个当前的表名output table,这就是一个跟卡夫卡连接在一起,完整的一个处理流程。好,那接下来我们还是在代码里边测一下。
11:00
啊,把这个先打开,我先看一下啊哦,刚才那个卢kber卡夫卡都关掉了,那我们还是到目录下边去把它做一个启动。呃,首先是z k serve z k server。我们是先先CD到这个下边,然后呃,去执行一下ZK server server start。先把鲁keep提起来,然后我们到卡普卡下边去。同样把这个卡夫卡server start啊,这里边我们用一个静默启动demon,然后当前这个用server.properties啊,配置文件用这个启动起来,好我们看一眼,好当前已经提起来了啊,那接下来我们就可以把这个代码启动了。
12:03
另外我们想到这个提起来之后,应该还需要有什么,还得有这个,呃消生产者和消费者对吧,那边的生产者生产数据应该topic是哪个。我们就在这里面直接写吧,B对吧,呃,然后卡夫卡console producer,这里边我们要的这个,呃,这个用broke list local host 9092。Topic当前用应该用哪个?对,大家注意这里边的producer生成的数据,是不是我们要用这里边连接读取啊,哎,所以这里边要用sensor sensor好,我们把这个先创建出来,然后接下来我再另外起一个消费者看一下当前的这个数据结果,呃,当前还是到目录下边来。
13:14
我们接下来这个呃,并目录下边卡夫卡cons consumer。呃,然后接下来boot server local host 9092杠杠topic,现在topic注意不一样了,这个叫s test,对吧,这是我们经过flink处理转换之后,然后再得到的这个数据要写入到当前的这个呃,卡夫卡对应的这个topic来,所以接下来我们得用这种方式去做一个测试,好。呃,那这两个都提起来之后,那大家想现在是流式的输流式输入对吧,所以一下子看不到结果啊,那得一条一条输输数据了,那大家想我这里边给一条341。
14:07
这里好像没有任何的反应对吧?哦,那给一条346。大家看是不是346这里就输出了呀,这个符合我们的代码逻辑对吧,本来我们筛选的是不是就是要ID等于346啊啊,所以当然这个就是没有任何问题的啊,那这里如果是347也没有对吧,三四十也没有,那如果要是346。哦,No,如果是346的话,这里边继续会有一个三六的输出,对吧?哎,这就是我们看到这个做转换之后啊,由卡夫卡进经过处理转换,然后得到的结果呢,再写入到另外一个topic卡,卡夫卡的另外一个topic下面去。这是所谓的数据管道,我们又做了一个完整的测试。呃,那这里大家可能会有另外一个问题,就是我们刚才是指示了这个指示了这个result table对吧,那前面做过聚合的这个a j table可以吗?
15:15
诶,大家要注意啊,这里边写入到卡夫卡里边去的话,A j table也是不行的,那主要原因就在于,诶,其实大家想到这里边卡夫卡往往这个底层去写入的话,是不是也会有一个卡夫卡table thinkk呀,之前往文件写,我们看到有一个csv table think嘛,它是不是只实现了APA呃,Stream的这个table think那个接口啊,所以这里面我们看一下卡夫卡table think base它实现的是什么,大家看它实现的是不是也是open stream table接口啊。所以当然它是不是也不能,如果像那个AJ是不是就不光是往后追加了,他还有更改对吧?哎,那那种情况当然就不能直接用它实现了,其实大家也很好理解,卡布卡是消息队列嘛,你说消息队列它直接那个数据是不是只有一条一条来了之后追加灌进去就完事了,他能够去把之前的那个这个已经输输出过来的那个生产出来的数据再去更新一次吗?那做不到对吧?诶所以大家看这卡夫卡是最适合这种像日志啊,不能更改的这种东西,这是最适合的,所以我们这里边如果有更新操作的话,他也做不到。
16:28
跟这个文件上其实是一样的,对吧。
我来说两句