00:00
关于连接到外部系统呢,最后我们再来单独讲一下flink跟hi的连接啊,那我们知道啊,阿帕奇hi它是一个基于哈杜的数据仓库的基础框架啊,那可以说现在啊,我们如果要进行海量数据的离线分析的话,Have基本上就是一个标配的核心组件了,那在have里边呢,我们知道它支持类CQ的查询语言啊,有时候我们把它叫做have CQ或者叫hiq啊,这样的话我们就可以非常方便的对于数据进行统计分析和处理。所以现在have可以说是分析存储啊海量数据集的唯一选择,那have的主要缺点呢,也非常的明显啊,它能处理的数据集非常的大,但是呢,查询的延迟比较高啊,所以一般情况下都是应用在离线分析的场景下边。而我们与之对比呢,Flink跟它就刚好相反,Flink的特点就是实时性特别的强,主要用在实时的流处理里面,哎,那所以如果说我们把flink CQ跟have如果能直接结合在一起的话,那就相当于可以直接使用一整套CQ,既可以获取到非常好的实时性,又可以进行超海量数据集的一个保存和分析了,诶,那能不能做到这一点呢?呃,现在的目标就是要做到这一点,Flink跟have就要进行一个集成。
01:21
他们的集成方式比较特别,所以我们要单独的拿出来说一下啊,呃,主要就是因为flink里边提供了一个叫做haveve catalog,就是所谓的haveve目录,哎,那如果说flink我们指定当前所使用的catalog是have catalog的话,那就允许我们使用have的mato来进行flink原数据的管理了,哎,那这样有什么好处呢?啊,那我们知道mato本身它是一个持久化的目录,哎,所以如果使用了have catalog的话,就可以跨会画去存储flink的原数据,也就是说如果我们在这个catalog里边创建了一张呃,连接到卡库卡的表,或者连接到ES的表,诶,那么他们的原数据就会持久化在ma store里。
02:06
对于不同的flink作业提交而言,哎,那就不需要再重复创建对应的这个连接器表了,哎,所有的那个表的信息都在当前的开lo里边,直接拿出来用就可以了。那另外还有一个呢,就是如果我们使用了have catalog的话,Flink就可以直接作为读写have表的替代分析引擎啊,因为我们知道have本身啊,它是可以引入其他的一些组件作为自己的分析引擎的啊,那一般我们常用的可能是Spark跟have的一个结合,那同样flink也可以作为一个替代分析引擎来跟have结合在一起,哎,那它的结合的方式就是通过have catalog。这样一来的话,在have当中我们进行批处理就会更加的高效,而且呢,也拥有了去读写流式数据的能力,哎,这种方式的结合,这就使得我们如果要是想要获取这个数据,仓库的实时分析应用就成为了一个现实啊。所以现在比较火爆的实时数仓的概念也正是基于这些特点发展出来的。
03:09
这里需要强调一点的是fli跟have的连接,目前只有blink版本的计化器planner才提供了对应的支持,诶,那所以我们在使用这个table API或者link CQ的时候呢,必须要选择当前的计化器是blink啊,当然了,我们现在版本默认的计化器不选择就使用的是blink,所以一般情况我们不要单独进行配置就可以了。好,那首先如果想要去跟have进行集成的话,还是先要去引入依赖,那have它的各版本特性变化比较大啊,所以目前link支持的have版本也是比较有限的,而且是分开的啊,就是have一点几的版本里面支持的是1.0~1.2.2,那二点几版本呢,呃,支持的是2.0~2.2.0 2.3.0~2.3.6啊,另外还有三点几的版本,支持从3.0~3.1.2。
04:04
啊,那所以这个还跟弗link之间的集成的,这个过程还不是特别的完善,我们现在也应该是在不停的发展变化的过程当中,我们可以随时关注官网发布的信息。然后因为我们考虑到have是基于哈杜op的组件,哎,所以我们使用have的连接,首先需要提供哈都相哈杜的相关支持啊,呃,这个主要就是在环境变量里边去配置一下海class pass,配置好了就可以了。接下来呢,在flink程序当中要引入flink跟hi的连接器相关的依赖,Flink connector have。除了连接器的依赖之外呢,还要引入have本身相关的依赖,大家就have ex e把对应的这些依赖引入,一般建议啊,不要把这些依赖直接打包到我们结果下文件当中,而是在运行时的,而是在运行的集群环境里边。哎,那我们可以被不同的have版本去添加不同的依赖支持,把对应的那些支持的类库啊,支持的包放在我们集群环境的那个library内部目录下边就可以了。
05:09
好,引入了相关依赖之后,那下一步就是可以直接连接到have了,我们说连接到have是通过在flink程序当中去设置表环境中的catalog,把catalog设置成have catalog通过这个来实现。这里我们可以看一个代码当中的具体应用案例啊,哎,那就是首先我们先创建一个当前的表执行环境,注意这个表环境里边必须使用的是blink的plan啊,默认就是这样啊。然后接下来呢,我们要去new一个have catalog,这个have catalog里边需要有几个参数,首先一个是当前目录catalog的名称,比方说我们就叫做my have,然后呢,要指定一个default database默认的数据库啊,我们就叫做my database。后面呢,还应该有一个have对应的配置项的目录啊,那当前这个have con director,把这三个参数传入之后,就创建出了一个have catalog的实例对象,接下来呢,我们就可以在当前的表环境当中去注册这个catalog,诶,所以我们应用的就是统一的啊注册目录的这个方法,Tablena去调一个register catalog,然后把当前我们定义好的这个have catalog注册成my head。
06:25
然后接下来我们就要使用我们注册好的my have作为当前会话的catalog,这样的话,接下来我们就可以跟have连接在一起去进行相应的CQ操作了。当然了,这是我们在这个flink代码当中啊,使用这个skyla语言在程序里边去设置了这个catalog,那我们也可以直接启动CQ客户端,在QQ客户端里边直接去create catlo,指定my have,然后用一个with子句指定它的type是什么,然后have对应的配置目录到底是什么,诶这样的话,我们同样可以创建一个have catalog,后边我们use这个catalog就可以了。
07:06
这是第一步,我们创建出了have catalog相当于创建了到have的连接。然后接下来呢,我们就可以在CTO当中啊,去创建一张连接到have的表,然后就可以去进行查询转换了,理论上是这样,但是我们知道啊,对于这个have而言,它是有单独的一整套查询语法的,它跟CQ比较相似,但是呢,又有很多自己独特的特点啊,那所以我们一般说它是类CQ的一种查询语言啊,我们把它叫做have CQ可以认为它是CQ的一种方言。所以这种方言我们在使用的过程当中,跟标准CQ或者说flink CQ的语法就会有一些出入,所以呢,为了提高兼容性,Flink CQ给我们提供的一个非常有趣而且强大的功能,那就是可以去设置所谓的CQ方言哦,就是说我可以指定当前我要用的这个语法呀,到底是have CQ的语法还是flink CQ啊这种标准CQ的语法,所以这样的话,我们就可以根据他们自己不同的语法去创建表,去执行一些查询操作啊,所以在这个当中设置也非常简单啊啊。
08:15
那最简单一种方式就是我们直接使用set命令,那就是table CQ-dialect指定成have,把当前的方言指定成have。呃,那另外呢,我们也可以在这个配置文件里边,CQ c Li defa.ya某里面通过configuration这个模块来配置table.cq dialect指定者态。这是在CQL当中的设置,我们也可以直接在这个代码啊,在Java或者skyva代码当中调用table API table env去获取当前的配置,Get con FA,然后去set CQ dialect指定成have就可以了啊,那除了have这个方言之外呢,另外还有一种方言就是default,其实我们知道default就是默认的原生标准CQ的这个方言啊。呃,我们可以认为这个default相对于方言而言就是普通话啊,我们平常用的这个Li CQ里边默认使用的都是标准CQ的语法。
09:13
好了,有了这些设置之后,接下来我们就可以去读写汉表了啊,有了这种方言的支持,那其实我们就会发现啊,就可以直接使用在have CQ当中的那种方式去创建一个连接到have的表达,所以我们看到首先可以把当前的方言设置成have,接下来呢,Create table have table,我们看到这里的语法就跟have CQ里边的写法是完全一样。这里我们指定了have表的对应的字段,而且设置了通过分区时间来进行触发提交这样一个策略,设置了这样一张表之后,哎,接下来呢,我们又可以把当前的CQ方言设置为default,也就是标准CQ的语法,然后去创建一个到卡夫卡的连接器表,所以我们就想到了现在就可以从卡夫卡里边去读取数据,然后进行查询转换,得到的结果呢,最终再写入到have table里面去。哎,所以我们看到这样的话,我们就可以把整个的流处理程序使用不同方言的表述全部用CQ表达出来了。
10:20
啊,这个过程就会非常的方便,非常的简单,这是关于到have的连接。
我来说两句