00:00
在本章的最后,我们再来系统的介绍一下flink CQ怎么样去连接到外部系统,其实前面我们都已经介绍过了,就是创建连接器表,然后设定不同的connector就可以连接到不同的外部系统啊。那所以本质上来讲我们的方法都一样,就是在创建表的时候加一个with子句,通过配置当前的connector,不同的connector连接到不同的外部系统。那最常见的我们之前都已经用过,那就是把它配置成print,接下来别的参数,任何参数都不需要有,就是直接输出到控制台进行打印。就是最简单的场景,那其他我们在项目实际当中呢,肯定不会在控制台进行输出了,它主要是用来测试,那在实际项目当中应用最广泛的是哪种情况呢?诶,当然最常见的就是卡夫卡。我们说卡不卡跟。Flink都是基于流式数据的一个处理的引擎,或者是存储系统,所以本质上来讲,卡夫卡跟flink的连接是最为普遍,也是最为方便的。那卡夫卡的CQ连接器呢?它可以从卡夫卡的topic里边去读取数据,把它转换成一张表,这是我们说的。
01:24
相当于把当前的这个卡夫卡这个连接器表作为了输入表来使用,也可以呢啊,也可以将某一张表里面的数据直接写入到某个topic里面去,这样的话就相当于是输出表啊,那所以我们对于这个卡夫卡表的使用就可以用在。S端也可以用在thinkc端,如果想要连接到卡卡,首先我们需要引入依赖,这个其实就是卡夫卡的连接器啊,这不需要多说了,我们之前在data stream API里边介绍的时候曾经把它引入过。
02:00
这个连接器里边不光有这最PI的支持,也包含了table API的支持啊,所以这个其实是没有任何问题。另外这里需要注意的是。Flink CQ里边,对于连接器而言,我们应该引入对应的一些格式的支持工具,也就是说对于卡夫卡而言,我们得考虑当前这个卡夫卡的数据读进来之后,我们到底以什么样的格式对它进行解析转换呢?啊,比如说我们当前的数据就是逗号分割,那我们知道这就是CSV格式嘛,啊,那或者我们当前是Jason那种形式啊,呃,括括号把它括起来的一层一层,那这样的话也可以对应的把里边的字段和值进行提取,进行解析,那所以这个是可以直接进行转换的,因为如果做不到这种转换的话,那我们怎么提取表里面的字段呢?所以这一部分必须要有,那对于卡夫卡而言,当前是可以指定csv Jason或者A诶等等这些主要格式目前都是支持的。
03:06
Flink为各种连接器都提供了一系列的表格式,所以如果说我们当前使用的这个数据就是CSV格式的话,那我们还应引入flink csv这样的一个依赖,一个支持啊,其实之前我们在解析这个从文件里面读取数据的时候,也已经把这个依赖引用过了啊,那所以这个我们就只要是有对应的依赖包,当前都可以不管。接下来关键就是创建连接器表,创建连接表表的话,那我们当然就是这个前面的字段还是一样都不用去考虑,关键就是connector是等于卡夫卡。连接到款后面呢,必须要有当前的主题是什么,然后要有boop,要有呃,Group ID,当然这个就是可啊消费者组,然后再加上比如说可以有这个scan stand up mode啊当前。
04:02
的,呃,起始位置的偏移量啊,那另外还有一个format,这里边必不可少的当然就是connector,然后topic,另外boostrap service,呃,另外这个format也是需要,这几项是必须,其他还有很多可选项。这里需要专门说的是,专门提一句的是知道卡夫卡本身是消息队列嘛,所以从里边去读取数据,这个没问题,连接了之后,诶,这个topic里边如果有有数据我们去消费的话,读进来的应该就都是流式的数据,一条一条的。一个一个进来,每来一个,我们就按照这个CSV进行格式化处理,然后提取出相应的字段,把它写入到插入到当前这个输入的动态表里。那假如说我们当前是要把这个数据写入到卡夫卡里边呢?这个时候又应该怎么办呢?哎,那我们会想到,那就必须这张结果表里边就只能是一条一条追加的数据。
05:05
如果说当前这个表里边出现了更新操作卡夫卡怎么办呢?哎,那正常情况下我们知道卡夫卡是消息队列嘛,你直接往里边写,这显然就不行了。所以我们的这种定义方式,如果说作为输出表的话,那就只能是输出颈追加模式的流,也就是说a onre颈追加流没有更新操作,可以用这种方式写入卡夫卡。那假如说有更新操作,又应该怎么办呢?当前卡卡就没有办法识别,你到底是要是要删除一条数据,撤回一条数据,还是要插入一条新的数据了。那为了解决这个问题,Flink专门增加了一个特别的连接器,这个连接器叫做absurd卡卡。那它所谓的UPS卡夫卡就是可以支持更新插入操作,UPS操作的卡夫卡可以识别,我们当时不是介绍了,最终我们要把这个动态表的甚lo日志的这个信息要进行编码。
06:11
输出到外部系统吗?诶,那所以当前如果我们以upsur方式去进行编码的话,使用这样的连接器就可以让卡夫卡进行正常的解析啊,那所以它进行处理的当然就是更新日志流了啊,那对于table source而言,这个连接器呢,就会把topic里边读取到的所有数据都解释成针对当前K啊,之前我们提到up的模式下必须要有K的定义啊,所以它就是针对当前K的数据做的一个更新操作。所以他是会去查找我们动态表里边K对应的那个数据进行更新的。啊,那如果说要是没有对应行的话,当然就是直接插入了啊,就UPS嘛,更新和插入是一回事。那如果要是table think,如果要是输出怎么办呢?输出的话同样还是得到当前的更新日志流,把这个更新日志流直接以upsur的编码方式写入到卡卡里面去,那怎么写入呢?如果当前是添加消息的话。
07:20
注意是遇到插入或者是更新后update after,就是加那个操作的话,加优数据的话就正常写入看看。诶,那同样对应,如果要是删除或者是撤回减U那样的数据的话,那怎么办呢?啊,就把value为空的对应数据写入卡普卡,那就呃,注意啊,这里边我们要求是按照K的值对数据进行分区,这样的话,我们对于同一个K的更新和删除的操作就都会落落到同一个分区里面去,就相当于把之前我们想要更新的那条数据删掉。这样的话,我们整个就可以删掉一条,再追加一条,相当于就实现了update这样的一个操作。
08:05
那所以接下来我们可以看到up卡夫卡,这个在使用的时候怎么定义呢?啊,其实别的都不用改,其实关键就是把当前的connector要改成upsurd卡卡。Topic当然还是需要的啊,Boostrap server还是需要的。另外就是当前的format就不能只有一个了,我们要分别去定义点format和value.format把它俩都定义出来,后边就可以正常去使用。可以看到这里的例子呢,下面有一张表,Create table,它连接器是卡夫卡,这就是一个一般只能进行插入操作的卡夫卡连接器,而上面这个s per region啊分区,它其实就是卡夫卡,它是可以作为输出表的,而这个hes我们知道这就不能。针对这个有更新操作的数据啊,它就不能作为一个输出表,它只能是连接上之后去读取数据,或者说是只有。
09:08
仅插入操作的这个APP stream这样的流可以去写入到当前的这个卡普卡主题里面去。所以我们看下面的应用,诶,那我们就是从S这个表里面去读取数据,然后呢,把它插入到region里面去,这样的操作是可以的,这里的关键就是group by user region啊,这里边是指定了kid,这就是我们当前的kid。这就是关于这个卡夫卡不同输出模式下的这样的一个连接器的连接器表的定义,那当然了,除了卡夫卡之外,还有各种各样的外部系统我们可以进行连接,那常见的呢,有文件系统,这个之前我们都已经做过了,就不再去详细介绍,就是connector等于file system啊,那后面主要的配置项就是当前的文件路径,Pass,文件格式和。
10:02
那同样呢,我们可以连接到GDBC,也就是说建立一个GDBC连接,就是通过GDBC驱动程序的任意关系型数据库建立连接之后就可以去读写数据了啊,那当前呢,我们可以写入到这个MYQ或者post graq或者derby,这些都是可以的,引入依赖的话,当然就是flink connector g d bc啊,要把对应的这个依赖要引入,那如果说想要连连接到特定的数据库,我们可能还得引入相关的驱动器,比如说是输出到写入到这个MYSQL的话,那你得把myql connector知道吧,这个要引入。至于这里的版本,我们可以根据自己所使用的版本来进行选择啊,那创建这个连接器表的时候也是同样,Connector定义成GDBC,后边的关键点在于要有一个URL啊,当当前这个GDBC,比方说MYQ,然后local的3306啊,对应的数据库名这个要有,后面呢,它应该有一个c name,比方说users。
11:06
哎,这个可能会比较奇怪,我们这里不是已经有当前的表明了吗?注意这里不一样,我们上面create table这里的表名是在flink程序里边,Flink的表环境里边注册的那张表,不管是输入还是输出,它都是在flink内存里面存在。而这里的表呢?这是真正的实体表。这是真的,在MYSQL里边就有一个叫做users的表,不管我们是读取它还是要写入到这里来,那显然这是在MYSQL里边真实存在的,我们应该是在MYSQL里边已经创建好,创建好的啊,那它跟这里的my table两者之间的关系只是在于它俩有一个连接关系而已,就他俩的数据是可以通过连接器去进行连接,去进行通信的。所以。
12:01
这里面要做一个具体的区分。另外呢,我们还可以连接到ES elastic search啊,那这个也非常简单,我们需要引入flink connector ES,把对应的连接器引入,这是ES6的版本,那如果是ES7的话,当然这是后面这个数字会有所不同。呃,另外呢,就是我们这里在写连接器表的时候,也只要把这个connector改成对应的elastic search啊,后面是版本六还是七要定义一下,然后后面主要就是一个。一般我们就是9200嘛啊,另外还有就是index关键的配置。这里注意,我们可以去定义主键,就在这里面啊,在当前的表里边primary key定义一个主键,所以在ES表连接到ES表的时候啊,向ES写入数据的时候,可以以更新插入的模式,Up模式向ES去写入数据。啊,那如果这里边我们没有定义主键的话,当然就不能了啊,就必须要有主件的定义,然后才能以更新插入模式写入数据。
13:07
当然了,同样还有这个h base的支持啊啊,那现在新版本的flink是支持了,直接给我们提供了一个h base的连接器啊,支持了面向h base集的读写操作,我们知道在大数据分析过程当中啊,这样的列式存储的数据,H base实还是应用常广泛,那它的使用也非常简单,我们直接引入link啊,应版本后面这里有这个版本需要去呃引入啊,目前Li是只对h1.4版本和2.2版本提供了支持。要稍微注意一下,不支持的版本的话,我们尽量还是呃,不要用对应的这个匹配啊,尽量要使用匹配支持的版本。当然了,呃,接下来在创建表的时候,创建连接器表的时候,这里边的定义也要有所不同啊。比如说我们知道在。
14:04
HP里边我们必须要去指定rookie啊,要指定这个family对吧?啊,那这里边需要注意的是这个rookie啊,名字不一定非要叫rookie,那rie是通过什么来指定出来的呢?那就是这里边啊,必须要有一个原子类型的字段,其他这些family都是肉类型,那只有一个原子类型,原子类型的字段就会被识别为h base里边的rookie啊,那当然了,后面我们还可以指定这个primary key啊,指定当前的这个主件,然后下边的连接器指定的时候呢。要指定H加上后面对应的版本,那我们可能还需要指定当前的表以及集群,这就是关于h base的使用。
我来说两句