00:00
通过这样一段简单的测试程序,我们会发现不管是使用table API还是使用CQ,直接CQ都可以得到相同的结果,都可以得到一个table的对象,然后接下来呢,可以把这个table对象的内容直接插入到我们的用于输出的连接器表当中,这样就可以写入到外部系统了。啊,这个过程我们可以直接使用KAPI也可以单独的执行CQ也可以两者混用,这个都是一样的啊,那当然了,我们这里的输出是直接写入到了文件,那我们自然想到了平常做测试的话,那有可能还需要去把它入到其他的一些系统,那这里边要更改什么呢?那非常简单,还是我们输出用于输出的数据是什么样的话,这里边前面定义这个表的结构就是什么样的。所要更改的只是后边connector换成别的就可以了,如果是卡夫卡的话,这里就是卡夫卡啊,如果是呃,Red的话,这里就是,如果是MYQ的话,这里就是GDBC,后面再更改一些对应的配置项就可以了。
01:10
那如果说我们想要就是简单的在控制台做一个打印输出,能不能做到呢?当然也是可以的。其实前面我们最简单的这个简单上手这个事例里边已经做了这样的测试,不过呢,当时我们是把它转换成了流to streamam,然后去做了打印,那我们知道to stream这个方法,它其实是stream table environment下面的一个调用的方法。它本质上是我们当前确定了就是一个流处理,所以我当然能把得到的一个表转换成一个data,转换成一个流了。那现在呢,我们的这个注册其实不太一样了,我们现在的表环境。本身就是table environment,因为我们是通过配置来指定当前到底是处在什么模式下,那当前table environment它并不区分。
02:05
本质上啊,我们并没有直接去区分开到底是批处理还是流处理啊,所以这里边它是批流统一的,我们如果要直接在这里想要去把它进行转换的话,Table的话,我们发现没有对应的这样的方法要用,那接下来应该怎么样去直接在控制台做打输出呢?当然我们也可以还是把它,呃,当前的那个stream environment定义出来啊,然后基于它去定义一个environment,这样的话我们就可以直接简单的去做。转换成流的这个操作了,把它打印出来了,那现在如果说我们是纯CQ这种方式上来之后,跟那个流那边没有关系,那现在也可以就是同样使用。注册一个连接器表的方式,然后注册一个用于输出到控制台打印的一张表。所以接下来我们。创建一张。
03:02
用于。控制台打印输出的表。那这个表的话,我们可以直接借借鉴之前的这种这种输出形式,我们可以把它叫做。Print out dl这个我们也加了一个。Print out。跟上面的这个注册的table做一个区分,然后我们接下来呢,就可以直接使用,呃,我们我们就可以直接用这个来进行做一个输出了啊,那当然了,这里我们看到同样还是前面是URL,后边是username,呃,这个好像有点区别,就是之前我们这个。O table本身它应该第一个是username,第二个是URL,当然这里我们本身叫什么名称是不重要的,所以我们还是可以正常输出,之前输出的这个文件里面我们看到前面是username,后边是URL,这主要是跟我们本身前面的这个转换逻辑有关的,这里边我们提取出来的第一个是username,那当然应该就是username了啊,这里本身这个字段叫什么名并不重要。
04:11
当然一般情况我们还是希望跟具体的这个名称要对应起来,所以还是把它做一个更改啊,那现在的话这个刚好就不用更改了,我们直接把第一个叫URL,后面这个叫username就可以了。那接下来我们。把这个连接器更改一下,不需要任何的配置参数,只要指定connector是print就可以了。好,那接下来我们的这个Q把当前的create print d做一个执行,然后创建出这样一表,接下来当然输出的时候就是result table,二去execute insert操作。然后我们输出的是输出到print out这张表。跟之前我们输出到文件系统非常的类似,我们定义了一个输出到控制台打印输出的连接器,接下来我们运行一下。
05:13
我们可以看到对应的啊,当然了,因为我们前面这里执行的时候还有。输出到table啊,就是我们对于table API直接得到的这一个table的一个写入,那所以这里边的文件就又多了六个,这个是我们能够看到的啊,多出来的文件同样还是前面是userna,后边是URL,然后接下来。我们发现。同时在这里还有六条数据,在控制台做了一个打印输出,而这个打印输出的顺序呢,是前面是URL,后边是username,这个输出的是我们看到的这里。通过CQ经过执行转换之后得到的result table2里边的数据,我们是把它输出通过这个print out table这样一个连接器表进行控制台打印的输出的。
06:09
这是关于控制台打印的这种方式,那另外我们想到了前面这里边我们用到的这些方式啊,可以用table API,也可用CQ,也可以把合起来进行换,那实际项目当中一般怎么用呢?当然了,就是用哪种方式都可以,可以非常的可以混起来用,一般的话当然是直接用CQ的比较多啊,因为我们看到table API这种调用的方式呢,既不太喜欢,既不太符合我们在Java当中的习惯,其实也不太符合我们CQ当中的习惯,所以它有点像一个杂柔的结果,那一般情况呢,我们就是直接写CQ的场景会更多一点,而且在真正意义上在这个。呃,当前的table API里边啊,CQ本身支持的功能也要比table API,就是本身这里边table能调用的方法要更多,所以我们一般就是不用API,只用CQ就可以了。
07:08
那除了前面讲的这个简单的转换,我们接下来再来介绍一个比较复杂一点的转换。我们可以把这一部分先。助助调啊,那接下来我们说一下,还是前面的这个查询转换。我们来介绍一个复杂一点的,那当然就是前面这里只是做一些字段的提取,那我们现在至少要做一个聚合计算吧,啊,所以接下来我们看一下。执行。聚合计算的。查询、转换。所以我们这里会想到啊,如果说现在要基于当前的,呃,我们就直接基当前已经注册好的这个click table吧,原始的这张表,然后做一个查询转换,我们就非常简单的想要针对每一个user,每一个用户统计一下当前它到底点击了多少次,有多少次访问,相当于是统计一个呃,每一个用户的PV,那接下来我们就会看到。
08:17
要执行的这个CQ,那就是select。这个CQ其实非常简单,那就是username,每一个用户,然后还有一个count,我们直接去执行这个count。函数啊,那这里面我们可以直接抗芯,当然也可以抗URL啊,计算哪个都是可以的,然后接下来啊,我们可以给它一个重命名啊,因为得到这个字段,我们有可能还要把它做提取,做转换,给它一个重命名。From当前的click table。那另外还需要有group。有点累。这就是一个最为简单的聚合计算,统计当前每一个用户。PV次数的一个CK啊,那这个结果啊,我们就叫做AJ或者叫agg result。
09:07
得到了这样的一个结果,接下来如果说我们想要把它直接在这个控制台做打印输出的话,哎,那接下来我们就想到了,是不是这里就需要当前应该改一下这里的字段名称呢?因为我们当前得到结果username,诶,这是string,这个没问题。但是后边的这个count很明显,呃,调用了count函数之后,得到的应该是一个长整性的值,在C里面的话,那就应该一个in啊,那所以接下来我们其实是要增加一个字段。CT。这样的话。我们得到的对应的这个类型啊,跟输出表的这个类型就完全一样,所以接下来我们可以直接把a table。去做一个execute insert啊,那当前我们直接输出的是输出到print out里面来。
10:09
接下来我们再来看一下这个结果又是什么样的。我们可以看到当前的输出结果会有一点不同,跟我们之前的执行的那个结果有一点不同,因为之前的这个过程呢,我们会看到是,诶就是有这个都是前面都是加I,我们说I是insert。Insert的首字母是表示插入。所以我们看现在它一开始也有insert,但是后边呢,后边又有U,而且有减U和加U,这是什么意思呢。那对比insert,我们自然就想到了U,当然就是。Update了。相当于是一个更新操作,那所谓的减优当然就是。把这条数据要删掉,这条数据要去更新,所以是把它先删掉,然后加U,当然就是这条数据更新的话就是更新后的值,所以这里是通过减优和加U来表示我们更新哪一条数据,然后把它更新成什么样的,所以简优后面跟的是更新前的那条数据。
11:23
加U是当前的一条数据更新之后的样子。啊,那有了这样一个基本的判断,接下来我们再看的话,就会发现这就很好理解了,来了一条Mary的点击数据,那当然MARY1,我们现在是加I,哎,那就是相当于插入了一个MARY1。然后接下来鲍B来了一条数据,哎,那我们看这个是先来了爱ice丝,因为我们现在是并行执行的嘛,啊,并行度我们看这个就最大到了12啊,因为我当前是16核,所以这个当前的并行子任务的,呃,当前的那个序号索引是最最大可以到16个啊,那当前我们看到Bob和Alice丝都来了一条数据,所以鲍B1爱ice丝一这个是没有问题的。
12:06
然后接下来我们会看到Bob又来了,第二条数据,那会先减去BOB1这条数据,然后得到一个BOB2,为什么会这样呢?因为我们知道现在我们其实得到的这张表里边就是。一个user。一个count。CT。所以我们这里边如果是一个爱丽丝一,一个BOB1的话。接下来Bob变成二,我们并不会在下边去追加一个BOB2。而是直接把BOB1这条数据修改成二。所以这个我们就发现了,当前的这个操作,它并不是在这个表里边往后面直接追加,所以它不再是insert了。我们初始的那个状态的话,那当然都是insert,接下来现在就有了更新操作,Update操作,所以是删除掉之前的报一减U,然后增加BOB2加U,把它改成BOB2啊,那同样接下来如果又来了Mary的第二条数据的话,我们就是减去MARY1,然后加上ME2。
13:18
那别的剩下的当然就都是鲍B的数据了,我们知道鲍B1共有六条数据,所以我们会看到依次去更新,而且它的更新都是在同一个分区上去做的,为什么会在同一个分区上去做呢?因为我们知道这里定义的过程当中,我们处理的过程当中有group by。这个聚合,这不就是相当于我们K之后的分组聚合吗?诶,所以既然有类似于K,当然最后它就所有的数据都会被分发到同一个分区上去,所以接下来我们都是在同一个分区去做的,那先是减去BOB2更新成BOB3,然后又减去BOB3更新成BOB4,最终更新到Bob丁。这就是我们最后得到的结果。
14:01
啊,所以我们会发现在处理这样的数据的过程当中,可能会有两种情况,一种是所有的数据来了之后,就直接追加到后边,比如说像我们前面如果说没有做这个聚合计算的话。那所有的数据只是做一个提取,我们知道这就类似于一个map或者flat map这样的一个处理过程啊,跟之前的数据没有关系,来一条就会输出一条,那当然是追加了,当然是加爱,而后面如果我们要做分组聚合的话,聚合的结果就会有更新操作,那么这个更新操作明显它的输出就会不一样。我们这里对于更新操作的输出表示是用减去一条之前的数据,然后加上一个当前更新后的新数据来表示。那关于这一部分呢,我们会在后边讲到动态表,持续查询以及更新模式的时候再去做详细的讲解。
我来说两句