00:00
另外还有一个连接到的外部系统,那就是大名鼎鼎的hi啊,那我们知道hive其实它是一个数据仓库啊,它是基于哈杜op的这样一个数仓的海量数据存储的技术框架啊,呃,那当然了,它主要不光是提供数据的存储,关键还是在于对于海量存储起来的数据进行统计、分析、计算啊,那可以说现在它已经成了我们当前大数据离线分析的一个核心组件。那么它里边既然是离线分析吧,呃,批量数据,那所以它当然是支持CQ的啊,我们知道在have里面它有一套自己的CQ语法。跟标准的CQ有类似的地方,也有它独特的地方,那那我们知道当前如果flink要跟hi去进行结合的话,很显然flink CQ。直接跟hive进行结合就是最方便的啊,那所以它的这种集成的方式会比较特别,我们知道在底层其实data threepi是不提供直接到hive的连接器的啊,那我们现在如果要到have去直接连接的话,怎么去连接呢?啊,那他在上层这个linkq里边是提供了一个have catallo功能的,也就是说是提供了have目录的一个保存,它可以使用have的to来管理当前flink CQ里边的原数据。
01:26
哎,那这样的一个好处,其实我们就会发现了这个have的mesto,它其实可以看成是一个持久化的目录嘛,因为我们知道这个mesto是单独进行存储的,我们可以进行配置,所以假如说我们现在使用了这个catalog的话,它里面的所有数据都是直接持久化了,那就可以绘画去进行调用了啊,就假如说我们里边创建了一个,呃,比方说连接到卡普卡,或者连接到ES的一张表的话,接下来。它的原数据都存储在这个catalog里边啊,那就相当于是在masto里边存放了,那么对于不同的flink作业,我们写这个不同的flink CQ啊,提交不同不同的flink作业的时候,那就不需要重复创建对应的这个表了。
02:12
相当于直接就拿来用就可以了,对吧,所以这个在特别是如果说我们直接使用CQ client的客户端去提交的时候,就会特别特别的方便,你前面那个每次提交作业的时候,我们的那个create表的那个过程啊,连接器表的那个定义的过程就可以省略了。啊,那另外呢,使用have catalog还可以直接使用flink作为读写have表的替代分析引擎啊,因为我们知道haveve本身啊,它是一个离线的数据仓库,那最多的这种方式我们可以直接去,呃,Have里面直接写CQ啊,写写这个have CQ去进行查询处理,那更多的呢,是把Spark作为have的一个替代分析引擎啊,那Spark跟have结合起来去进行批处理查询转换。
03:02
那我们自然想到了Spark是一个数据大数据的分析引擎,FNK也是啊,Spark能做的事情fli也都可以去做,所以能不能把have的分析引擎替换成flink呢?诶,当然也是可以的,那所以只要flink里边我们使用了have catalog的话,接下来其实就可以直接把flink。当成处理have表的一个替代分析引擎啊,这样我们在have里边进行批处理会更加高效,那另外呢,我们也就可以在have当中进行读写数据进行流处理了啊,就把这种能力给我们提供出来了,这也使得就是所谓的实时数仓啊,这种成为了可能啊,当然了,现阶段可能更多情况下,对于实数数仓的处理可能更多的是使用塔卡进行大规模数据的存放啊。而我们现在呢?如果说想要用flink和hi的连接,至少是提供了这种可能。
04:05
Have catalog,它是被设计成所谓的开箱即用的,跟现有的have,所有的这个have的配置完全兼容。这里面主要需要注意的是,目前只有。阿里内部给我们开源出来的blink计化器提供了hive的支持啊,那所以如果说呃,我们想要用hive的连接这个功能的话,必须使用blink planner啊,当然了,未来版本里边我们老版本的计化器也要被弃用了,默认就是bli plan,这个也不是什么问题了。接下来我们具体看一看怎么样去做弗link和汉的集成。那首先就是需要去引入依赖了,我们知道汉有各版本,它的特性变化是比较比较大的啊,那这里边呢,Flink支持的ha版本也是有限的,不是所有都支持,那么它支持的版本这里都列出来了啊,就是一点几的话支持。1.0.0~1.2.2啊,那二点几的话,有这么一些版本是支持的,三点几啊,也有对应的支持版本,目前呢,这一部分还不够完善,那么同样它也是在快速的完善加强变化的过程当中,支持的版本信息可能也在不停的调整啊,所以随着flink版本的更新变化,我们需要不停的去关注,然后。
05:21
去检查自己当前的这个项目当中啊,使用的版本到底是否支持啊。那have我们知道它是基于哈op的组件嘛,所以我们首先你如果要跟have去进行结合的话,首先得有哈杜op的相关,哈杜op的相关支持啊,那比如说这个哈杜op相关的这个plus呀,一些这个环境变量,我们首先是应该要有的,另外呢,呃,可以引入以下的一些这个依赖啊,这里边需要去引入当前的flink connector这样一个连接器啊,那当前这个连接器就是只提供了CQ。这样一个API层级的对应的集成支持啊,那另外呢,除了连接器的支持,还应该把hive的依赖也引入,那就是hive exec啊,对应的hive版本放在这。
06:09
比较建议的是这些依赖最最好是不要直接打包到我们的那个提交的flink作业对应的那个状文件里边啊,那在运行时的集群环境里边,我们再去添加这个依赖支持会更好一点。啊,那有了这个依赖支持之后,接下来我们就可以直接连接到havelo啊,那注意连接到have的操作呢,是要配置have catalog,我们看一下have catalog啊,这里边怎么去做配置,关键就是我们默认的时候default database啊,是叫my database对吧?啊,那这里边我们先定一个hive的配置的目录啊,我们先把它定义出来,然后接下来是要new一个hive catalog,只要我们引入了have连接器的话,就可以使用当前havelo这个类创建一个对应的对象。那接下来这个have catallo怎么去用呢?在表环境里边注册一个catlo。
07:04
那所以我们当前其实是。基于你看我们传入的这个参数啊,是要有当前的name,首先有一个当前这个catalog的name,然后呢,呃,当前这个。对应的数据库,当前默认的这个数据库的名称,然后还有。对应的haveve的配置,配置信息的目录,把这三个传入,就可以创建出have目录,Have catallo,然后这个havelo用法就是在环表环境中直接注册一个catalog。注册完成之后,接下来我们可以把它作为当前flink代码,当前绘画里边所使用的目录catalog,那这个就是use catalog,我们也都知道use catalog use database可以进行数据库和目录catalog的切换转换。那这是在代码当中的一个配置啊,当然了,我们也可以在s client客户端里面去做这样的配置,就是直接我们可以在这儿啊,直接create catallo啊,然后MY啊,定义一个这个名称,然后with对应的一些参数,这个看起来可能会更加的简单一些啊,相当于是在这个呃关系型数据库里边直接去创建一个目录的这样一个操作就可以把它完成了。
08:25
之后就可以use catalog就可以。首先把这个catalog。注册完成之后,就相当于进行了一个到have的连接,那接下来之后我们怎么样去基于这个have去写入数据,读写数据呢?呃,那这里边首先我们要去介绍的是一个方言啊,就CQ方言,对于flink CQ而言,它有一个比较有趣的功能,就是可以设置当前的方言。什么叫方言呢?因为我们知道haveve里边的这个CQ啊,它的语法跟标准CQ是不一样的啊,所以说提供的这个have CQ,或者有时候我们把它叫做Q啊。
09:07
它相当于是CQ的一种方言啊,那所以如果说我们使用这个flink cqlink CQ基本上是跟标准CQ完全一样的,那跟have的操作它有所不同。为了让我们更加的方便,为了在集成hi的时候更加的兼容,那么link CQ它就提供了这样一个方言的写用,使用方言去编写CQL查询的这样一个功能啊。也就是说,如果说我们把这个方言设置成have的方言的话,Have CQ方言的话,接下来我们可以在代码当中写CQ,直接写have CQ就可以了。啊,那所以接下来我们可以看到怎么样去做这样的一个设置呢?啊,那可以直接把这个table CQ。这个属性设置成就可以了啊,这当然这个直接的话,这个我们是在呃q client客户端里边去执行的,诶,那如果说我们想要在代码里边去执行的话,也可以直接配置当前的表环境的配置项啊,直接把这个配置好就可以了,等于氦啊,那另外呢,我们也可以在。
10:18
当前CQ客户端的配置文件里边,通过它的configuration模块来进行一个设置,当然这一个配置文件主要就是针对当前的CQ客户端来生效的啊,那甚至在这个配置文件里边,我们还可以去配置一些比方说初始化的表的信息啊,所以有一种使用的方式就是在配置文件里边把我们想要创建的那些连接器表都就定义好,定义好了之后,那相当于接下来我们就连在CQ客户端里边去写DDL去创建表的这个过程都省了,诶那就直接在里边写CQ就可以了。那当然了,目前我们还主要是用这种方式测试啊,所以这个我们就直接跳过了,然后另外呢,我们看到这是CQ里面去做设置,另外table API啊,那其实就是在代码里边啊,调这个get得到对应当前表环境的配置项之后,可以直接调一个方法set CQ dialect去配置当前的。
11:18
方言啊,那我们知道方言主要有两种选择,一个是have,这是have方言,另外一个是default default当然就是默认的linkq的这种标准CQ语法的方言了。配置了这个之后,接下来我们就可以去读写have表了啊,因为有了方言设置,那其实这个对have表的操作就非常的简单了啊,比如说现在我们就直接考虑直接用CQ的形式啊,来进行这样的一个书写,我们设置首先设置当前的方言是汉,那么接下来我们就可以操作一张表了。首先这张表应该是在have当中已经存在的,那么接下来我们做的操作其实就是直接在have当中,因为我们当前的catalog已经定义成了have嘛,已经自定义new了一个have catalog哎,那对应的我们当前创建表的操作就可以直接在have里面去进行了。
12:12
那所以这里边我们的这个语法就完全都是have里边创建表的这种方式,所以我们看到可以啊,然后后边这个start as啊啊,那那这个。Has啊,那后面做相关的一些配置项这样的一些定义,然后呢,把当前这张表创建好了之后,有了当中的表了,然后接下来再把CQ方言设成default。现在转而使用flink SCO当中的语法去创建一张卡夫卡的连接器表啊,那当然了,当前我们创建出来这两张表之后呢,就可以从卡夫卡里边读取数据,然后经过查询转换,比方说我们这里边select各种各样的字段啊,进行这个data form format做一个转换,From卡table,把它查询出来之后,直接insert into table table。
13:07
诶,那这样的话就相当于把我们经过flink,首先是从卡夫卡里边去读取数据,利用我们这里的连接器表,然后呢,Select这里边是flink进行转换处理得到的结果表,直接写入到我们创建好的这张have表里面去。诶,这就是一个读写have表的一个转换过程。所以我们发现整个这个过程呢,它其实本质上是一个流处理的flink CQ的程序,它其实还是来一条,呃,卡夫卡这边来一条数据,就经过这里的持续查询,经得到动态表里边的一行新的数据进行插入,然后直接就把它写入到have里面去。就是整个就是这样的一个处理流程。所以flink CQ连接到外部系统的方式是多种多样的啊,那后续可能还会支持更多的不同的外部存储系统,我们可以实时的关心。
14:07
Flink版本的更新和官网的说明。
我来说两句