00:00
我们已经实现了从文件里边读取数据,然后把它,呃,写入到注册到一张表里边,然后再把它读出来,呃,这个已经做到了,然后这里面还有一个问题,就是说大家看着这个已经被弃用的OLDCSV可能有点不爽啊,那当然就是有有这个新版本的CSV可以用,对吧?但是为什么我们一开始上来还是用旧版本呢?因为新版本默认的flink代码里边没有给我们提供,我们还需要去引入新的依赖啊,那这个引入的依赖其实就是大家可以看一下这个文档里边啊,我们在这里边,呃,就是可以引入这个flink csv,直接把这个引入,然后下面是对应的版本,一点十点一,我先把这个copy过来,然后这俩的区别在哪里呢?呃,其实主要就是之前我们这个O的版本啊,这个CS sa版本,它是一个非标的一个格式化工具,也就是说在跟很多外部系统连接的时候。
01:00
时候,你要是单纯一个文本文件的话,我们这儿去用它做这个格式化没有问题,但是假如说涉及到其他的一些呃,外部工具,比方说卡夫卡,卡夫卡那边就是必须要求是一个符合RFC标准的这个描述器,来来来进行这个CSV格式的描述,所以这里边就不适用了,那如果说我们想要跟卡夫卡更好的连接的话,那就必须使用这个新的,大家看这个CSV现在可以引入对吧?哎,这里边引入的时候,我们这里边用到的就是刚刚引入进来的flink csv里边给我们提供的描述器啊,这个就是后续大家会看到啊,有很多东西,如果当前flink里边没有提供的话,我们都需要引入其他的一些依赖,那呃,关于这个文件的写入啊,文件的读取当前我们就先讲到这里,然后再给大家来介绍一下,连接到其他的一些啊。
02:00
外部,呃,外部系统对吧?大家想到因为我能从文文件里面读取数据,那我们之前讲那个流式读取的时候,还有一个典型的数据源,那应该是卡夫卡对吧?我们应该能从卡夫卡里边读取数据,这才是真正意义上的流式数据处理嘛,所以接下来我们再来看一下这一点啊,这是二点,一是读取文件,那接下来我们做这个二点,二是从卡夫卡读取数据,其实就是连接到卡夫卡里面去,对吧?啊,那接下来这个操作呢,其实是差不多的,直接in table env.connect然后这个connect里边我们不是要传一个or script这个描述器吗?这里边我们要传的就是卡夫卡的一个啊script,然后大家看当前flink table里边,这里边已经有这样的一个描述器了,对吧?诶,这里边的这个这个描述器,这是在哪里提供的呢?大。
03:00
大家要注意,这其实是我们之前不是已经引入过卡夫卡连接器吗?Fli卡夫卡的连接器里边就给我们包含了tableable API里边想要用到的这个连接器的描述器啊,所以这里边你可以直接引入引入啊,直接用这个卡夫卡引入就可以了。呃,那这里边只引入卡夫卡显然还是不够的,对吧?后面明显是得给它定义各种各样的配置项,哎,那首先我们会看到最最基本的要定一个versionship啊,我们当前卡夫卡的版本,哎,当前我用的是0.11对吧?这是卡夫卡的版本,然后另外还有topic是不是也必须啊,来看有这个参数对吧?哎,我们就像之前一样啊,我们用这个三四这个topic主题,然后接下来那就还有一些配置项了,比方说我们得定义look keepper,得定义这个,呃,当前如果说我们是消费数据的话,那我们有这个bootrap service,对吧,得把当前这个。
04:00
Consumer定义的BOO service要定义出来,所以接下来其实是要定义一个property,大家看你们可以通用的去用这个去定义啊,啊,那这里边我们的参数zoo keepper.connect传这个参数定义the keepper,当前local host,这是我自己在本机起的,呃,2181对吧,这个必须要配置出来,然后另外再去配置一个property,接下来是boot stra.service当前是local host 9092,哎,这样配置好就完了,我们主要需要的就是这些嘛,对吧?然后诶大家会会想到了,之前我们不是还有一个就是卡夫卡里边我们得引入那个序列化工具吗?诶,那你现在连那个都不引入,接下来我们怎么做序列化呢?不要着急,大家这里我们不是还有这个with format吗?对吧?
05:00
而卡夫卡里面呢,它本身就支持我们的这个新版CSV啊,符合RFC标准的这个CSV就是可以用的,当然你也可以用这个Jason,对吧,也可以做这个Jason的格式化,都是可以的,当然你要想要引用Jason的话,还得引入Jason的那个知识包,对吧?Flink Jason就像这个flink csv一样啊,那这里边我们就简单使用了啊,首先with format,然后里边我们直接new一个CSV,直接用它做一个格式化,另外with schema,定义当前表的结构,哎,那大要想这里面表示结构,这不跟那个之前就一样嘛,对吧?你读进来之后数据如果要是一样的话,我们这里边做这个表结构的定义当然还是一样的,所以下边那就是各种field field,对吧?呃,首先一个ID,然后后边是data types.string啊,那后面还有这个temperature和这个。
06:00
呃呃,一个double一个,那我就不详细写了,我直接过来了,对吧,这个就完全一样,直接放在后边,这就是我们这个对齐啊,这就是我们从卡夫卡里边读取数据,用connect的方法做的一个操作,哎,大家看这个似乎看起来就更加通用,所有的这个外部系统好像都可以用这种方式,对吧?只不过就是前面你定义的时候,它本身的描述器可能不太一样,然后里面需要配置的东西不太一样而已,后边做的操作诶,格式化工具在这儿,后边是我们定义的,定义读进来之后的表结构啊,所以这个几乎是完全一样啊,当然了,后边我们需要再去create temporary table,这里边我定义一个叫做卡夫卡input table,对吧,把它定义出来,呃,然后接下来后边我们要做测试的话,那就把这个再copy过来,就from这张表,然后我们这里边定义的STEM都一样,那肯定还是可以转成这样。
07:00
单元组,我们看一看这个结果到底怎么样,好,我先把这个代码起起来,然后呃,我们得去起卡夫卡,得去起一个,我先看一眼卡夫卡还起着不起着好,起着的没问题,对吧?那我们去起一个生产者,好,我们进入到卡夫卡里边,呃。我们进入到卡夫卡里边来,然后去创建一个生产者卡夫卡conso producer啊,然后接下来我们要定义当前的,呃,这个list local host的9092啊,然后另外还要有当前的主题topic,就叫sens,对吧,先把它创建出来,然后这边正常我们应该已经提起来了,接下来呢,哎,那又是当时我们做测试的这个流程,这边生产者,这边生产数据,然后呢,诶,我们看看这边是不是能够消费到数据,把它作为三元组打印输出,对吧?好做一个输入。
08:17
诶,大家看这边包装成三元组做做了一个打印输出对吧?这边输入一条,这边就会输出一条,这就是真正的流式数据的读入和这边的处理输出啊,所以大家可以把这个卡夫卡作为数据源啊,下来之后再好好的测一下啊,那当然有同学可能会有疑惑,就是说诶,这里边我们做这个,呃,读取数据的时候,这里边我们定义的时候,看起来这就是一个表,看起来就是一个一个批处理的一种感觉呀,哎,为什么这里边还可以直接处理卡夫卡的这种流失的数据源呢?哎,这就是我们说的啊,本质上这里边我们所有的这种程序结构,看起来像直接基于表的那种批处理,但事实上呢,它的底层还都是data基于data stream去做的操作,对吧?啊,所以本质上还都是流,后面我们会再给大家详细的讲到。在flink的table API和CQ里边,它。
09:18
对于流去进行这些好像看起来基于表做的这些CQ操作的时候,到底是怎么样去做的查询,后面我们再讲。
我来说两句