00:00
了解了把流转换成动态表的过程,而且接下来呢,我们考虑了基于动态表去进行持续查询的过程啊,那可以分为更新查询和追加查询,那最后一步呢,得到的结果动态表,我们就还需要再把它转换成为流,这才是一个完整的流处理的过程啊,那这里的关键点就在于。我们最终得到的这个结果动态表啊,到底它里边有没有更新操作呢?哎,那如果说之前我们做的是一个更新查询的话,那当前的这个动态表想要转换成流的时候,很显然就不能简简单单的啊,每来一个数据,我们就我们就把它当成一个更新之后的一个追加数据,输出成流转换过去就完事了,那如果他做了更新操作的话,那就应该输出一个更新日志里。所以如果说我们整个流处理的终点,最后得到的结果是要写入到外部数据啊,或者是转换成这个流在控制台打印输出的话,那这个时候呢,我们就要去做一个判断了,到底把得到的结果动态表怎么样编码成外界可以解读的一个更新日志,那所以这里就涉及到了不同的编码方式啊,那在flink当中呢,Table API和CQ支持三种编码方式。
01:22
第一种最简单,那就是颈椎加流的编码方式,APA only stream,那这种方式呢啊,那顾名思义啊,就是里边不涉及到更新操作,只有追加音色的这样的操作,那所以对于我们这个结果动态表里边所有的数据啊,都是音色的,都是加I。这样一个更新类型,所以接下来呢,我们就可以把它直接转换成一个所谓的颈椎加流only。这个流里边所输出的所有的数据,其实就是在这个动态表里边不停的新增的一行,新增的一行啊,在后面追加就可以,这个最简单啊,只有插入。
02:01
啊,那除了这种情况之外呢,另外当然就是涉及到更新操作的结果动态表了,这个时候我们就要考虑里边有更新操作,我们就要把它翻译成更新日志流,那更新日志流呢,最经典的一种编码方式就是我们所说的撤回流的模式。什么叫做撤回流呢?这就是我们所说的这个retra stream啊撤回流它是里边包含两类信息的流,什么叫包含两类信息呢?就是说我们网哎,当前来了一个数据啊,进行处理转换,经过这个持续查询之后,得到的这个最终的结果数据转换成一个流数据。想要输出到外部系统当中的时候,我们每来一个结果信息,就有可能输出到外部的是两类信息,什么信息呢?一类是添加A,就是表示我当前这个数据是要增加的一条数据。
03:00
另外还有一种消息呢,叫做撤回retract,什么叫做tract呢?啊,Retra的意思。其实就是要把之前已经输出过的一条数据收回来,撤回来啊,就相当于是要告诉我们我要去改哪条数据了啊,那改之前已经发出去的数据啊,那只要是外界能够解读出我这样的消息,他就可以针对我们之前发出去的数据做这个的回撤,做这样的更改吧,所以具体的编码规则呢,哎,这两类消息啊,我们看到就可以去编码各种各样的操作啊,针对我们这个表的操作当然就是增删给了啊,那如果是增加insert插入操作的话,编码就是一条at的消息。直接追加就可以了。那如果是删除呢,Delete呢,Delete操作的话,编码就是一条撤回消息,Tra就这条消息就是要删。哎,那如果是改呢,Update呢,Update就稍微麻烦一点,之前我们就会发现啊,每一次update更新操作其实对应的啊,我们结果数据其实是要发出两条数据的,它会编码成什么呢?编码成要更改的那一行,之前那条数据的retract撤回。
04:16
然后呢,更改之后的新数据的A再做一个增加,诶,所以这样的话,所有的这些操作,所有的这些改变啊,更新操作就都可以编码成不同的消息了。那这个时候我们得到的这个结果动态表也就可以编码转换成一个更新日志流了,啊,那对应的这样一个的编码方式呢,有时候也把这个更新日志流叫做撤回流,哎,所以之前我们不是说在代码当中,哎这个不光是调这个。Table envr表环境的to date stream方法和to changelo stream方法,另外还可以调totract stream和to pen stream,那to pen stream我们知道这就是一个颈椎加流了,那如果是to retra stream的话,就是把它编码成一个撤回流,哎,这是一种可以认为是更新日志流的一个具体的编码表现形式。
05:13
好,那接下来我们还是基于之前的这个例子啊,用图来说明一下我们当前这个编码的过程到底是什么样的啊,比如之前我们这样一句CQ,我们知道了啊,分组,按照用户分组去统计每个用户访问URL的次数,那其实这是一个更新查询啊,最终的结果是有更新操作的,所以来了一条数据爱丽丝,哎,那我们知道啊,输出的是爱丽丝一,这个只是插入就可以了,所以呢,后面我们看。这里的这个加号,前面这里的这个加号表示的就是一条at消息,表示的是一个insert啊,只是插入的一个操作,所以我们看到第一条数据来了之后,对应的就是增加追加了一条ALICE1。然后接下来Bob数据来了之后呢,哎,同样BOB1追加。
06:00
BO1,然后第二条爱丽丝数据到来了之后,我们知道是要把之前的爱丽丝一更新成爱丽丝二,这不是追加,这是一条更新,所以接下来我们发送的数据呢,一下就要发送两条数据。是之前ALICE1的撤回减,然后再新增一条数据,ALICE2的追加加,哎,这就是我们所说的这个撤回流啊,有可能一条数据的更新会对应着我们这里两条消息的发送。啊,那最后又来了一个carry的数据,那之前没有carry的信息嘛,所以当然是追加了加CARRY1,这就是所谓的撤回流的编码。啊,所以我们其实可以想到啊,假如说我们得到最终编码的这一条撤回流的时候,如果把它写入到外部的一个数据库,或者说诶,某种接收它的一个外部系统的时候,只要在这个系统里边,我可以按照当前的这个用户名啊,去做一个查找,做一个更新的话,那就很容易就能实现我们要做的操作了,哎,因为我这里边到底是加还是减说的很明确嘛,所以收到这样一个加AL1,哎,那我就直接把这条数据做一个插入,做一个追加就可以了啊那如果说啊,我们对应的一条数据的改变,直接来了两条信息,那很显然我要先把之前的那个爱丽丝一先删掉,然后呢,哎,再把这个爱丽丝二再追加进去。
07:27
这样的话,我们外部数据库就可以完整的操作我们当前结果的动态表。这是撤回流tra STEM这种方式呢,也是之前我们在这个控制台打印输出的时候啊,能够转换成一个data STEM的时候调用的这种方式,我们这里边输出的时候就是减U加U,哎,一条数据对应的就是两条消息,这是撤回流的一个表达,那除此之外呢,另外还有一种方式叫做。UPS stream,这就是所谓的更新插入里,哎,这个upsur啊,它其实是一个合成词,就是up和insert的一个合成。
08:06
什么意思呢?在这样一个流里边,它的编码信息也是只有两种,这两种信息一种叫做UPS,那我们看到这个up,它就是把这个更新和插入合二为一了。不像之前,哎,我们是只有这个插入操作,它的更新操作跟插入是一样的,然后另外还有一种消息呢,就是这个删除delete的消息。那我们会想到这里这个更新和插入怎么就能合二为一呢?那假如说啊,我们这里边来了一个爱ICE1这样的一条数据啊,假如说我我加了一个这个更新插入的这个标记啊,这个消息是更新插入,比方说我们用一个星号来表示吧,那我到底到底怎么样才能知道我对于这个外界系统啊,收到这个消息,到底是要插入一条这个数据,还是说想要更新这条数据呢?哎,其实这个很简单,就是外界就必须得知道我们当前的K是什么。
09:00
啊,其实这里如果我们要跟之前的retra stream啊,这个撤回流去对比的话,撤回流里边外界其实并不需要我们当知道当前的P到底是什么啊,因为我只要知道你到底是追加还是删除不就完了吗?只要能找到这条数据啊,只要是加我就直接在后面追加,如果是减的话,我就找到这条数据删掉就完事。所以其实并不需要知道它的K是什,而现在呢,就必须得指定一个唯一的K啊,因为我当前不管更新和插入,只要找到了唯一的K啊,当然就覆盖对应的那个值就完了。所以我们想到啊,在哈希map这样的数据结构里边,哎,或者是red里边的键值,对这样的k value啊,保存的这样的一个数据类型里边,我们如果要是指定了K的话,执行这个更新插入操作,那就是效果完全等同的。那所以接下来我们看看同样的这个例子啊,如果使用更新插入流把它编码成UPS信息的话,那会是一个什么样子呢?诶,我们看到啊,那就是第一条数据爱ice丝,这条数据来了之后,哎,那当然就是直接做一个爱丽丝一的更新插入,然后Bob第一条数据来了之后呢,同样BOB1更新。
10:13
然后爱丽丝第二条数据来了之后,我们要把爱丽丝的信息更新成二,哎,那我们说更新和插入是一样的,所以我们看啊,星号表示up,所以不管怎么样,我来了之后都是星号更新插入的这个操作,直接更新插入爱丽丝二就可以了。所以我们就说了,在外边的这一个数据库,或者说存储系统里边,假如说我们已经指定了当前的K就是这个user的话,那自然看到一个这样的up的消息,我就找对应的K嘛。找到Alice的数据,然后把它的值填进去不就完了吗?那后面是一就填一,后面是二就填二,所以这样的话就完全没有问题。所以我们也可以看到啊,更新插入流相比撤回流是有优势的,它的优势就在于诶,不会这么麻烦啊,之前这个撤回流一条数据的更改有可能会带来两条消息的发送,这个就比较麻烦,那现在我们这个更新插入流呢,一条数据对应的就是一条消息,这就比较简单,效率更高。但是啊,它也有问题,就是必要求我们的外部系统必须能够支持我们当前key的查询,而且呢,必须是唯一的K啊,这个必须是匹配的。
11:26
所以如果说我们并没有办法去捕捉到它的key的话,比如说我们这里做这个控制台转换啊,转换成这个data stream,转换成data STEM之后在控制台打印输出,那很显然在这里我们就没有办法指定key嘛。所以我们看。当前我们在做这个table env方法调用的时候,就不能去to upsur string,而只能to retract string,就是把这个要删除和要增加的这个信息都打印在这儿了啊,一目了然,你看要撤回哪个你自己看吧,但是如果说你让我根据这个key去做更改,不好意思,这里边没有办法做到。
12:02
这也就解释了我们之前为什么哈,这里的更新日流底层其实就是一个retra stream啊,它的编码方式就是撤回流的方式,所以在具体使用的过程当中呢。连接到外部系统的时候,到底是编码使用更新插入流还是撤回流也是。看具体情况而定的啊,那主要取决于外部系统本身的特性,如果它支持更新插入操作的话,诶,那我们当然是编码是更新插入流更加的方便,更加的高效,如果不支持的话,那就只能编码成为tract撤回流了。这就是关于最终我们将动态表转换成流的最后一步操作。
我来说两句