00:00
上节课我们已经提到了这个端到端的状态一致性,那接下来我们就来看一看在flink里边。它怎么样去实现这样的端导端状态与执系,我们说了整个从source,从外部读入数据到flink内部处理,然后再到最后think写入到外部系统里面去,所有的流程,每个环节都得保证状态一致,对吧?哎,所以我们要的是这个端到端这个过程,那这个怎么保证呢?那我们分别来看内部怎么保证,对内部非常简单,我们说了嘛,Checkpoint对不对?因为内部它只有一个状态,并看不到这个数据的一些信息,也不保存数据,所以说只要有checkpoint,它能恢复到当时某个数据完成时的那个快照的状态,那就没有问题。所以有checkpoint就能保证内部flink内部处理的exactly once的状态一致性。
01:00
然后接下来我们再看,那还有S端对吧,跟外部的数据源连接的这一块,怎么样去保证exist one呢?诶,这里边我们要保证的关键其实就是它的偏移量得能重置,得能把数据能重放对不对,没有处理完成的那些数据能够重放出来,不能丢数据啊,那当然这一块大家已经知道,对于这个卡夫卡而言,显然我们只要能够。去提交一个偏移量是不是就可以了,对吧,恢复到当时的那个偏移亮的状态就可以了,所以这一块看起来也不是什么问题,比较麻烦的是。大家看到前面有了这个内部的checkpoint的保证,然后再加上S这一端可以重新提交偏移量,可以重新读取数据,那是不是就已经保证我们的数据肯定不会丢了,这个时候我们的状态一致性已经能够达到什么级别了?
02:02
是不是已经能够达到至少一次了,对吧?At least once,对吧,就至少能够让这个数据处理一次,但是我们如果要想达到exactly one的话,那这个S这一段是不是就至关重要啊?前面我们已经说了,在这个数据重放的过程当中,Flink内部你怎么去跑,怎么去重放,其实都没太大关系,因为我只保存状态嘛,对吧?你那个数据其实在过程当中其实是不重要的,但是一旦要是已经把这个数据写入到外部系统的话。那这个就没有办法撤销了,而且你从外部整个来看的话,别人已经能够看到一个输出结果了,这显然就不能叫exactly one了,对不对?这显然就是重复输出了,所以那那这种要保证think端的exactly one,我们又用什么样的机制来保证呢?
03:02
这里边啊,大家会想到我们这里边关键就是对think有要求,Think这一端要保证exciting one有两种基本的方法,一种是所谓的幂等写入。另外一种是事物写入,好,接下来我们就一个一个给大家讲啊,呃,首先是这个密等写入,呃,这个英文是em put rights,那么所谓的密等操作,这是什么意思呢?幂等啊,大家知道幂是那个乘方的意思,对不对啊,幂等这个意思其实就是说重复执行很多次,但是最后呢,结果都一样,结果最后只改变一次,哎,所以这样的操作就叫密等操作,也就是说第一次这个操作来了之后,对他改变是有效的,后面再来重复这个操作,怎么改它,它都都不起作用了,对不对,改不了它了。
04:01
这样的就是幂等操作,呃,大家给大家举两个例子吧,光这么说概念可能有点奇怪啊,大家看什么样的东西是幂等的呢?很简单的一个例子,数学上的例子啊,呃,但大家还记得那个E的X这样一个函数吗?这个函数如果要求导它求出来是什么?是不是还是自己还是ex啊,所以求导这个操作,如果你连续对他做的话。它是不是求多少次还是ex啊,哎,所以大家就就会发现求导这个操作对于ex这个函数是不是就相当于是一个幂等操作啊啊,所以这是这样的一个数学上的一个一个事例啊啊,那大家如果要是觉得数学已经好像忘的差不多了啊,那我们还是看一个这个计算机领域啊,数据数据呃,数据处理领域的一个具体的例子什么呢。比方说哈希map对吧,呃,大家想想哈希map这样的一个插入操作,它是一个键值,对。
05:04
那所以哈希ma在插入的时候,是不是要根据K的那个哈希值去找它对应的位置啊,那假如说我们要重复插入一个K8队的话。那只要K一样,是不是永远找到的位置是一样的啊?然后找到同样的位置,再写入同样的value,是不是还是一样的结果啊,所以对于我们这个写入插入操作而言,这个哈希map。是不是就相当于对于哈希map而言,插入写入操作就相当于是一个密等操作,对吧?啊,这是大家比较直观的能够想到的这样的密等操作。呃,这里大家注意啊,对于这个密等写入而言,整体上来讲,它确实看起来最后重复写入结果是一样的,没有起什么作用,但是实际上它的这个原理相当于是什么呢?原理,它不是说不去重复写入,而是呃,你去写吧,重复写入写完了之后结果还一样,它是这样一个原理,所以它有可能会遇到一个什么问题呢?
06:17
他有可能遇到一个就是我们出现这个故障,然后进行恢复状态的时候,大家想到假如说我们后边的那个写入到think系统里边,用了这样的一个幂等写入的操作的话,那可能会出现什么情况?那就是会发现突然我们把外部系统的那个写入操作,跳回到了之前的那个状态,对不对,跳回到之前的某个状态,然后。然后他继续往里边写。然后就好像在历史重演一样,把之前我们那个就是故障之前的那一段,那一段时间发生的那个状态变化又重新重演了一遍,最后直到发生故障的那一刻的那个状态追上,然后之后就完全正常了,对吧?大家想想是不是有这样的一个状态出现,当然你如果从外部系统去读的话,你假如说没看到中间这一小段历史重演的过程,那可能看起来还是还是一样的,对不对?但是假如说那个中间这一段数据又被人读出去做了一些操作的话,那是不是相当于还是会有一些问题啊?
07:29
对吧?所以从严格意义上来讲,密等写入其实还是有一些小瑕疵的,呃,就是嗯,不能做到完全意义上的exactly one。好,那除了这个密等写入之外,我们再来看另外一种,就是所谓的事物性的写入啊,这个大家就比较熟悉了,对吧?Transactional rights,那么事物transaction,这是一个什么东西呢?啊,事物其实是应用程序里边一系列严密的操作,那么所有的这些操作它是相当于是捆绑在一起的,要不全部成功完成,要不只要有一个挂掉是不是?哎,整个这些全取消啊,对吧?呃,就所有的全部撤销,这个大家之前应该也比较熟悉,就是说呃,我们所谓的这个事物的几个基本要素嘛,Acid,对吧?这个大家学这个数据库理论的时候应该都学过啊,所以我们知道这个事物是具有原子性的,对它的一系列操作,要不成功,要不呃失败,要不就一个都不做,对吧,一组操作是一起同生共死,那这样的一个状态,呃,其实大家知道这个事物,这个单词transaction。
08:41
啊,另外还有一个含义是交易啊,所以大家自然能够理解,就是说为什么事物就是又又是交易呢?因为交易就是这样的特点,对不对啊,你比方说我们这个银行转账,那那这个银行处理的过程显然就是说对你你要给我转账的话,是不是你的账户里边要先减掉一个数,然后我的账户里边加上一个数,对吧,这样我们账就平了,那假如说你在给我转账的过程当中,你那边捡这个操作已经发生了,然后系系统挂掉了。
09:17
然后恢复状态的时候,是不是恢复出来之后,就是你那边减了,我这边没加上啊,平白无故少了钱了,这显然是不行的啊,所以说就是你那边减和我这边加这两个操作必须捆绑在一起,要不都成功,要不都失败,对吧,要不你就回退到你没减的那个状态,这是所谓的这个事务性操作,那大家想一想,我们如果用在这个flink,这个端到端的状态一致性保证里边。怎么样用这个事物写入来保证它的状态一致性呢?简单的一个思想,其实就是说我们现在在他之前的这个,就是在think环节之前的状态一致性保证是靠checkpoint,对吧,然后故障之后的恢复也是靠checkpoint,所以我们的一个想法就是我们要构建一个事物,它要跟之前的那个checkpoint要捆绑在一起。
10:17
言,言下之意就是什么呢?就是说如果checkpoint真正完成了,前面那些数据都处理完成了,我才把真正完成的那些数据真正写到那个对应的think系统里面去,所有一个checkpoint里边的数据,他们都属于同一个事物。同一个think提交的事务,这样是不是就完就做到了,我们整个这个过程要不都成功,要不都失败啊,所以是这样的一个想法,具体呢,事物性写入又有两种实现方式啊,大家看这个概念就越来越多啊,呃,哪两种实验方式呢?一种叫预写日志,另外一种叫两阶段提交,呃,其实这些概念都是在这个,呃分布式系统里边,或者说我们在做这个事务性一致性处理里边,非常。
11:12
可以说是非常有名的一些概念啊,那接下来我们看一下这个预写日志是什么意思。这大家之前听说过吗?听过是吧?Wal对不对?呃,Write hand log,所以它是比较简单的,它主要是干什么事呢?其实就是把结果数据是不是先存起来啊,先当成log保存起来,对吧?先存一份,我们这里边呢,呃,在flink里边就可以把它先当成状态保存起来,然后干什么事情呢?当收到checkpoint完成的通知的时候,呃,大家知道当所有算子任务把那个自己的状态都存完了之后,不是要通知job manager去,呃,说诶,我们都已经完成了吗?那job manager收到所有的确认信息的时候,他就说啊,好,我们现在这个checkpoint真正完成对吧?有这样的一个过程,哎,所以。
12:06
当job manager说真正完成的时候,然后他要通知一下我们的这个think,那么这个时候think再一次性的把我们缓存的那些状态全部写入到think系统里面去啊,所以大家看这过程就其实跟事物的那种那种,就是我们所谓的这个事物就是一回事,对吧?啊,这就是事物的一种实现嘛,啊,它的特点其实就是。简单粗暴对吧,大家想这是不是就相当于这个批处理啊,是不是这样对吧,就是一批直接先存着对吧?呃,放在这个love里面,放在状态里边,然后到时候说哎,那边已经存好了,H1批写进去,呃,所以这个其实是非常简单呃,容易实现的,而且就是说由于这个数据是提前在状态后端里边去做缓存的,写在lob里边的,所以不管你后面连接的是什么样的think系统,那这种方式是不是都能一批搞定啊,对吧?不管你连的是什么,反正我就先缓存着吧,你那边支持不持不支持事物我不管,我就一直等着,我只等checkpoint那边一完成,我一批给你写进去。
13:20
但是大家想一想,这个有没有什么问题呢?写生对大家会想到你在这个这个具体的过程当中,假如说你这一批往里边写的时候。这个过程当中挂了。那会出现什么情况呢?因为这个sink系统,你这里面没有要求它一定支持事物,对吧,所以是不是就有可能出现我在写的过程当中,一部分写进去了,一部分没写进去。诶,那那大家就想你这个时候他到底是算写成功还是写失败呢。一部分没写进去,那当然应该失败,对不对?按照我们的想法应该是算失败,那如果要是算失败的话。
14:08
那是不是如果要是再返回去,从上一个checkpoint再重新做一遍的话,是不是所有这些数据,这批数据又得重新写一遍啊,那之前已经写成功的那一部分是不是就得写两遍啊?啊,那那可能又有同学说,那要不这个就就算他失败了呗,呃,就就算他成功写成功了呗,对,那大家想,那如果你这个算写成功了,那边拆矿它也完了,那是不是这边没写进去的这一部分就失败了,那就真的丢了,对吧?那你这连at least ones都做不到了,那显然更不好,对吧?所以这里边其实是有问题的啊,所以在这个淤血日志,这个w al的这个实现过程当中,呃,其实是会有这样的一个问题,另外它其实还有一个小问题,什么小问题呢?大家会想到我,我本来前面是说这个checkpoint已经做完了,对吧。
15:00
已经做完了,然后这边才去真正的写入,但其实真正这个整个过程完成应该是算什么呀,是不是得等那边think这边一批全写完,我那个checkpoint它才能真正生效啊,所以wal的这个机制,它的其实实现的过程,它是要相当于这个checkpoint前面的那个job manager确定他写完还不算真正写完。就是他只是说我这边该做的都做完了,还没有真正确认啊,所以它相当于还得有一个外部系统里边存着我当前已经确认的,已经完成的那个checkpoint,所以说什么时候去确认这个信息呢。哎,那就得比方说我现在已经把所有的这个check的要做的操作都做完了,状态都存好了,然后通知think think任务,那就会把这一批数据全部写到外部系统当中去,写到外部系统也写成功了,这是不是可以拿到一个返回啊,成功的返回对吧?然后think系统就可以告诉job manager,我这边也最后搞定了。
16:11
那这个时候job manager会再做一个操作。就是相当于所谓的这个要把这个checkpoint做,做一次commit,用一个commit做一次commit,提交到另外的一个永,就是持久化存储的一个呃空间里面去,就是表示我当前这个checkpoint是连最后的那一批写入都已经搞定了,Check矿到时候要恢复,你从这个来恢复。那大家想,这个过程还有破绽吗?还有什么问题吗?当这个过程都已经确认了之后,那相当于呃,我们这个wal缓存的那些数据是不是就都可以删除掉了啊,这个就没问题了,那边已经确认了吧,那大家想一想这个过程有没有有没有什么问题。
17:03
这个过程整体来讲看起来还是没什么问题的,但但是大家想到这里面有一个环节,就是我们最后是不是要写一个确认信息,表示我当前的这个checkpoint真正的已经提交完成了。对,那假如说这个这一次写入的时候出现故障挂了呢。那会出现什么情况?大家想一想,如果这一次挂掉的话怎么办?它默认没有,那是不是相当于就是相当于我们这里边就没有完成啊,对吧,认为checkpoint没有完成对不对,但事实上这个时候checkpoint是不是该做的事情都已经做完了,甚至连最后写到外部系统都已经做完了,对不对?你这个时候如果认为他没有完成,要重新来一遍的话,那是不是就有问题了,对吧?啊,这个时候就肯定会重新写入了,所以大家注意啊,就是这个wal的这个方式也是有这样的两种小问题的啊,这就导致他最终其实是根本达不到exactly one的这个语义实现。
18:23
呃,那当然了,在这个flink里边DS stream API提供了一个模板类,可以方便我们来实现这种实物性的SK,呃,它还是比较有用的,因为为什么呢?因为它比较简单嘛,对吧,不管是什么样的系统其实都可以,呃,用这种方式直接做一个批处理,把它写进去,这个模板类叫做genetic right head SK,对吧?啊,所以大家看到就是如果我们想去自定义的话,那就实现这个玩意儿就可以了。呃,那当然我们就会想到,前面讲了这么多东西,好像都没一个特别给力,能够真正一点问题都没有做到一个大。
19:04
有没有能够做到的呢?哎,这就是最后我们要给大家讲的这个所谓的两阶段提交,这是真正能够实现exactly one的一种方式,呃,那顾名思义啊,这个两阶段提交啥意思啊?它是two face commit,有时候大家会看到这样一个缩写叫to PC 2PC对吧,有时候中这个中文直接把它叫2PC啊,甚至还有这个三阶段提交叫3PC啊,这个大家就是看到名这个名词缩写,知道它什么意思就可以,这个是顾名思义,它是不是要把这个提交的过程分成两个阶段两个部分来做啊,哎,整体上来讲的话,他其实就是先做。预提交先做一次,就是准备的提交,然后呢,等到checkpoint真正完成的时候,再做一个正式提交,这样的一个提交过程呢,它其实是需要外部的think系统要支持事务。
20:06
所以它其实是对外部系统要有更高的要求,大家看一下这个具体的这些过程啊,具体怎么做这个两阶段提交呢?诶对于每一个checkpoint think任务呢,它会启动一个事务。就是来一个checkpoint的操作,它就相当于对应的就启动一个新的事物,然后接下来这个checkpoint之后,所有的数据那就会添加到这个新的事物里边,也就是说在这个checkpoint之前的所有的数据是不是应该属于上一个事物啊,所以大家看这个是不是就是事物和checkpoint的这个节点完全对应上啊,完全在这个点上对应上。然后大家看啊,它这个checkpoint之后的所有的这个数据添加到新事物里边去之后,会把他们挨个来,一个就写一个直接写入到web think think系统里面去,就是通过这个事物写入,对吧,在事物里边去把它提交写入,但是并没有直接真正的提交他们。
21:13
这是就相当于只是一个预提交,哎,这是什么意思呢。这就相当于是说我把这个数据已经发给外部的那个think系统了,但是这个系统还这个事件,这个这个数据还不可用。现在的这个事误还没关闭,对不对,该做的那一组操作还没全做完,所以你那边那个数还不能用,但是已经发给你了,这是这个过程啊,那什么时候他就可以用了呢?当think任务收到checkpoint完成通知的时候,才正式提交事务,然后实现结果的真正写入,那事物关闭的时候,那那边的数据就真正可用了,对不对,事物关闭整个提交啊,是这样的一个过程。那这种方式呢,就真正实现了所谓的exactly one了啊,他对外部是有要求,要求外部必须得支持15。
22:06
呃,那同样link里边呢,提供了一个接口,可以让我们自定义去实现这样的一个东西,这个接口叫to face commit s function,大家看这是一个think function,对不对啊,这就是这样的一个两阶段提交的think function,如果实现这个类,我们就可以做到两阶段提交的这个语义了。好,那这里边再给大家说一下,这个所谓的两阶段提交对外部系统到底有什么样的要求,前面我们已经说了,外部系统必须得提供事务支持,对不对?必须得能够能能够打开一个事物,然后你在这个事务里边提交数据的时候,外面不能直接消费啊,必须等这个事物关闭的时候统一提交,这个时候才能用啊,这个是对外部系统的一个要求,然后另外还要求什么呢?还要求在checkpoint的间隔期间内,就是两个checkpoint,前一个和后一个。这个checkpoint中间。
23:06
需要能够开一个事物对吧,这个时候打开一个事物,然后把这中间所有来的数据全部收到这个事物里边去做写入。啊,这是他对这个事物的要求。然后还有要求是在收到checkpoint完成通知之前,事物是不是必须得是一个等待提交或者是预提交的一个状态啊,对吧,这个过程当中必须它不能是直接提交了,那么在故障恢复的情况下,大家就会想到这可能就会就会需要一些时间了,对不对,对吧,你从checkpoint里边去恢复这些状态,然后这个事物它是一个等待提交的状态,那这个时候think系统它的那个事物肯定也是有超时的呀。假如这个时候think系统关闭了。他认为你这个超时了,当前这个事物作废关闭,那么这个时候会出现什么情况?
24:08
那是不是未提交的数据就会直接丢失掉啊?对吧,所以大家注意一下啊,这里边就是对这个超时时间还有要求啊,然后另外要求就是说thinkink任务必须能够在进程失败之后还能够恢复事务,最后提交事务必须还得是密等操作,哎哟大家看这个这么多要求,大家就会想到,哎哟,你要这么说的话,这个两阶段,两阶段提交这个协议是不是实现起来有点太昂贵了呀,这个代价是不是有点太高了。值得吗?啊,当然了,就是说在有一些场合下,大家如果觉得状态一致性要求没那么高的话啊,那可能你可以牺牲这个状态一致性去,呃,做的更简单一些,对吧?啊,可以那个让系统性能更好一些,让这个延迟更低一些啊而从另外一个角度说呢,如果在非常有必要保证exciting one这样这样这个状态一致性级别的时候,那其实这个两阶段提交还是非常有必要的。
25:17
尽管它昂贵,但是其实在flink的上下文里边啊,每一个checkpoint这个协议其实是指。进行一次的,对吧,它并不是说一个数据来了之后就要去来这么一次协议,对不对,它其实就是跟checkpoint checkpoint绑定在一起,有一个checkpoint,它就开一个事物,它是这样的一个过程,所以整体来讲,它其实很多开销是捆绑在check上的。其实开销相对来讲还是比较小的。啊,尽管是一些额外的开销,但其实是我们可以接受的啊,利用这些开销增加了比较少的开销,但是实现了真正的exactly one其实还是值得的啊,那大家可以把它跟前面讲的那个wal预写日志的方式做一个对比,那他俩都是事务性的提交嘛,那它俩的区别在哪里呢?
26:15
大家会想到前面那个预写日志的时候,是把所有的数据是不是先缓存起来了呀,然后最后做批处理,一批写入对不对?那大家看这里边两阶段提交,他是批处理吗?其实不是,他提交的时候,其实前面预提交还是一个一个就全提交过去了,对不对?只不过什么最后是一批直接操作的呢?其实不是一批提交,是因为他们全在一个事物里边,我最后确认那个事物关闭的时候,是不是相当于是一下子确认的呀,然后一下就把它们相当于都标记成可用状态,都标记成提交状态了。所以在这个过程当中,其实更符合流处理思路的是这个两阶段提交的方式,而且大家会想到在前面这个wal预写日志的方式里边,对内存的要求是不会比较大,在这个过程当中,是不是所有数据你必须全缓存下来,而在这个两阶段提交里边需要缓存数据吗?不需要,对吧?诶,我直接就提到外部系统里面去了吗?来一个就直接提了,只不过那边它是一个在一个事物里边还没有正式提交的状态而已。所以这个过程其实对于系统性能而言,两阶段提交显然是会更好的,对吧?好,然后。
27:36
接下来给大家看一看这张表吧,大家看一下这个到目前为止我们已经看了这么多种方式,对吧?呃,大家来比较一下不同的S和不同的think啊,他们这个状态下对整个系统的状态一致性的保证是能够达到什么样的级别,大家总结一下。首先大家看,如果说这里就是分别看source和think,因为内部的话我们都开checkpoint对吧?Exactly one肯定是能保证的,这个就就不说了,他不会不会拉我们的后腿,我们主要就看S和S,这里边S分两种。
28:15
如果S不可重置的话。言下之意是什么啊,对,言下之意就是是不是相当于这个树来了之后就来了之后你你能处理就处理,如果要是中间挂了的话,丢了就丢了,没有回头路了,对吧?啊,这就像我们那个骚的那种发发送的方式一样,不可能再重新读取了,对不对,那如果是不可重置状态下,那就没什么好说的了,那他就完全没有保障,数据有可能丢,他提供的一致性保证就是对最多一次at most one。那如果要是SS是可重置,可重置状态下呢。至少啊,大家有想到他这是不是至少保证数据不会丢啊,所以它至少就能保证at least once至少一次的。
29:07
一致性语义啊,那接下来大家再看一下对应的think,这里边如果你什么都不做,Think啥都不保证的话,那是不是这里就是at least once啊,啊对吧,这肯定是至少一次啊,那如果这里边我们给了一个幂等的think操作的话。S还是可重置的,那我们整个整个流程,整个端到端应该是一个什么样的这个一致性级别。对,其实是exactly one,为什么呢?因为首先数据不会丢,其次数据是不是你多写几次,它这个think系统也都一样啊,对吧,结果都一样,没什么区别,但是它有可能出现一点小问题,什么小问题呢。就是在故障做这个故障恢复的时候,有可能短暂的有一点那个历史重演的感觉,对不对,会出现短暂的状态不一致啊,所以这是他的一点小问题啊,那另外还有两种方式,事务性提交的两种方式啊,一个叫预写日志wal,那么在可重置S的前提下,它能达到什么样的一致性级别呢?
30:19
哎,我们前面说了,它只能达到at least once,因为它是缓存了一批,然后一下子去写入的,对不对?呃,一下子写入这个一批去写入这个状态显然就不靠谱,假如这个写入的过程当中,中间挂了的话,那明显他这个就保证不了这个,呃,就是我我们当然最后的选择是如果中间挂了,我们应该算他挂了对吧?呃,我宁愿让他按算算他挂了,也不能算他成功,因为算它成功这个这个一致性是不是就变成atmo once了呀,对,所以说这里边我们肯定是这样来算啊,啊,但这种情况下他就达不了,达不到once。
31:00
那接下来最后的这个两阶段提交,它是不是就能真正的达到exactly one啊啊,这就是我们给大家讲的这个。端到端的状态一致性。
我来说两句