00:00
在table API和CQ这一部分的最后呢,我们再来详细的介绍一下怎么样连接到不同的外部系统啊,那其实前面我们在进行外部系统连接的时候,我们都知道啊,就是创建一张连接器表,然后指定不同的connector,指定不同的连接器就可以了。像前面最简单的其实就是我们在CQ client这一节里边啊提到过的,直接输出到控制台打印输出,那就是connector print,直接把这个指定就可以创建一张打印到控制台输出的连接器表了。那这个创建连接器表的最重要的一点其实就是使用create table这样的DDL语句,后边要加上一个with子句,然后把我们要创建的连接器名称传输在这里,那当然了,这个如果是控制台打印输出的话,只要有连接器就够了,别的没有任何的参数,那如果说实际使用的时候连接到了一个外部的数据库啊,或者说像卡夫卡这样的消息队列那。
01:00
很显然,我们还应该在谓语语句里边增加一些其他的配置参数,所以接下来呢,我们主要就是说一说在实际应用场景下,到底应该怎么样去写这个连接器。首先我们还是说一下卡夫卡,因为我们说卡夫卡这样的消息队列跟弗link这样的流处理器可以说是天生一对,他们之间的连接是非常的完美的啊,那接下来我们就看一看啊,在FNKCQ当中怎么样去创建一个连接到外部卡夫卡的连接器表啊,那这个过程呢,首先我们还是应该啊先去引入卡夫卡连接器的相关依赖,这个跟我们之前在data STEM API里边想要创建到卡夫卡连接的时候引入的那个连接器是完全一样的啊,所以之前如果我们引入过的话,现在也不需要再重复引入了,那就是flink connector卡夫卡。啊,那如果说我们回忆起来的话,之前在datat streamam API里边,我们去比方说要输出内容到卡夫卡里边的时候,那肯定这里边还是要去指定当前的所谓的这个sche码,要做序列化,反序列化,我们所使用当前它的这个格式到底是什么样子,那现在呢?呃,如果说没有对应的这样的一个格式指定的话,那就还需要再去引入一个所谓的表格式的工具啊,比如说你到底是CSV格式呢,还是Jason格式呢?啊啊,那所以这些表格式就定义了底层存储的二进制数据和我们对应的这张表的列之间的转换方式啊,那对于卡夫卡而言啊,像csv Jason a这些主要格式都是支持的,如果说我们想把这个卡夫卡里面读取,这读取出来的数据,按照CSV格式进行解析进行转换的话,诶,那这里边就需要引入flink csv这样的一个依赖。
02:46
引入了依赖之后,接下来我们就可以创建到卡夫塔的一张连接器表了啊,那其实我们看到啊,这个连接器表主要就是在创建表的DDL的with子句里边指定当前的connect是卡夫卡啊,那然后接下来呢,卡夫卡相关就有对应的一些配置参数了。首先我们需要指定当前连接到的卡夫卡的主题topic到底是什么?比如说我们这里就叫做events,那后面呢,很关键的还要指定当前服务器的名称BOO STEM service是local host9092啊,那或者是杜102的9092,这个根据我们具体卡卡集群的配置来进行设置就可以。另外呢,我们还有一些可选项,比如说可以设置当前的消费者组的ID,还可以选择当前扫描的起始模式,以及整个读取数据的表格式啊,那这里需要说明的是,我们在前面设置这个字段的时候呢,有一个TS,这是一个时间戳类型,我们看到它的类型是time stem3,而后边呢又有一个。
03:46
追加的补充描述叫做Meta data Meta data from time stamp,这是什么意思呢?诶,这就表示它是基于time stamp这样一个卡夫卡里边的meca data塔,也就是原数据去生成的一个新的列,专门生成的一个原数据列,那这里的这个time stamp呢,其实就是在卡夫卡里边数据可以自带设置一个时间戳,诶,那所以这样的时间戳我们就直接可以把它作为原数据提取出来,转换成一个新的时间字段啊,所以有了这个时间字段之后,它本身就是time stamp类型,那后边我们在表里边指定时间属性的时候就会更加容易了啊,就不需要再用其他的一些配置和提取了。
04:28
这就是关于卡夫卡连接器表的一个创建过程啊,那这个过程呢,我们会发现本来是非常简单的,但是如果仔细一思考的话就会发现,诶,如果从卡夫卡里面去读取数据,这样框架肯定没有问题啊,把它作为输入是没有问题的,那如果要是输出呢,输出的话就得考虑当前的更新模式啊,到底是什么样子,到底有没有更新查询,如果说我们当前输出的数据就是在这个结果表里边,只是一条一条往上追加的话,那肯定没问题吧,你设置一个topic,对于卡夫卡而言,就来一条写一条塞到这个消息队列里面就可以了,但是如果我们要有更新查询的话,那卡夫卡怎么知道之前的某一条数据要进行更改呢?
05:11
诶,所以这种连接器表只能应用在颈追加模式下边的输入表里啊,那如果说我们涉及到了更新操作的话,那就得用另外一种,就是所谓的up卡夫卡。这是flink专门为我们增加的一个卡夫卡的连接器,叫做更新插入,也就是UPS卡夫卡连接器,这个连接器呢,就不光支持紧追加的这种消力消息队列的插入数据啊,而且支持以更新插入的方式向卡夫卡的某个topic里边去写入数据。啊,那简单来讲的话就是呃,Up卡夫卡,它能够处理我们当前的更新日志流春之老,那既然它的编码方式是up的方式,所以在里边呢,就必须要指定单独的K,诶,所以当前我们读取和写入数据的时候啊,Topic里边的数据都是k value这样的一个键值,对我们针对这条数据的每一次更新插入操作,其实都是把对应的这个K上面的值做了一个覆盖而已啊,所以这样的话,你就不管是更新还是进行追加都可以执行相同的操作,那如果要是说想要删除一条数据的时候怎么办呢?也是可以的,就把当前的K直接指定成nu,就是如果遇到键为空的对应的数据的话,那么连接器就会把这条数据理解成为直接删除对应的内行数据,所以这样的话,就不论是读取数据还是把相应的数据要写入到卡夫卡的topic当中。也就是不管。
06:44
那我们当前实现的是table sauce还是table thinkk,最终都可以实现跟卡夫卡之间的连接了啊,那所以我们可以看一个具体的例子啊,这个例子呢,其实跟前面普通的这种应用也差不太多,诶,那主要就是这里的connector要变成一个up卡夫卡,诶,这个名字要改成up的卡卡,然后后面呢,同样要配置bookstrap service,那区别就在于这里还需要指定key和value的form ma格式,因为之前的话我们直接指定一个form ma就可以了,是针对卡夫卡里边每条数据去进行格式化定义的,而现在呢,那要针对key和value分别指定不同的格式化工具。
07:25
那把对应的这一张表连接表,连接器表创建出来之后,哎,那我们接下来如果读取数据,然后做了一个比方说PVUV的计算的话,那我们知道这其实要做一个count统计嘛,计算PV的时候,所有数据都要进行统计,所以我们可以直接count芯,那如果计算UV呢,要做一个去重啊,那CQ里面有非常简单的方式,我们直接distinct的UID就可以了啊,那所以这样的话就非常简单的可以得到当前PVUV的统计结果,但这个统计结果呢?啊,这个结果表里边有更新操作,所以如果我们想把这个值,这个结果里边的每一行数据写入到卡夫卡中的话,那我们创建的连接器表就只能是一个up卡夫卡的连接器表啊,所以上面这张是我们的输出表啊,那下面这个是我们的输入表,从这里去读取page views对应的这些事件,把所有的数据读取出来之后,经过这样的一个查询、聚合、转换得到的结果。
08:25
写入到page view per region这样的一个结果表当中,而这里输出的这一张阿卡夫卡连接器表呢,还有一个非常明显的特点,就是我们定义字段的时候会有一个primary key啊,那user region,那这里专门要定义primary key,就是要方便我们在进行写入操作的时候去指定当前的key啊,那根据这个K就可以执行更新插入操作。这就是关于up卡夫卡的用法,那除了卡夫卡之外呢,那还有一些其他常见的外部系统,我们需要去跟link进行连接,那比如说文件系统,这个之前我们都已经使用过了啊,那就是collector file system,那剩下的重要的配置呢,那一个就是pass,当前我们读取的文件路径,另外还有一个就是format,就是当前我们处理的文件格式。
09:17
文件系统非常的简单啊,呃,那在实际使用的过程当中,除了要跟卡夫卡消息队列进行连接,或者是跟文件系统啊,其实我们知道实际使用的时候,这个文件系统更多的应该应用的是一个分布式的文件系统啊,所以这里的这个pass可能给的就是HTFS的一个文件路径,这个使用是比较多的,另外呢,我们可能还需要涉及到跟其他的一些外部数据存储设备,一些数据库进行连接,比如说啊,那直接想要输出到MYSQL或者post graq这些JDBC类型的关型数据库里面,哎,那所以我们知道啊,Flink本身是可以提供JDBC连接器的,那就可以通过GDBC的driver啊驱动程序向任意的关系型数据库里边去做数据的读写啊,那所以另外一大类连接到外部系统的类型,就是所谓的GDBC啊,那整体来讲就是关系数据库了啊,像MySQL post gra SQ deerby,那这些数据库的连接其实使用的都是相同的方法,那这里。
10:17
我们需要引入的依赖呢,就是前面data three API里边提到过的flink connector gd bc这样一个连接器,那如果说为了连接到具体特定的数据库的话,比方说要连接到MYSQ,那还得引入MYSQ的连接器啊,MYSQL。那对应的版本的话,就根据我们自己的版本来选择就可以了。然后接下来在CQL当中怎么样去创建这样一个连接器表呢?啊,那同样还是create table,然后后边需要有一个with子句去指定当前的连接器啊,那我们的connector指定的是JDBC,下面有一个URL,这个必须要指定我们具体连接的那个路径啊,我们现在是连接到MYQ啊,这是local host 3306,具体的数据库是my database下面呢,还有对应我们要连接到这个表名啊,这里我们需要注意的是当前的这个表名跟我们创建的create table的这个表名这是两回事,这完全不同,我们这里的d dl create table创建的这个my table,这个名称呢,其实是存在于flink内部的表环境里的,这是在flink里边涉及到的一张表,而后边的这个table name呢,这是真正在外边MYCQ里边存在的一张user表。
11:31
哎,那所以相当于我们是把这个外部的实体表user表跟内部表环境里边的这个my table做了一个连接啊,做了一个匹配连接的关系,他们俩的名字完全可以不同。然后我们看到在创建这张表指定它的这个字段的时候呢,有一个primary key的定义,定义了主键之后,那么接下来所有数据如果要写入这个my table的时候,使用的模式就是。更新插入的模式就是up模式,因为有key嘛,所以我们针对这张表的每一次写入操作都可以把它理解成是up,就是可以是插入,也可以是直接更新,这些都是可以根据当前的K去直接进行判断的。
12:13
这就是连接到GDBC的过程,那同样道理呢,我们也可以直接连接到elastic search啊,我们知道ES作为一个分布式的搜索引擎,在具体使用的过程当中啊,作为我们数据处理结果的输出还是非常常见的诶,那所以这里边如果我们想要把数据写入到ES里边的话,那同样先要引入flink跟ES的官方连接器啊,那这里边我们引入的就是flink connector search6啊,那这是ES6版本啊,那如果是七的话,当然就是SEARCH7了,这个依赖的引入跟之前data threepi里边也是完全一样。然后接下来如果创建这个连接器表的时候呢?诶,那关键就在于connector要指定成elastic search,比方说七,然后接下来要指定当前的host,我们连接的这个服务器集群,然后接下来还有一个index,当前的这个索引要定义好,同样我们在创建表DDL里边要指定primary key,指定当前的键是什么,这样的话就可以以更新插入UPS模式向ES里面去写入数据了。
13:19
那最后呢,我们再来介绍一个比较特别的,那就是HP啊,其实我们知道HP呢是一个非常重要的分布式的列存储的数据库啊,在大数据应用里面也是非常的广泛,那弗link呢,早些时候的版本是不支持像h base直接写入数据的,包括我们现在知道这个电SAPI里面啊,也没有h base的官连接器,但是呢,Link CQ啊,Table a里面提供了官方的连接器啊,但是呢,目前支持的这个版本相对来讲会比较局限,目前只支持1.4和2.2版本的h base有连接器的支持,诶,那所以这里边我们引入的依赖啊,就是flink connector h base后面指定是1.4还是2.2啊,后续的话如果进行完善,可能就会有更加通用的连接器出现,我们现在的话就必须得用相应的版本才可以,那我们看一看这个在CQ里面到底怎么创建对应的连接器表吧,啊,那同样还是create table在DDL里。
14:19
边诶,我们知道对于h bases呢,它需要去指定当前的RK和family啊,那我们看到啊,所有这个family它的类型都是肉类型,那RK是什么呢?呃,肉K我们看到啊,它的这个名称字段名称不一定非得就叫做rock key,它是使用我们当前这个表的所有字段当中一个原子数据类型的字段就会默认指定为当前的rookie啊,那当然了,就是这个rookie啊,作为当前的主件的。接下来呢,With句里边指定connector是h base,后边有相应的版本,然后指定当前的table name,另外还可以指定keep,这就是h base这样一个连接器表的创建过程。
我来说两句