00:00
我们到目前为止已经了解了,怎么样在弗link里边去啊,从外部的这个文件系统,或者说卡夫卡这样的消息队列啊,去读取数据,然后呢,读进来之后转换成注册成表,然后进行table过API,或者说直接写写CQL对它进行转换查询啊,那这部分我们都已经了解了,那对于我们整个流处里的程序来讲,就差最后一步,那就是think了,那就是要把一张表里面的内容输出到外部系统里面去啊,那对于这个表的输出呢,它其实是在底层有一个结构叫做tableable think,所以说它其实是把我们的这个数据写入到table think里边,而这个table think呢,关联到了外部的这个啊存储系统,这样的话就可以直接把我们的这个think任务构建出来,跟外部系统做连接,把数据进行输出写入了,所以table think呢,它其实是一个通用的接口,可以支持。
01:00
不同的文件格式啊,存储数据库,或者说卡夫卡这样的消息对列这些都是可以的,那输出一张表怎么样可以最简单的输出一张表呢?这就是我们前面提到的直接insert into,首先我们需要在外边建立跟外部系统的一个连接,对吧?啊,就是连接到外部系统,然后在环境里面注册一张表,叫table table,然后接下来呢,我们把之前做过转换操作的得到的一张表,想要写入的一张表拿出来,这个table,这这是一个table的实例,对吧?啊,Table API里面的一个实例啊,用这个table去调用它的insert into方法,然后里边的参数就是我们的这张表名,那底层呢,就会相当于把对应的这张我们这个table这张表里面的数据写入到table think,然后table think呢,又连接到了外部系统,这样的话就实现了一个输出啊,所以整体来讲,它跟我们的那。
02:00
这个数据读入啊,就table sourcece那边其实是基本上是一致的,就因为table sourcece那边我们相当于是定在环境里边注册一张表连接到外部系统对吧?然后接下来是如果说我后面这个table env直接from这张表的话,那就相当于是直接从外部系统通过table sce去读数,对吧?有对应的那个连接器直接去调用它的连接器方法去读数,那如果说这里边我并不是直接从里边去from去去转换成表,而是把一张表去做insert into,那就相当于是调用了底层的S任务去做输出,这是关于这个输出表的一个整体的概念,然后接下来呢,我们就来给大家具体的测一测怎么样输出到不同的外部系统当中,那首先最简单的就是一个输出到文件,我们就直接在代码里边来给大家做这个讲解吧。呃,接下来我们还是在代码里边新建一个,新建一个object,然后呃,当前我们就是主要是测这个输出到文件对吧?所以我们叫file output test,然后对于一开始我们的这些操作和定义,那大家会想到这个环还是一样对吧?我们直接把这个环境创建出来,这个我就直接抄了啊,然后为了后边我们做这个影视转换的方便,直接把这两个流式的执行环境直接下划线引入啊,然后接下来第二步就是读取数据了啊,这个读取数据也简单,我就干脆来做一个文件读取读入,然后文件读出对吧?那这个其实整体来讲这个处理的流程应该是最顺的一个流程啊,大家可能会觉得最最简单的一种方式,那所以接下来我们就直接把这个。
04:00
是之前的这个读取啊,把这个。该引入的所有的这些全部引入,那这里的这个就是我们读取读取文件啊,注册成表,注册成一张的表,然后接下来呢,我们再做一些转换操作,转换操作,这个转换操作这里面我们稍微给大家,呃,就是做的这个事情稍微多一点,首先我们想到可以对它,呃,大家知道,就是比方说我们这里边定义一个table对吧,基于当前的这个table,那首先我得把这个定义出来,比方说我还是叫34TABLE啊,那就是table enna,还记得这个方法吧,直接from,然后把当前的这个input的table写进来,然后接下来做做转换的时候,哎,我们还是基于它这个,其实直接直接抄就行啊,我直接就还是用之。
05:06
填的那个方式吧,这个代码都差不多啊,就主要是看一下它这个能不能正常输出,这是一个ID,然后呃,大大家看我是定义了这个select,这个temp这个字段,对吧?你如果要是定义这个temp这个字段的话,那是不是相当于我这里边读进来的时候就是这个field就得叫temp呀,大家想想是不是这样对吧?呃,就是我直接从当前的这个文件里边按照CSV格式把它格式化,然后得到这个呢,我直接就叫做temp,那这叫temp的话,后边我就这个字段名也是temp select直接select出来啊,那比方说后边这个filter给还是啊,ID等于三一啊,这个还是完全一样的一个处理流程,呃,然后接下来我们就是真正的去做一个,呃,当前的这个就是输出的一个操作了啊,另外我们在之前再给大家来做一个其他的一个转换,大家可能想到哎,有这。
06:06
个简单转换对吧?这里边首先这是一个简单转换啊,简单转换,那有没有稍微复杂一点的转换呢?这个感觉还是有点太小儿科了是吧?哎,我们这里边给大家做一个聚合转换,因为之前我们讲那个transform这个算子的时候,我们就曾经讲到过有这个简单转换算子,然后后边哎,我们讲到还有这个聚合算子对吧?滚动聚合算子,那我们一般情况是要先做这个K败分组之后,然后才能够做这个聚合的,那现在假如说我想要去针对当前这个代码去做一个聚合,那应该怎么去做操作呢?呃,这个聚合操作其实整体来讲也是比较简单的,就是比方说我定义一个a j j table对吧?聚合结果,那我基于之前的这个3SENS去做一个聚合的啊调用啊,那大家可能会想就是哎,如果这个。
07:06
帮你直接去做聚合的话,那是不是呃,直接叫aggregate呢?哎,不是这样的啊,大家会看到我这里边是能调这个aggregate,但这个aggregate呢,它是一个一般化的一个方法,它是基于这里边你看它是基于data set去去调的这样一个东西,对吧?啊,它是把当前的这个sensit table当成data set去做处理了,那这里边我们如果要是调这个table API里边的方法的话,那应该怎么做呢?我要先做一个分组,就像我们之前KBY一样,还记得吧,先group by对吧,现在就是真的大家熟悉的这个group by出来了,不是KBY了啊,又变成group by了,那这里边要传的呢,还是要不是类型,要不就是一个表达式,那这里面当然我们就是基于ID了,传一个表达式,对吧?呃,ID传进去,这里面我写一下啊,基于ID分组,然后接下来啊,下一步操作那。
08:06
哎,大家看现在你如果再去调这个agate的话,这就可以调这个table,大家看这个叫做group的table对吧?啊,就是做了这个当前我们做了做了这个group by之后,得到的数据类型变成了一个group table,有点像我们的那个kid state一样是吧?呃,K stream一样啊,那这里边得到的是一是一个group table,然后group的table呢,可以调这个聚合操作aggregate,最后得到一个aggre aggreted table,对吧,就得到一个聚合之后的table,哎,那这个聚合之后的table又是个什么东西呢?最后还要调一个select的方法,把它就是提取出来,对吧,因为你不能光定义聚合呀,我最后得到这张这张表,你肯定要有一个s select选择把我们想要的那些字段提取出来的,所以最终还是要一个select,那这里边还可以怎么样呢?其实大家发现就是在这个group的table里边,它本身就有select方法对吧,本来就可以直接去提取。
09:06
啊,大家看有这个aggregate,还有flat aggregate对吧,就是打散了之后的那种,呃,这种这种呃聚合的方式啊,啊,那还有另外一种就是这个select,那大家可能会想你分组之后就是要聚合的呀,你直接select,这叫这叫什么意思呢?啊,其实大家知道select里边我们传可以传一个表达式对吧?那传一个表达式你看一看它的这个举的例子,这个写法表达式里边我是不是就可以比方说average这个怎么样传这个表达式呢?我这里边直接可以取一个字段的平均值去做统计,直接点average这样是不是可以啊,那这完全没毛病对吧?哎,或者说大家就想到了,我可以直接点some对吧?呃,点点max.me这些简单的聚合都可以直接在这个select里边去,去把这个作为一个表达式的,呃,表达式的一部分啊,调这个方法把它传进去啊,所以在这个里边其实就是我们最简单的。
10:06
方式其实就是直接select,那比方说现在我想要什么呢?啊,就像抗我我们那个word count一样,我就要当前每一个传感器的温度,温度值出现了几次,哎,那是不是就相当于我把这个ID啊出现的次数去做一个count就完事了呀,对吧?哎,所以这里边我可以ID点啊,那大家看这个这里边的这个函数就多多了,我可以直接count对吧,不像我们在data stream API里边,嗯,根本就没有count嘛,我还map成这个二元组,然后再去做转换,这里边直接count,因为CQ里面本来就有count函数嘛,所以当然这里边都可以用啊,那count我还可以给它再命个名,对吧?啊,就是你最后得到这个字段到底叫什么名,叫count啊,这是完全可以的,那后边大家要注意啊,这个聚合转换如果得到的就是这个表的话,有同学可能想到了,就是我先给大家做一个测试啊。这个本来应。
11:06
应该是在前面我们做那个转换操作的时候就做这个测试的啊,我们补在这里边再测一下,大家看一下前面我们知道result table,如果要是想要直接输出的话,我可以怎么样to string就可以输出了,对吧?诶,这里边得到这个类型二元组,一个string,一个string,那个time,呃,Time是一个double对吧?然后去print把它打印,这是result,那如果说这里面的a j table,如果要去to a pen stream可以不可以呢?哎,大家可能想到了,这个不就是这样的一个操作吗?然后你那个count之后,那要看它那个到底就是这个我们当前这个方法返回什么什么东西了,对吧?啊,这个其实它底层最后返回的是一个长整性,所以说就就像我们的那个CQ里边调用那个抗函数一样,这里边我们返回一个长整性,所以这两个值放在这儿,最后我把它做一个打印输出print。
12:06
呃,这个是agg对吧?啊,大家可以先看一下这个效果怎么样啊,我们先做一个测试,运行一下,大家看到这里面报错了对吧?啊,运行报错了,我们看一下这个错说什么?它说的是table is not a a pen on table use to retract stream in order to handle ADD and retract messages,这是什么意思呢?哎,这就是表示我们前面调用了这个toend stream,它表示的是什么含义呢?就是把它转换成了一个追加流对吧?啊,添加流就只在后面追加元素的这样的一个流,诶之前我们这个result table呢,它可以直接做这样的一个操作,这样我们就直接把它输出了,但是这里你看到哪一行报错,50行报错,对吧?A j table我们做过聚合之后,它是不能直接在后面做追加流的,大家想想为什么,因为你这里边这张表。
13:06
你想我这里统计这张表的话,得的数据,得到的这个我们当前的这个数据类型啊。一个ID一个count,这张表里边,那我这张表里边存着的是不是就是诶假如说哎,我当前341对吧,ID是一,这个count是十,然后342 ID是二,哎这个count是13,那假如说我接下来又来了一个341的数据的话,你说我是直接在这张表后面追加一条11吗?不应该这么做,对吧?作为我们这样一个表来讲,最后你统计这个ID和它的count,显然不应该在后边直接追加,而应该怎么样呢?是不是应该把这个直接十改成11就完事了呀,对吧?那所以这里边我们聚合得到的结果,这张表跟我们前边你这里边,诶前边你看到这个为什么叫颈椎加颈椎加的这个表呢?就是因为你这里面要的就是一个ID一个temp吗?你要的不就是来一个数据输出一个一个当前的它对应的那个温度值吗?你并不去更新对吧,所。
14:16
所以这里边就是来一个一,哎,当前是这个十度,那就放在这儿,又来一个一,当前是15度,我就把它放在这儿,它只做select和filter,并不做别的一些调整,那当然就是我来什么数据就直接往后居加就完了,但是你如果要做这样的聚合转换的话,当前的这个操作就不一样了,所以这里边大家看到我必须要去做一个,哎,这里边说要调用不同的方法去吧,做一个retract stream去做一个这样这样的一个转换,呃,这个就稍微的麻烦一点,对吧?啊,然后这里边stream呢,在呃,我们这个实现的过程当中,你会发现啊,就是它最后返回的这个数据类型有点不一样,大家看到它最后返回的数据类型前面带了一个布尔类型的值,然后后边才是我们的这个定义,好的二元组的这个值,这代表什么含义呢?哎,我们来看一眼大家就知道了啊,我们先输出一下看眼这个值,好我们看一下,哎,这里面。
15:16
输出结果了,大家看一下当前的这个状态啊,Result这里边啊,不出意外对吧,把那个341筛出来嘛,所有的筛出来挨个输出,这个没什么好说的,我们关键看这个aggregate啊,AG它这里面输出的是什么呢?诶首先每来一条数据,我们这里边大家看前面他给了一个触放在这了,对吧?哎,先放在这儿,然后接下来你看我们一开始一六七十,这都是直接输入,接下来再来一个一的时候,大家看他做了一个什么操作呢?3411,这个它给了一个false,然后3412给了一个处,这代表什么含义呢?这就是我们说的,哎,就是你做这张表的更新对吧?接现在我们要的不是每一个ID对应一个count值吗?所以它现在这个含义是什么呢?我在这个控制台打印输出这张这这个流对吧,这个流我没有办法直接改之前已经输出的那个。
16:16
了,这是没办法改了对吧,三一是一这个没办法改了,那我可以怎么样做一个补救操作呢?我可以加一个字段,加一个false,表示你当前的这条数据作废了,对吧?然后接下来怎么样呢?一二添加一条true的数据,表示这是我更新之后的最新数据,大家看它可以做这样的操作,对吧?因为你如果要是没有这个字段的话,那我们这个表里边你1112来了这两条数据,那你说我当前的这个ID1,它的这个count到底是几呢?我就搞不明白了,对吧?哎,所以这里边我必须要有一个字段再去表示它到底哪个是老的已经不用的删除的数据,哪一个是现在新的需要去真正使用的数据,这是在这儿去做这个操作,那后边大家看到这个就很好理解了,那接下来是不是每来一个341,它就会把之前的那个结果废弃掉,然后341成。
17:16
三对吧,然后再来一个分析掉,变成四,再来一个最终变成五,一共我们有341输入了五条数据,所以得到的结果就是这样的一个数出啊,这是关于这个直接啊,我们在这个呃流里边去做输出的一个状态,那现在呢,我们想把它直接输出到这个文件里边去了,那接下来我们怎么样能够在文件里边做一个输出呢?这里边其实就还是一样,我得先定义一个到文件的,大家想得定义一个到到文件的这样一个连接器,对不对啊,得得连接到文件,然后才能做这个输出嘛,所以接下来我们第四步输出到文件,呃,那首先我先定义一个,呃,其实这个不用只定义,就是我可以在就是当前这个环境里边直接去注册输出表,对吧?啊,首先我们就是注册。
18:16
输出表输出表啊,那这里边table env直接点connect,这里边同样,哎,这这个大家就想到了,是不是还是去new一个file system对吧,就跟我们前面这个过程其实是一模一样,只不过呢,就这个pass换了而已,对吧,然后这里边我要定义的这个就是skima这个表的结构可能也要变啊,所以干脆我把这个直接过来吧,甚至我直接连上面这个我都过来对吧。直接放在这,然后接下来这个啊,我把这个叫做,那这里边我们改一个名,之前这个叫三,我现在叫做output,好,然后接下来后面这里边把这个UT要传进来,连接到这样一个文件里面去,那接下来我们要输出的时候,比方说我要输出这个result table里边的值,那大家想这里边应该是一个ID一个temperature对吧?那所以这里边呢,我就定义中间这个就没了,我直接定义一个ID,一个一个tempb,这不就完事了吗?啊,当然这个名字你可以不叫temp对吧,你叫temperature这个也是没没毛病的,那这里边比方说我把这个注册啊,你不能跟之前已经有的那个表重名,这里边我们要叫另外一个名,比方说叫output啊,那接下来我可以把这个就注掉了啊啊,接下来怎么样输出呢,大家知道,就是直接用当。
19:53
前的这个result table调它的insert into方法里边,把这个output table名字放在这,这样就think到外部系统,好我们直接执行一下啊,因为大家知道print其实也是一种特殊的think吧,啊,这里我们把这个普通的这种print测试住掉,然后接下来我们做一个运行,好大家看这边已经正常的运行结束了啊,那现在这里面没有任何输出,我们当然是得到当前的这个,诶大家看到多了一个output text对吧?诶这里边你看到我们就把341所有的数据提取出来,写入到当前的这个文件里面来了,这就是输出到文件的最简单的一种方式。
20:39
好,那有同学说啊,那这个太简单了,那你要这么说的话,接下来我就把这一个,呃,就是我们刚才不是还有一个这个a j j table吗?我也给他做一个输出,哎,我把这个a j table换一下啊,那这里边就是a j j table要去呃insert到这个auto table对吧,那你如果要是这个a j table的话,大家注意啊,还得把这个是不是要做一个做一个这个数据类型的转换啊,你这里边本来是这个temperature,是一个double,你现在的这个这个I的这个这个就不一样了,对吧?啊,你这里边我们的这个那应该是一个啊,这这里边我还是这个啊。
21:19
要不然大家之后就不知道本来是什么样子了啊,我把这个改过来,那这里边有些同学可能说,哎,我这里边直接叫count,最好不要叫count,为什么呢?在表环境里边,你这里边本身这个count是什么呢?这是当前table里边的一个字段,这个无所谓对吧?这是在我们这个scale代码里边,它这个,呃,实例对象里边的一个字段而已,这个叫count无所谓,但是当前你这个count呢,这是在我们的这个表环境里边,它是这个基于CQ的一个,呃,就是k set的一个,呃,CQL执行环境对吧?你这里边如果用count的话,大家知道CQL里面是有count这个函数的,你这里边叫count就有可能会有问题,所以我们另外定义一个名,比方说叫这个CT对吧,然后它的类型应该是big int,然后我们看看当前能不能正常的把它这个写购进去,大家这里边看到运行报错了,为什么运行会报错呢?它报在哪一步呢?又是在去。
22:20
Inser into这个a table的时候,对吧,又是这个a j table出了问题,大家看这里边有一个什么报了什么呢?A pen stream table think requires,这个table呢,必须只能是insert ts,这是啥意思啊?哎,说白了大家看,就是说我们当前的这个table think,就是我们要往文件里边写入的时候,它底层我们说它不是一个table think吗?当前是个什么table think呢?它是个aend streamam Apple table think,也就是说它当前往文件里边写入,它就是一个颈追加的流的一个table think,所以它必须要求什么样呢?你只能有插入的这个变化,不能有像我们当前AJ这样,哎,就是更新里边的某一某一行数据,对吧,不能有这样的一个变化,哎,所以你会发现这里边如果你做了这种更新操作的话,它是不能直接插入进去的,这个就比较麻烦了,对吧。
23:20
啊,那对于底层而言,我们会给大家就是稍微的看一下当前的这个就是啊,呃,就是本身的这个table think,大家看就是底层调用的话,就是这样的一个接口table think,那这里边如果说呃,CSV,对吧,大家看我们现在的这个CSV文件,想要去做这样的一个写入,它最终调用的是什么呢?是一个csv table thinkc,这是本身实现了一个,哎,大家看就是它实现的接口是什么呢?就实现我们这个table think啊是batch table think p处理的table think,或者流处理是什么呢?流处理是a pen stream table think,大家看就是这个东西对吧?啊,所以它并没有实现能够更改流处理里边数据结构的这样的一个写入,大家想想为什么没有这样的一个呃一个写入呢?因为你现在是要文件里边去写呀。
24:20
你这个数据已经写进来之后,接下来你说我要把这个文件里边的这个数数值再去改一下,把它这一行更改掉,这显然做不到,对不对啊,这对于文件系统而言是做不到这个的,我只能在后面追加一行数据,呃,多写一行数据,这个是可以做到的,诶,所以这就是这里边实现的时候,它只实现了a pen stream table s,所以如果你做了聚合操作,做了这样类似于更改里边一行的操作的话,那就不能写入到文件系统了,对吧,这个是有有限制,那最终我们就只能是用这个result stream,呃,Table只做了这个简单转换的操作,可以写入到这个一个文件系统当中,因为它是一个aend stream table s,对吧?啊,就是紧追加在后边的这样的一个处理。
我来说两句