00:00
已经创建好表之后,那接下来当然就可以对表进行处理计算了,我们对于表的处理方式,那一般就是写一个CQ,然后进行查询转换,我们知道CQ查询得到的结果可以看成是一个新的表啊,所以当前我们对于表的查询转换。可以认为就是从一个table到另外一个table的转换过程,它就对应着之前我们讲的data API里边的这样一个过程。那flink对我们为我们提供了两种不同的查询方式,一种就是直接执行CQ,另外一种呢,就是调用table API,使用链式调用方法的这种过程,然后实现table的转换。我们最为熟悉的当然就是直接执行CQ了,前面我们也已经做了比较简单的事例的实现,那我们知道flink它底层其实是基于k set提供了对Q的支持,Set是一个提供了标准CQL查询的底层工具啊,很多大数据的框架,比如说have,呃,它的底层对于CQ的支持都是通过集成了来实现。
01:14
所以flink也不例外,只要集成了set,那当前我们就可以直接去写标准的CQL语法进行查询转换了,那在代码当中调用呢,就是直接把CQ写成一个字符串,字符串表示出来,然后去调用table env,也就是table environment表环境的CQ query方法,把当前的CQ这个字符串进去,去执行就可以了。得到的结果是一个cable对象,这里需要注意的是,这里调用的方法跟前面创建表的时候调的方法不一样,创建表的时候我们执行是要执行一个DDL,所以它是execute CQ,它关键点在于执行并不需要我们返回任何的结果啊,最终相当于创建了一张表,连接到了外部系统,而且在当前环境里面做了一个注册,它是这样的一个效果。
02:09
而CQ query呢,有所不同,它是要真正的定义我们的计算流程、计算逻辑,然后得到一个新的table对象,所以这两个方法是不一样,我们做查询转换,调用的是CQ。那目前呢?Flink支持标准CQ中绝大部分的用法都已经支持了,而且提供了非常丰富的计算函数。所以。这样的话,我们的技术站啊就可以直接迁移过来了,之前呃,对于数据处理或者是大数据处理都非常熟悉的那些cqu的用法,我们就像之前在操作MYCQ操作一样,直接就可以上手写CQ,用CQ实现我们的业务处理逻辑,然后直接去执行就可以了,所以这样做的话就大大的降低了flink的上手难度。
03:01
所以呃,当前在很多的实际项目当中,很多公司的应用当中,都是直接上手去写CQ,实现基本的功能的。基本上就没有技术迁移的代价。C本身是有非常丰富的用法和函数的,那flink现在还在不停的完善,不停的丰富,相信在未来flink的新版本里边一定会对CQ的用法支持会更加的完善啊。那目前除了这种简单的做这个select from这样的一个简单的查询提取之外,我们当然也可以用一些对应的CQ里的数进行分组聚合啊,这个就涉及到group,比方说我们可以直接count去记一个数,然后做一个就是定义,按照当前的user啊去做一个分组,然后去做一个count计数,这样的话得到的结果就相当于做了一个分组聚合统计,我们也可以用S啊,那或者也可以用avg计算平均数这样的一些方法,这样就可以非常快速的实现一些基本需求了。
04:10
那在上面的例子当中,我们得到的是一个新的table对象嘛,啊,那接下来得到了一个新的table,那我们就可以继续去转换,也可以把它直接写入到我们对应已经创建好的一个连接器表里面,那就相当于输出到外部系统了啊,那输出外部系统的时候,之前我们也已经在程序架构里边提到过,可以直接去执行一个execute insert这样的一个语句,这样的话我们就相当于把一个表。和环境中定义好的一个虚拟表进行了连接,把这个table里边对应的数据写入到了当前的表里面去,那就对应的写入到了外部系统了。这是一种方式,另外一种方式呢?呃,我们自然想到了,也可以直接去执行一条CQ,执行一句insert不就完了吗?呃,之前我们可以执行create,当前当然也可以执行insert,所以execute和execute insert可以认为是近似于相等的两种执行方式,只不过如果是execute的话,我们就要完整的写出insert into这样一个语法,然后加上我们当前要写入的对应的那个表的名称,而如果是调用execute insert的话,那就直接传入一个当前表的名称就够了。当然前面得到的表必须是相当于我们把这个完整的,这个表要要得到注册到环境里边才能够执行调用这个SQ的音色。
05:45
这里还要多说的一句是,如果说我们直接在这儿去执行exte insert的话,那其实我们最后之前我们说这个流失处理环境里边,最后是要有一个Env.exequte把它执行起来的,好,那我们就会发现了,这是执行,那相当于我们这里也是执行啊,所以如果说我们直接使用了这种方式去输出到外部系统的话,最后的Env.execute也可以省略。
06:16
因为在当前我们定义这个环境的过程当中,有可能就根本没有用到当前的流执行环境嘛,EV就不存在,所以当然也就不需要再去把它执行起来了。这是相当于table API又给我们省略了一步,整个的过程当中,我们就只要写DDLCQ就可以了。这是直接写CQ进行表的查询转换的方式。当然另外还有一种方法,那就是去调用table API了,调用table API的话,它是嵌入在Java和SKY语言里边的一套查询API,核心呢,当然就是table接口类了啊,那这个类里边我们创建出来的对象就可以一步一步的链式调用它对应的方法,每一步每一次调用方法。
07:06
得到的结果都是一个新的table啊,那简单的方式我们看到了,就是你得到了一个一个table,然后接下来那就可以点啊,然后这里边我们写一个对应的表达式啊,这里边的查询条件user is e al啊,然后下面select,当然就是提取对应的字段了,当然我们也可以做,呃,进一步的做一些聚合呀,甚至后面我们会讲到有开窗啊,各种各样的操作都是支持的。这里需要说明的是,我们可以直接基于当前的表环境,然后调它的点from方法。直接从当前环境里边注册的一张表里边接获取到一个table的对象啊,所以这就现在我们就知道了,表环境里边注册的表和当前我们代码里边的Java对象,Table对象,它们俩之间是可以互相转换的。
08:06
如果我们先得到了table对应的对象的话,我们可以调用表环境的create temporary去创建一个虚拟虚拟视图,然后注册到当前环境里面。那如果说我们已经在环境里边注册这张表,现在想把它拿出来得到一个table对象的话,那我们就直接调用table en的from方法,里边进来的就是我们注册在环境里的名称,这样的话我们就可以直接得到对应的table。对象啊,那所以在之前的代码里边,我们现在注册了一个。叫做click table的这样的一个一张表,那后边如果说我们想把它拿出来的话,也可以直接table en去掉from。然后这里边我们直接传入这个click table,这样的话得到的就是。一个table的具体对象,那这里面的名称我们可以跟这个click table一致,也可以自己给一个别的名字,我们知道他们俩是一回事就可以了。区别就在于一个是Java对象,Table对象,一个是真正注册,注册在表环境里的表。
09:15
这就是我们所说的它俩之间的转换啊,那当然我们说的这个基于click table,它可以做各种各样的转换,我们可以定义这个where。Dollar符啊,然后写一个这样的一个表达式里边,比方说我们这个是username。我们需要把它引入expression那个类要引入啊,然后我们可以让它某一个用户,比方说这个,那接下来我们还可以继续去做这个select。当然我们看到如果说是先做了where,然后再去做select的话,呃,这个当前我们这里边给给出来的是一个已经被弃用的这个状态啊,那当前是因为我们只传入了一个string这种定义表达式的方式,我们看可以就是直接用逗号分割啊,把所有的这个字段传进去,这个是可以的,但这种方式已经被弃用了,一般推荐的这个表达式当然是用Dollar这种方式去定义了。
10:16
这是当前推荐的定义,呃,就是使用表达式的方式啊,直接传一个string,这种方式不推荐。这里我们可以选择当前的username以及URL。就是比较简单的这样的一个转换,我们可以把它叫做。Or table?啊,那后边如果说我们还想把这个table再去放到当前的环境里边,注册到环境里边,直接用在后续的CQ里边的话,诶,那也可以在这里我们直接做一个。做一个注册table,那就是create。Temporary方。
11:03
后边是这就是我们能看到的这样的一个表。表的table对象,以及当前注册好的表他们之间的转换的过程。当然这个过程我们也用到了table API的一些调用。那另外的一种方法当然是直接去写CQ,我们也可以把这个CQ完整的做一个书写。是,呃,执行CQ。进行。表的查询转换。上面这种方法我们是。调用table API进行。表的查询转换。那接下来我们可以直接干脆我们就直接针对这个result table再去做一个转换就可以了啊,那我们就看到既然已经注册完了吧,那我们现在就可以直接用这个这张表啊,所以接下来我们可以直接定义这样的一个,直接写这个吧,那就table d直接去query。
12:09
里边我们完整的写一句select。当比方说我们现在要提取当前的。Username。我们就直接提取username。Result。这样当然是可以的啊,直接一执行就可以把对应的数据全部提取出来,如果我们觉得这个数据太少的话,当然也可以。URL,然后userna直接放在这里,可以得到它了。对应的这个结果就是一个新的table,比方说我们可以把它得到这个。二。这样的话,我们就通过前面的一一层查询转换,后边又直接把它得到的结果注册在当前环境里边,又直接写了一句CQ,把它做了一个查询转换。
13:00
如果说我们想要看到当前的结果的话,我们当然是可以直接把它。添加到当前的这个输出表里边,Out table里面啊,那我们看一下这个添加的这个过程应该怎么样去输出呢。接下来就是。输出表。非常简单直接,我们可以把result table2去调用一个execute音色的这样一个方法啊,那这里面我们传入一个当前表的名称叫做。啊,那当然了,当前的这张表本身应该不是URL和USERNAME2个字段,那如果是两个字段的话,我们应该用前面的这个result table啊,那那所以这里边如果要是说我们想要插入,就是当前是要把这个对应字段。两个字段的这一这个表想要插到这个out table里边的话,那我们显然应该用的是result table,而不是二啊,这样的话,当前的这个属性字段才定义是匹配的,否则的话就会报错了。
14:06
呃,当然了,我们现在直接这段代码要想执行的话,可能还会有问题,因为我们前面定义了太多对应的这个表环境,而且我们知道老版本对应的这个old planner我们是没有引入的,我们只引入了blink planner啊,那所以我们要执行后面做测试的话,就把前面定义的其他。其他版本的。环境,执行环境表环境,我们直接掉,只用当前的blink版本的流处理环境就可以了。当然这段代码如果说我们想要真正的运行起来的话,还有很多细节需要注意,比如说CQ1定要认真的检查,因为我们知道CQ本身在这里表现出来的只是一个string字符串而已,那如果说CQ语法有错,Idea是不会给我们任何提示的,只有在运行的时候才会报报错报出来啊。那比如说这里我们就有一个非常简单也是比较容易犯的错误,那就是像我们这里定义了两个字段。
15:09
每个后边都加了逗号,显然后边的这个逗号是不必要的啊,那我们需要把这些细小的CQ里边的语法错误要把它改过来,另外还有一点就是我们当前两张表,输入输出表都是连接到外部文件系统去进行的读取和写入,而这个时候对于文件系统的连接呢,就涉及到一个。它的文件格式,我们到底用什么样的格式去进行转换和解析,Format这里面我们定义的是CSV啊,所以对应的我们当前的环境里边,必须要有对于CCSV这种格式支持的相关依赖包才可以。啊,那当前flink内部本身是没有对应的支持的,我们还需要额外的引入啊,那这一部分呢,本身在文档里边我们是放到了第11.9节啊,后边连接到外部系统的时候,我们在这里会详细的去介绍连接到哪里的时候需要引入什么样的依赖,比如说我们连接到卡夫卡的时候,诶,那我们就想到了卡夫卡的数据读取出来之后,也应该要按照一定的格式去进行解析,或者说按照一定的格式序列化之后要写入,那比方说我们当前支持哪些具体的格式呢?Csv Jason a这些都是支持的啊,那所以如果说我们当前要。
16:37
定义。文件的格式,文本的格式是CSV的话,我们就需要引入flink对于CSV格式数据处理的支持啊,那我们引入这个依赖flink csv对应的版本就是flink的版本,我们可以把它添加到当前的POM文件里。
17:00
好添加进来之后,接下来我们就可以。把这个代码做一个运行,看看能不能真正的把这个数据提取出来,写入到我们相应的目录下面去。我们看现在已经执行完毕了,而而且现在我们发现真的多了一个output文件夹啊,这是我们的输出目录,打开看的话,诶,我们现在这里边有一个数据,我们看到这个这个是一个part,说明我们当前应该是要分布式的写入的啊,这里边Bob只有一条数据,所以说啊,那当然就是只有一个分区,只有一个这个分区文件了啊,我们如果想要测试的更加更加复杂一些的话,也可以把这个数据多一些,比如说我们可以让Bob的访问数据。多几条啊,我们当然了,也可以把这个Mary和Alice也多一些,这里可以增加。
18:01
四秒钟,五秒钟。每隔一秒输入一条数据。啊,那接下来,呃,比方说我们可以把这个对应的访访问的URL也可以做一些调整。我们让Bob的数据增加啊,那接下来如果能够拿到的对应的这个数据肯定就会更多了,我们可以先把奥先删掉。然后重新执行一下当前的代码。好,现在已经执行完毕,我们看到output的文件夹又出现了,现在分区就更多了啊,所以我们可以看到当前,诶,这里边分区鲍B有一条啊,然后后边呢,哎,蚌埠又有一条,对吧?那当前一共有几几个不同的分区呢?一共有六个不同的分区,为什么会有这么多呢?这其实是涉及到了我们当前运行代码运行度的问题。
19:05
因为我们发现当前如果说我们没有在上边定义这个stream execution environment的话,也没有在全局去设置并行度的话,那么当前的并行度默认应该是什么呢?应该是我们当前电脑。CPU的核心数啊,那所以我当前的电脑如果说有八核的话,那么当前整个这个运行起来可以看到的这个并行度就应该是八,那对应的数据当然是每来一个就可以放到一个分区,每来一个放到一个分区啊,那当前一共是有六条数据,所以还没有达到上限啊,那我当前的电脑其实是16核,所以我们的数据可能要一直达到16以上,才会出现同一个文件里边有两条数据的这种场景。这个可以就看的非常的明确。如果说我们想要去在用其他的一些并行度去进行测试的话,那还是可以把当前的stream execution environment定义出来,然后全局的去设置并行度,这是可行的,但是一般情况我们不会在代码里全局设置,如果想用不同的并行度去进行进行提交的话,我们直接在提交作业的时候加上杠P参数不就可以了吗?啊,所以这一部分我们也不需要去重复测试了啊,这就是关于我们。
20:26
表的查询、转换以及后续的写入这样的一个基本的测试。
我来说两句