00:00
接下来我们又可以再总结一下,把table转换成data stream的时候到底做了哪些操作,哎,之前其实我们给大家说过,就是在做这个想要看到啊,经过转换操作之后,想要看到它的转换结果,其实我们就已经调用了这个方法啊,就是本身弗link给我们提供的有一些影视转换的方法啊,可以把表直接转换成data stream,而且啊,就是这里边我们没有用,其实也可以转换成data set,对吧?呃,这关键是要看你到底是定义的是流处理还是批处理,要做怎么样的转换,呃,那么接下来我们得到的这个data stream或者是data set,你接下来就可以继续做流处理批处理继续的操作了,对吧?所以大家看其实用这种非常方便的表和流,或者说跟这个data set之间之间的这种转换就可以,怎么样呢?我可以把这个data stream API和table API结合起来用,对吧,我甚至可以一段代码里面整个。
01:00
这个处理程序里边,前边做data API的转换,在转换成某个data stream的时候呢,我再把它转成表,去调这个table API去做一些操作,哎,或者说我再注册到那个表环境里边写CQ,再再做一些转换操作,然后得到结果呢,我再把它转换成data stream,再去做其他的操作,所以说这就是结合起来应用灵活度就非常非常高了,那大家这些东西都已经学过,都已经掌握之后,在代码里边就可以觉得哪种方法更容易实现功能,容易实现需求,我们就调调哪种方式,对吧,因为他们转换其实都非常方便嘛,啊那这里边大家需要注意的是,当我们定义这个把表转换成data stream或者data set的时候呢,需要指定数据类型,大家还记得当时我们这个to派stream后边是跟了那个中括号,对吧,大家还记得吧?呃,当时我们是跟跟跟了那个中括号,直接把那个当前数据类型直接写在里面。
02:00
的一般情况我们给什么呢?啊,一般情况就是给一个元组类型对吧,把当前这个数据啊,呃,我们那个提取出来的字段,最后输出来的这个表里边的每一个字段作为一个元组保存这个输出当成这个流里边的数据元素输出就可以了,这是一种方式,那另外还有一种方式是什么呢?还有一种更简单的方式,我可以把它定义成肉类型,就flink里边也给我们提供了这个行啊,这个表里面的行肉这个类型,所以说直接指指定成一个data streamam肉也是可以的啊,这就是这样的一种转换的时候必须要指定数据类型,那大家会发现其实你转换成的这一个这个流啊,它最后它确实还是一个数据流,对不对,我们输出的时候是每来一条数据,经过表的这个转换操作处理啊,最后得到结果还是每来一条数据,就会引发我们后边的这个一次输出,当然有可能不止一条输出。
03:00
之前大家记得如果是retra的模式的话,那是一条数据会引发,有可能啊,会引发两条消息,两条输出,对吧?呃,一个是上一次的数据的这个,呃,删除,Re,撤回,另外一条是新数据的写入,所以说这里大家要注意,首先我们得到的这个表作为流失查询的结果啊,它是动态更新的啊,就是不停的,这张表里面的数据是不停更新的,有可能是追加,有可能是做了一个update操作,那另外呢,我们去把表转换成流的时候,就涉及到了两种不同的转换模式,就是我们前面看到的,在代码里边大家也看的非常的明显啊,我们在这一个之前,之前的那个file output里边,这里边有两种方式,一种叫做to a stream,哎,后边里边这个加上它的这个数据类型对吧,一个元组类型。
04:00
或者是to retract street,哎,这里边就看的非常的明确了,那那在这个处理的过程当中,大家可以看到啊,就是我也可以把这个定义成肉类型,对吧?我把这个定义出来,我我用的这里边要用的肉就得是flink types点肉对吧?Flink给我们定义的这个肉类型,这也是完全可以去输出的,那这两种APA stream和retra stream它俩的区别,那就又是我们之前讲到的类似于更新模式里边的区别了,对吧?呃,就是我们说的,呃,在有些情况下,我们当前得到转换操作得到的这张表,如果说所有的数据都是来一条追加到后边,来一条追加到后边的话,如果永远只是这样的一种模式给大家画出来。呃,就是之前我们说的啊,在在这个表里边所有的这个字段来了之后啊,每来一条数据直接追加到后边对吧,每来一条数据追加到后边,在这个过程当中其实就不涉及到太多的,呃,这个转换计算啊,对吧?呃,就是相当于只是一条一条记录列在这里而已,就像我们的那个日志往里边去去一条一条输出一样,是吧?啊所以这种表的话,那就直接是to upon stream,它转化成流就非常简单对不对,那就是一条一条数据输出不就完了吗?啊,这是最简单的方式,那另外还有一种方式呢,就是我们所说的啊,就是这里边这张表里面数据呢,它并不是直接在后面去追加的,而是什么呢?来一条数据写到了,诶,有可能没有的话,我写到了这里,对吧,再来一条数据的时候,有可能就直接更改之前的这条数据了,那在这种情况下,什么情况出现出现这样的情形呢?那就是如果。
05:49
说你做了聚合,大家想分组之后做聚合,那来一条数据,来一条数据,那其实是在之前那个分组统计结果上,应该是直接更改对吧?就我们讲的那个count值,直接第一个来了是一,第二个来了二,直接改这个数就完了,那这张表转换成硫怎么输出呢?诶大家就想到了,你不能直接把这条数据直接输出,那那你转换成流流,就不知道你当前这个到底是做了一个什么操作了,对吧?因为我们对针对表的这个操作是有要求的嘛,你是追加一个当前的这条数据,还是说更改之前的一条数据,对于我们最后输出的结果是不一样的,所以这里边我们要有一个说明,这个说明就是to retract stream,把它转换的过程当中呢,加上一个标记,大家还记得前面有一个布尔类型的值对吧,说false,如果是false的话,表示当前是一个,那就是要撤回的一个操作,呃,之前的旧数据,不要的那条数据,要更改的那条数据把它。
06:49
删了,那如果要是true的话,表示它是要新加入的数据,所以说对于这种场景,如果我们要更新一条数据的话,大家看到to retra stream之后,打印出来就是两条输出对吧?就是上一条数据的撤回false和下一条数据,就是真正更新之后的数据的写入,一个放呃,一个处的这样的一条数据啊,所以这就是对于这个当前表做转换成流的两种不同的模式啊,那自然我们就想到了更新模式里边还有一个UPS的模式,为什么这里边表就不能to upst streamam呢?诶,这就是大家想一想to stream,我们传转换的信息是什么呢?哎,就必须是不管是更新还是说直接插入,是不是传递的都是一种信息啊,那这种信息在对于这个外部系统而言,传输过去之后,它是靠什么去判断呢?啊,就是外部系统它自己里有。
07:49
一个K的判断对吧?当前表里边是有K的,然后我当前直接根据这个数据去去做处理就可以了,哎,所以这里面就有一个问题,我当前流里边你怎么能直接告诉外部系统,我当前到底是做一个插入还是做一个更新呢?它没有办法直接去告诉对吧?啊,所以接下来我们这里边就不能直接去做upset啊,所以大家看就是你能利用这是哪种更新模式去做这个转换,发送这个消息,那其实主要是取决于什么呢?就取决于接下来要做的操作,或者说你传输给的外部系统,它支持不支持这种处理,如果说外部系统接收不了这种消消息,或者说你你,呃,有同学说我这里边把它转换成一个这个up的stringam,不也很简单吗?你就发这个up消息不就完了吗?哎,发送阿消息是简单,但是你说我转换成一个流流,是不是接下来要做流市的处理啊,对吧,后面要做种样的。
08:49
这个呃,转换操作,或者说做这个直接就输出了,输出到外部直接打印输出了,那你打印出来我怎么能知道你这条消息到底是要插入还是要要要这个更改呢?那我相当于还得是人肉人眼去看,对吧,跟之前的那个信息,诶它是不是一样呢?一样的话我再去给大家做判断,那这就不符合我们当前做这个流处理的要求嘛,我真的是要更新,那你就真的给我更新呀,呃,就像我们说的流之前的数据已经输出到下游了,对吧,不管它是已经输出到外部系统,还是给下游任务去做执行处理了,他都已经流过去了,我们说的覆水难收嘛,水已经流走了,当然就你就不可能再把它拿回来做一次更改了,所以upset模式在我们流里边没有办法直接做转换啊,这就是这个含义啊,呃呃,那当然了,就是说这里边我们提到的这个追加模式啊,就是upon mode和。
09:49
这个撤回模式retra mode,他们俩之间有一个,大家会发现有一个共同特点,共同特点就是我把这个数据做这个转换之后,大家发现是不是相当于我后续的处理,不需要对之前已经输出的那个结果,直接把它再拿回来做任何的操作,对吧?你即使是这里的撤回模式,它的撤回是怎么表示的呢?是用一条新的一个撤回的记录来表示,诶,我告诉你啊,之前那条数据作废了,对吧,我要撤回了,他并不是真的把那个数据真的给拿回来了,就不像我们这个,呃,聊聊微信或者是发消息的时候,哎,你直接点一下撤回,真的一条消息就就就回收了,对吧,你就看不到了,流处理的时候,你说你已经输出到那个外部系统之候,你能真的再把它拿回来吗?那这不可能啊,对吧,不可能实现,像,呃,微信是给我们做了一个显示的一个转换而已嘛,事实上那个数据早已经发过来了,覆水难收,收不回来了。所以他。
10:49
的操作方式,其实还是在后面又追加了一条说明对吧,说诶,我要去撤回之前的那条数据,所以就就相当于大家在微信里边看到的,诶某某某撤回了一条消息对吧?撤回了哪一条消息,他他指是这样的一个操作啊,所以这里边其实大家也很好理解啊呃,所以你如果要转化成流的话,对吧,我们要直接打印输出的话,你就只能是reject mode,因为它是相当于还是在后面追加信息嘛,只不过追加了一个撤回的信息而已,而不是直接真的把那个之前的数据拿回来做更改了啊,这就是这个把table转换成硫的一个过程。
我来说两句