00:00
目前为止,我们已经了解了table API和link CQ当中几乎所有的用法,哎,我们其实会发现啊,只要使用了这套API,那就相当于我们不需要考虑更多更复杂的link底层流处理的情况,而是把它看成一个传统的关系数据库,我们创建表,然后基于它去进行CQ查询转换,直接写CQ就可以完成底层的流处理程序了。诶,那所以就为很多传统的数据工程师,数据分析师提供了非常大的便利啊,因为我们知道啊,很多数据工程师其实可能对于编程语言Java scalela编程语言并不是特别的熟悉,他最熟悉的呢,就是对于数据库的操作就是CQ,诶,那所以这种方式就为他们提供了很大的方便。但是我们发现啊,即使是这种方式,整个这个代码我们还是要放在一个Java的类,或者是scale拉的对象object当中的,那在这个写入的过程当中,CQ,尽管大部分流程我们都是以CQ去实现的,这个CQ呢,还是要内嵌在我们底层的一个方法调用当中,诶,所以最后写完的程序我们还要进行打包,然后提交到集群上才能够真正的运行起来。
01:13
我们在就想到了,那有没有更加方便的一种方式,就不要跟编程语言有关系了,诶,我们直接就像之前操作这个数据库一样,打开一个页面,直接写一句CQ,就可以把我们定义的这个操作流程提交到集群上去执行呢,确实是有这样的方式的啊,在flink当中就给我们提供了一个非常好用的也非常好玩的小工具,就叫做CQ客户端。那这个CQ客户端呢,其实就是一个命令行的交互界面啊,所谓的这个c Li,我们可以在里边非常容易的去写各种CQ查询,就像使用MYCQ一样啊,就像打开MYCQ的那个交互页面一样啊,诶,那这里边就相当于我们可以直接抛开Java或者skyla这样的代码的编写,直接就在这个命令行工具里边去输入一行一行的CQ,就可以把我们所有的流处理作业的逻辑定义出来了啊,那接下来我们可以简单的介绍一下到底怎么用这个客户端,想要使用CQ客户端呢?诶,那首先我们需要启动本地的集群环境啊,先把这个flink集群要先提起来,然后接下来呢?啊,启动CQ客户端的命令非常简单,就是使用并下面的CQ client这样一个启动脚本啊,这个我们之前在flink本身的安装目录下边啊,它里边本身有一个并目录,这个下边呢,就有很多我们可以去执行的脚本命令,最常用的。
02:39
当然就是这里的flink了,我们如果要想去提交一个作业的话,直接flink wrong就可以,呃,那除了这个之外呢,还有一些我们其实大概也知道它是干什么的啊,比如说哎,这里有雅安session,基于这个雅恩的部署提交,那那还有这个单独的task manager job manager啊,以及stand job等等啊,单作业的提交模式,这些都比较简单,我们现在要用到的呢,是一个CQ的客户端,所以其实就是这里的CQ client.sh啊,所以在这里可以直接调用它。
03:11
我们看现在其实我并没有在本地起集群,所以这个客户端呢,它是可以单独运行的,但是你起来之后啊,在里边想要去提交作业的时候,那当然就得有集群环境了啊,这个相当于只是我们提交作业的一个渠道而已,我们看提起来之后啊,这里就有一个非常大的松鼠logo啊,弗林克的这只小松鼠就体现在这儿了,然后这里写着它的名字叫弗link CQ client啊,当然现在还是一个better版,还是一个测试版本,所以现在还不是特别的完善啊,未来的版本里边这个功能肯定会越来越完善。接下来呢,我们就看到了有一个这样的提示符,说明进入到了一个交互式的命令行界面来啊,那么在里边呢,这个操作就跟我们进入到MYSQL的命令行界面是一样的啊,比如说我们敲一个set,就可以看到当前环境里边的一些参数设置啊,我们看这里边最重要的设置呢,其实都是跟当前的弗link集群有关的。
04:06
默认情况下啊,LIC其实就是读取我们集群配置文件里边flink com.ya某那个配置文件里边的数据,然后进行加载的,比如说我们看到这里边啊,有这个job manager的memory,对应的这个配置有task manager memory process size啊,对应的配置都有,另外还有job manager rpc,它的主机名和端口号,我们配置的是韩度102和6123。然后下面还有两个非常熟悉的经典配置,一个是集群的默认并行度parallelism.default默认是一,另外还有一个每个task manager默认的slots数量number of task slos默认是一啊,那所以这里面如果想要做其他更多的环境变量的设置的话,也可以在这里使用set命令去做一个配置啊。那这里我们可以简单的说一下,最重要的这个需要配置的环境是什么呢?呃,其实主要是运行模式。
05:00
设置的这一项就叫做execution runtime mode啊,这也就是说我们到底是以流处理模式还是批处理模式去进行运行啊,类似的对应着我们就是提交作业时候啊,指定的那个运行模式的参数啊,那所以在这儿我们可以直接用一个set命令来做一个设置,默认情况下当然就是流处理模式了啊,就是streaming,如果说想要使用批处理模式的话,把这个改成batch就可以了。然后除了这个运行模式之外呢,还有一个执行结果模式,所谓执行结果模式其实就是在这个CQ客户端上面啊,我们做一条查询之后,直接就可以拿到它的返回结果,那这个返回结果以什么样的形式来表现呢?啊,那最经典的方式当然就是table了啊,那table就是直接返回的,就是以逗号分格的每个字段我们查询出来的结果都输出出来,然后还有changelo changelo我们就更加熟悉了啊,像之前我们在控制台打印输出的时候啊,直接to changelo stream转换成更新日志流,那在这个输出的数据里边,每一行数据前面都会加上一个啊,表示到底是插入还是删除或者撤回这样的一个标记,一个符号,哎,那所以就是数据前面会有一个加加减减这样的前缀啊。
06:12
除此之外呢,另外还有一个非常经典的模式,那就是所谓的table AU table AU就是我们在这个MYSQL里边经典的那种可视化模式啊,输出的结果呢是一张表,它是以这个虚线框直接把它框起来的,然后上面是我们的这个表头的属性名,然后下面每一行的数据都列在里面,这就是所谓的执行结果模式,我们配置的时候是c client execution.result-mode后边加一个引号把它引起来啊,指明到底是table还是lo还是table AU就可以了。哎,那除此之外呢,还有一些非常典型的配置项,比如说之前我们说过的啊,状态生存时间TTL啊,就是这个状态啊,如果空闲多长时间之后就要清理掉了,为了释放系统的资源啊,那这个时候呢,就可以配一个TTL配置的选项,就是table.ex e.state.ttl里边配置一个时间,一个毫秒数。
07:09
啊,这里其实我们能配置的选项特别特别的多啊,那其他的内容呢,我们可以到官网上去做一个详细的查询啊,那除了在命令行里边可以直接用set去做一个配置之外,我们也可以更改CQ客户端的配置文件啊,这个配置文件就叫做CQ c Li default.yama也是一个yama文件啊,那在这个压ma文件里边呢,可以配置的东西就会更多更强大,甚至我们可以在里边直接去预定义表和函数,也就是说像我们连接到外部系统的那些连接器表,一开始就可以在压录文件里面直接创建出来,这样的话,我们在CQ客户端里面就直接去执行对应的查询语句定义处理逻辑就可以了。所以已经有了基本的一些配置项和预定义的表,那接下来我们就可以去执行CQ查询了啊,这里的操作其实就跟my CQ Oracle这些关系数据库完全一样了,比如说啊,就是像之前我们那个最简单的进行聚合的例子啊,首先我们读取文本文件里边的数据,创建一个连接器表,那么在CQ客户端里面同样可以执行这样一句d dl create table啊,比方说我们这个叫做英文table里面对应的这个字段定义出来time Sam,因为是一个关键字,所以我们加了反引号,然后后边一个with连接器表,Connector file system,从文件里边去读取当前的数据,读取出来之后呢?啊,那相当于现在就已经有了一个even table了,那接下来当然就是可以去直接执行对应的查询操作啊,那在这之前呢,我们可以再去创建一个输出的表,那这个输出的表同样也是一个连接器表,我们看现在是连接器connector是print啊,那这个所谓的。
08:53
连接器表其实就变成了一个控制台打印,最后的输出结果是在控制台打印的,那最后我们执行的操作呢,其实就是要select我们想要的字段,比方说user,然后看看URL,统计一下每个用户他点击访问URL的次数,从这张表里边取出来,而且按照user去做一个group,做一个分组,得到的结果呢,直接insert into result table,哎,这个过程其实跟我们就是在MYSQL里面的操作可以说是完全一样。
09:24
这就为一些不太熟悉Java scla这样的编程语言的数据工程师提供了非常大的方便啊,我们就直接在里边写CQ就可以了,这里需要多注意的一句是,就是如果说我们在这里执行了一条这个CQ的话,那其实本质上来讲是在集群上提交了一个flink作业的,那这个操作可能会比较轻量级,那导致我们频繁的提交作业就有可能出现一些问题,所以现在呢,Fli CQ还是把这个客户端作为一个测试或者说实验的这样一个场景啊,那具体使用的过程当中,一般我们还是需要把所有的代码写好之后,然后打包提交到集群上面去执行的啊,用这种方式呢,我们可能就是做一些非常简短的测试啊,把逻辑跑通就可以了,这就是关于link CQ的客户端。
我来说两句