00:00
那接下来我们来给大家梳理总结一下flink跟卡夫卡连接起来的这个端到端的系统,怎么样去保证状态一致性,之前我们说过卡夫卡本身消息队列嘛,跟弗link本身天生是一对啊,他们连接起来就是处理这个流失数据的一个最佳的一个组合,诶那大家想,如果说我现在的这个系统架构是直接fli从卡夫卡那边去读取数据,消费数据,然后呢,通过flink内部的计换转,转换之后最后写入又是sink到卡夫卡里面去,那大家想这就是我之前我们讲的这个数据管道,对吧?卡夫卡构建的一个数据管道,就是数据流一个一个来进来,做了计算之后呢,再一个一个流出去啊,这就是一个完整的流式处理,那大家想一下,这样的一个端到端的处理系统,怎么保证它的状态一致性,要达到最好的,当然就是精确一次,一个在6万次了,怎么做到呢?啊,那他想就要划分不同的不同的位置,不同的组件嘛,首先在flink的内部,这个很简单,Checkpoint直接搞定啊,我们说只要有柴point就能保证它的状态一定是exactly万死的。然后接下来就是S端,S端大家知道我们用到的跟卡夫卡连接的话,那是一个flink卡夫卡consumer,一个这样的一个消费者,那么这个消费者大家自然知道,他肯定可以重新提交编便移量,对吧,里边就有那样对应的一些配置嘛,即使我们不做配置,那它默认的行为是什么呢?
01:34
诶,这个大家注意,我们其实当时不需要做更多的配置啊,它默认行为就是会把当前的偏移量,提交的偏移量作为状态保存下来,它直接就会做这件事儿,我们不用去做设置,那么如果说后续出现了这个故障的话,那大家想我是不是就从上一次成功保存的检查点里边把偏移量读出来,然后由这个consumer自动去重新提交偏移量啊,然后就可以重新读取了,对吧?啊,所以这一块只要我们用到这个卡夫卡的这个consumer啊,连接器里边的这个消费者,那么直接就可以重放数据,保证这个数据不丢了。
02:11
然后接下来就是think,最关键的就是think think的话大家知道我们用到的是一个flink卡夫卡producer,一个生产者,那么这个生产者源码里边我们也看到它其实实现了一个。To face commit s function,这就是我们所说的两阶段提交对吧?啊,所以这个弗Li跟卡夫卡他们连接起来,真正最后是得到了一个端到端的状态一致性保证的,那这个是它是靠什么保证的呢?那那就是首先内部靠checkpoint,然后呢,呃,这个消费者这一端啊,数据源这一端靠的是重新提交偏移量保存偏移量重新提交,那么在最后think这一端,那就是靠的是一个两阶段提交了,那接下来我们既然已经知道它原理是怎么去做的了,那接下来我们就看一看整个的这个流程啊,再梳理一遍,到底是怎么就是保证端到端状态一致的。
03:10
呃,那首先大家看一下当前我们的这个系统架构里边有哪些组件,我们先瞄一眼啊。呃,首先下边这就是大家非常熟悉的卡夫卡进,中间是弗link内部处理的这些任务,首先是S任务啊,大家知道这是一个consumer对吧,弗Li卡夫卡的consumer,然后接下来是中间的处理转换,我们这里边主要是开窗,开窗去做一些聚合啊,Window操作,最后就是think thinkk到还是到卡夫卡对吧,卡夫卡进,卡夫卡出,所以think这边就应该是一个producer啊,那所以这个流程,这是我们之前都已经测试过,完全没有问题的一个处理流程。接下来我们要考虑。状态一致性的时候,端到端状态一致性,诶大家想就是从这里边的数据进来,然后经过数处理转换之后,到这里边写入到这边的卡夫卡里边去,如果发生故障或者发生一些其他情况的话,再做恢复,是不是相当于我这里边所有的数据。
04:14
都必须能够消费的到,不能丢,对吧,就最后一定要处理完了,能能够体现在我最终写入的这个卡夫卡的数据里边,另外是不是还得不能重复写入啊,那就假如说我这里边有一个数据是中间我恢复了之后,可能又处理了一遍,即使又处理又处理一遍,是不是后边也不能又一次写入啊,哎,所以这就是要求我们后边你可能也有一个回滚机制啊,我们说的事务性提交,如果你出现前面那个故障,呃,发生故障啊,之前的那个预提交的作为一个事物,我们就直接撤销就可以了。呃,那么在对于这个checkpoint做检查点的过程当中呢,还涉及到两个组件上面,那就是呃,Job manager,这是我们说协调checkpoint,触发checkpoint的时候就靠他对吧,最终确认也是靠他啊,他相当于是我们这个就是班主任老师嘛,收集大家每个人的那个快照,最后是他拼在一起跟大家确认快照完成的,合照完成的,那另外还有一个就是。
05:17
Stay back stay back在这里起到的作用其实就是。对,大家想它是不是就管理,首先是管我们每一个的本地状态怎么存,怎么怎么读取,另外是不是写入这个checkpoint的时候,保存快照的时候也是靠它来做啊,所以。这就相当于是我们的那个那个那个应该像照片集影影集一样,对吧,啊相册一样啊,那所以主要涉及到的组件就是这些啊,当然默认这个我们在系统里边,呃,就是生产开发环境里边都是这个内存级别的,我们一般情况在生产实际呢,可以把它改成文件系统对吧,F sc end,然后接下来我们就看到了。
06:02
Job manager要发起一次checkpoint的一个操作,那么他会向S任务发起一个通知,那么接下来是不是就在S任务这里相当于插入了一个检查点的分界线barrier,对吧?把这个barrier直接就插进来了,就像一个特殊的数据一样,然后接下来呢?呃,这个barrier是不是就在每一个算子?之间往下传递啊呃,一个一个啊呃,这个传递的具体过程的话,就前面我们讲到的要做barer对齐,这里面我们不考虑分区的情况了,就看这个过程就可以了。那首先第一步source任务,这里边收到barrier的时候,是不是我们的规则就是一旦看到barrier,处理barrier的方式就是保存状态,然后呃,就相当于是做了一个快照,然后通知DR manager就可以了。所以你看这里面的SS任务,它就是把自己的快照,也就自己的状态,也就是偏移亮直接保存到。
07:01
状态后端里面去,然后是不是需要通知job manager,接下来继续把这个barrier朝朝下游传递啊,所以它整体来讲就是这样的一个过程,那它做完这个操作之后,Barra传给window这个操作,那window操作是不是也可以保存自己的状态了啊,它其实就是完全他们都是并行不备的啊,就是SS任务在做保存的时候,Window可能还在做之前的某些计算的啊,然后这个SS任务把这个barer传给他之后呢,Window去做保存,他也不管,他就直接读取后面的数据了。他们就是这样的一个流程,所以接下来每一个内部的transform算子遇到barrier的时候都要保存一次。自己当前的这个状态对吧?啊,把它保存到状态后端,然后通知job manager啊,接下来再把这个barrier继续向下游传递,所以整个所有的中间所有的任务都是这样做的。
08:00
那么最终大家看到这个window任务和S任务都会把这个状态做一个保存,然后接下来SK这边大家想就是我遇到barrier的时候是做保存,那遇到数据的时候是不是接下来就是。正常往这个卡卡里边去写入啊,但大家注意这个写入,这是一个哪个阶段的写入呢?是预提交还是正式提交呢?对这里边是一个预提交,好,所以这里面就会有一个问题,那我这个预提交是要。通过一个事物去提交对不对,诶,因为你后面有可能要撤撤回嘛,撤销,所以说这里边必须要基于一个事物之前要必须打开一个事物,那大家想一下,我什么时候打开这个事物呢。最初的话,可能我们能想到,那你就是一上来之后,对吧,程序刚刚开始这个运行的时候,或者说是第一个数据要去提交的时候,我去打开一个数,打开一个事物,然后接下来基于这个事物去去提交就完事了,对吧?诶那后边呢。
09:04
后边比方说我一个数据一个数据来,所有来的数据都是基于当前那个事物做了一个这个预提交,对吧,这里边有一个trans one啊事物基于这个事物做的这个预体胶。那这里边问题来了,后边如果来了一个barrier的话,Barrier现在要做什么事情?诶,有同学可能说那Barry已经来了嘛,Bar瑞尔来了的话,那我把这个事物就直接就就关闭正式提交了嘛。大家思考一下,是这样的吗?注意,首先think任务,他把这个BA收到barrier的时候,他首先要做的是保存自己状态,这是我们每个任务都必须做的,对吧?保存自己状态,但是大家注意,我收到barrier保存自己状态完成它保存完了之后,这就表示当前checkpoint已经完成了吗?只是当前任务保存完了对吧,他现在我们并行,整个这个架构都是并行处理的,他并不知道其他任务有没有保存完,对不对,根本不清楚。
10:09
那这种情况下,我能直接就把之前的这个transaction事务就直接关闭了吗?不能,对吧,我们当时说这个事物关闭的时间节点是。当前checkpoint的完成是不是必须要等job manager通知啊,对吧?Job manager不通知我自己保存完,这个不叫合照完成对吧?只是自己的这部分完成了而已。所以大家要注意这里边barrier带来的操作,一个是把自己的状态要保存在状态后端,另外还有一个是要干什么呢?诶大家注意,因为啊,当然他还得通知这个job manager对吧,但是他即使他通知了job manager,他也不能关闭之前的事务啊,因为还是job manager那边没确认嘛,所以接下来他是要。要新开启一个新的事物,大家想想这是为什么?
11:03
因为我们说barrier它是分界线,以它为界,是不是之前的数据和之后的数据分别属于不同的checkpoint啊,然后我们说当前做这个两阶段提交的时候,是要把事物跟差point绑定在一起,那是不是他之前和之后的事物数据也都应该属于不同的事物啊,哎,所以大家想我这个Barry已经来了之后接下来的数据。他带来的那个状态改变是不是就不包含在我当前这个checkpoint里边了,那么当前的这个数据应该由这个之前的那个transaction one这个第一个事物直接提交吗?还是提交在这个第一个事物里吗?这就不对了,对吧?啊,因为之前如果我要收到这个这个拆point的结束的这个通知的话,我是不是就相当于把这个正式提交了,这个正式提交如果之后要回滚的话,这里边的所有数据是不是都不会回放了。因为他已经正式提交,这里边状态都已经保保存了嘛,但是大家想后边的这些数据,它的状态改变是不是没有包含包含在当前的checkpoint里边啊,那你就不能把它正式提交对不对。
12:12
所以大家看到这个分界线带来的操作是,我应该先开启下一个事务,然后去进行下一个阶段的预提交。所以大家就注意这个事物是可以同时开启的,对吧。哎,这当前是第一个事物,之前那个还没关,但是第二个呢,已经开了。那大家可能就想到了,那那到底第一个什么时候才关呢?哎,对,大家发现这不是根据我我这儿到底来了什么数据,或者来了什么这个barrier来决定的,是不是要必须等到job manager发通知我才能确认啊,所以大家看到接下来最后一步操作,那是当所有任务都已经向John manager报告,我这边都已经完成了,那John manager因为他那边有那个执行图嘛,他知道一共有多少任务,对吧?那所以他收集齐了之后,他就说一统计好,大家都到齐了,那我跟大家确认,现在我们的合照完成,我这边把它拼在一起了,所以发出这个通知之后,大家看think任务就要做另外的一个操作就是。
13:15
我要把之前的那个事物transaction one,第一个事物是不是现在可以正式提交正式关闭了啊,所以之前写入到卡夫卡里边的数据就可以正式就是改成状态,改成已已确认已提交,那接下来就可以正常消费了。这就是整个这个两阶段提交的一个过程啊,啊,如果大家再来总结一下的话,大家看这个具体的流程,我们梳理一下是什么样的呢?按照这个因为流失数据嘛,我们就按照这个数据一条一条来看它的这个过程啊,首先我们想到第一个数据来了之后,那我这里边。我就只看thinkk了啊,前面就不说了,就是正常的做这个状态的保存,Barry朝下游传递,那现在第一个数据来了之后,诶,大家看现在我是不是既没有事物也没有那个检查点啊,所以我接下来就直接在这儿开启一个事物。
14:14
对吧,Transaction one。直接把对应的数据往里边写。就这样去做,然后接下来所有的数据都往里面写,对吧?啊,就来一个写一个,来一个写一个。然后接下来如果要是来了一个barrier的话啊,那这个是由John manager那边出发的,对吧?Barrier从south开始一个一个朝下游传递,那么S那边保存的是偏移量,中间的transform任务保存的都是自己的状态,到think这边的时候,它也要保存自己的状态,另外是怎么样呢?另外是是不是要开启一个新的对transaction to第二号事物?那么大家就会想到当前的这个barrier,它对应的一号事物,其实对应的应该就是一号checkpoint,对不对,所以大家要注意啊,是一号checkpoint开启的是二号事物对吧?啊,就是一号checkpoint里边所有的数据,它都会在一号事物里边去做最终的确认和提交,而它之后,因为它是分界线嘛,它之后是不是应该是二号checkpoint和二号事物啊,所以是以它作为开启二号事物的一个标志。
15:27
啊,那么呃,整个这个过程,这其实都是预提交对吧?啊,来一个写入一个都是预提交,那接下来如果来来了后面的数据,大家看是不是他就直接往这个二号事物里面写就可以了,所以他做以这个barrier作为分界,是不是就相当于之前的TRANSACTION1,尽管没有正式提交。但是他已经不再接收数据了啊,他其实可以作为他的这个数据的一个完结啊,啊,那什么时候他正式提交呢?那就必须得等到draw manager发通知的时候,对吧?这个跟这里的这个数据流和barrier就没任何关系了啊,那个没准嘛,我并不知道他什么时候发,只要那边完成一发通知,我这边就正式把这个trans trans one做一个关闭正式提交就可以了,对吧?啊这就是整个的一个处理流程,呃,那么大家这里会发现,就是对于真正意义上我们在项目当中想要做到这样的一个端到端状态一致性的话啊,大家看还有一些其他的配置要求就是啊,首先就是外外外援卡夫卡这边我们要要要打开那个事物对吧,能够能够接收事物啊,然后还有一个就是之前我们讲到有那个超时时间的配置,你不能说是就是我外边这个卡夫卡的事物超时时间很短,而我前面那个拆po是不是也有一个。
16:47
超时时间啊,那我checkpoint那边当前还没超时,我还在还在拼命的保存,还在做呢,结果我外边那个事物都已经超时关闭了,那大家想最后我这边checkpoint的保存正常的保存完成,结果外边这个事物是不是没提交啊,诶那这最后这个数据就丢了嘛,所以还需要有这样一个配置的匹配,那最后还有一个一个点,那就是大家注意前面我这里边预提交的时候,是不是相当于这个数据已经写入灌到这个卡夫卡里面来了。
17:18
那理论上来讲,我是不是其实外部系统如果消费这个卡夫卡里边这个topic的数据的话,其实已经可以读得到啊,啊,所以这里边我们为了保证外边还不能直接用这里边的数据,我可以做一个限制就是。呃,卡卡那边我可以设置这个消费者的隔离级别对吧?呃,就是所谓的这个I level啊,我把它设置成read committed的,因为大家知道默认是read on committed的,对吧?未确认的信息只要进来我也我都可以消费,都可以读取,那我现在呢,不让它读取,就只有在我等到那个checkpoint完成,收到manager manager通知我确认正式提交之后,才把对应的那个隔离级别的那个标志,呃,就那个标志啊,改成那个committee的,那接下来是不是才可以正式消费啊,那这样就保证在下游的那个环节也不会提前消费这个数据,导致消费两次对吧?啊,这个就是后续的一些一些考察和配置项啊,在实际生产当中还是要考虑很多的。
18:22
这就是关于弗林跟卡夫卡连接在一起两阶段提交保证结果的精确一次。
我来说两句