00:00
另外的一种方式,大家想到了这个幂等写入,它中间会有短暂的不一致,那有没有更好的实现方式呢?啊,而且就是大家其实想到就是幂等写入不光是中间有短暂不一致啊,而且是不是对外部的这个数据库,这个存储的系统也有要求啊,它得保证我这个重复写入是没问题的,是吧?你像key value这种方式啊,所以它是有限制的,那我们就想到有没有更好的实现呢?哎,那就是后面这种方式,所谓的事物写入transactional rights。事物的话,这个大家非常熟悉了啊,数据库里边的一个经典概念,呃,大家知道这个数据库里边有这个所谓的acid特性,对不对啊,里面非常重要的一个特点就是有原子性,原子性的特点就是一个事物里面的一系列操作,它就是把一系列操作要相当于绑定在一起,要不就全部成功,要不就一个都不做,对吧?就如果中间有一个失败,是不是所有的这个操作全部要回滚啊,之前就全部撤销掉了啊,就一个典型的例子就是我们转账,大家知道转账这个操作的话,现在这个电子系统里边银行数据库其实就是非常简单的两个操作,是不是你给别人转钱,转100块钱,是不是你的账户要减100,别人的账户加100啊,诶,但是大家想到,假如说你出现故障呢,我转100的时候,我这边减掉100了,结果这会掉电了,直接挂了,那大家想是不是我这边减了100,那边没加上100啊,那那这个当然就不对了,对吧,凭凭空的就少了100块钱嘛,所以。
01:30
为了保证结果正确,是不是一定要把它包成一个事物啊,对吧,就是你这个减100和加100这两个操作要不都成功啊,那我们这个账是平的,要不就是诶全部取取消对吧,当前交易失败,你重新提交就完了。所以这是大家比较熟悉的这个概念,那我们现在怎么样把这个向外部写入的过程当中,用这个事物写入的方式做一个提交呢?呃,简单来讲的话,这里边的思想是我们要构建一个事物。
02:02
呃,就是外部系统,我们跟外部系统那边连接,要构建一个事物,提交的话,是基于一个事物去提交,对吧,然后这个事物呢,是要对应checkpoint。大家想一下,为什么要对应checkpoint?就是因为如果一个checkpoint的保存已经完成的时候,是不是当前所有的数据,它那个状态就一一定能够在这个我们保存的检查点里边体现出来了,那之后是不是就不用重放这个数据了,那这个数据是不是就可以正式的写入到外部系统了,哎,所以大家看这是这是一串的对吧?Checkpoint表示内部状态它已经存进去了,而写入到外部系统的话,这是不是表示啊,就是我我只写一次嘛,那就表示现在真的可以写了,对吧?那如果说假如出现故障的话,我回滚到回滚到这个上一个checkpoint,那大家会想到是不是就是这里边的所有的数据状态都已经体现在我们当前这个恢复的状态里了,然后呢,它也都已经写入到外部系统了,对吧?啊,那要是说如果当前checkpoint还没保存完这些数据,是不是对应的这个事件是不是就不能提交啊,大家想想是不是这样啊?
03:16
啊,对吧,事故就不能关闭,那假如说当前这个拆point还没完成的挂了,那怎么办,事故是不是也要撤销啊,那对吧,就相当于这些数据我不能提交了,因为一旦要是回滚的话,这些数据的状态是不是就没有啊,那我得重重放数据嘛,重放数据我不想重重新写入,那当然那个之前提交的那个事物我也要提把它撤回来撤销啊,所以这个思想其实非常非常直观啊,非常简单的,那他具体的实现方式呢,又有两种,一种叫做预写日志,另外一种叫两阶段提交啊,这就是非常重要的概念啊,大家需要把这个做一个了解,首先是预写日志,预写日志其实它的缩写还挺有名的,叫wal,它就是write ahead log,顾名思义,简单来讲就是说啊,我要写入到外部系统的时候呢,我预先先把它保存一下,当成日志先把它存起来,然后怎么样呢,等到收到checkpoint完成通知的时候,一批。
04:16
直接把它刷到外部S系统里面去。大家想这种方式是不是特别简单粗暴啊?呃,它的好处就在于非常容易实现,为什么呢?因为你想不管外部系统是什么样的,之前我们还想我还要构建事物,哎,构建什么事物啊,我一批写入,你说这算不算一个事物,好像也算一个事物是吧?啊,所以就是我直接就把它一批都写进去,我的写入时间点就是拆point的完成,那所以就是说拆point完成了,内部状态都已经写进去了,数据都已经处理完了,对吧?下一次来的时候这个数据不会再重放了,这个时候是不是我就直接把这个数据全刷进去就完事了。然后如果要是说拆po还没完成的时候,这个数据是不是我也就不存啊,我就等着对吧?哎,所以假如拆point你的失败了,那刚好这个数据我也没存嘛,那就重新重放出去,重新回滚就完了,所以这个使用实现起来是非常简单的,对外部系统也没有要求,你甚至往往那个文件系统写入是不是也可以这样一批刷进去完事了。
05:18
呃,那大家想一下它有没有什么缺点呢?对,大家想最大的一个缺点就是你这不就是批处理吗?那家想呢,我之前都是来一个要处理一个结果,到最后写入的时候,结果不让我正常写入了,是吧,我必须要等一批,尽管那个checkpoint的时间间隔我可以设设设设设的小一点几百毫秒,但这也是一个等待啊,你又变成这个一批的写入了,所以这个显然是呃,会增大我们系统的延迟啊,然后另外还有一点是。大家想一下,那假如说我们在做这个操作的时候,一批写入的时候。外部系统你不知道它是怎么写入对吧,那假如说他一批写入是有可能写入一半,另外一半没写入呢。
06:01
你说在这种情况下,那不就又会出现你最后为了保证数据不丢,是不是还要重放啊,那重放是不是就导致写入多次了,哎,所以大家会发现他最后其实还是不能严格意义上,呃,就是实现这个所谓的精确一次啊,那么在flink里边data stream API给我们提供了一个模板类,叫做generic right hand think实现了这个模板类啊,大家如果实现这个接口的话,我们就可以实现了这种预写日志方式的S,对吧,就有了这样的一个状态一致性保证啊。那另外还有就是两阶段提交,大家发现前面这个预写日志不太满意是吧,方式稍微的有一点不完美啊,那这里边我们就用一个两阶段提交来实现,它的缩写是2PC two PC对吧,Two face commit,它的思路是。这就要真正的要创建事物了,就是外部系统必须要支持事物了,它的特点是每一个checkpoint呢,都会对应着启动一个事物,然后接下来所有来的当前这个checkpoint内啊,当前所有的数据,它的写入,Think操作都是基于这个事物去做提交的。
07:13
诶,大家想这个基于事务去做提交的时候,这个算正式提交吗。算正式写入吗?他是写到外部的这个系统里了,但是其实还没正式提交,因为他有可能是不会被撤销啊,这个事物有可能被撤销对不对啊,所以大家看它的特点就是这样,我还是来一个就写一个,直接就写进去了,但是基于一个事物的,这个事物有可能之后还会回滚,还会撤回来啊,然后接下来什么时候才,所以大家看前面的这个写入,其实就是一个预提交,对吧,所谓的两阶段就是首先先预提交,然后后面正式提交,第二阶段正式提交,什么时候正式提交呢?是不是收到checkpoint完成通知的时候,既然你的状态都已经保存了,我现在这个事物是不是也可以关闭了,诶,我就正式提交确认一次正式提交,接下来数据就可以正式消费了,对吧?啊,所以大家看这个过程就真正的实现了exactly once,如果发生故障的话,那我是不是直接回滚到之前的那个,上一次的那个,呃。
08:14
保存的检查点,那对应的他的那个事物是不是也已经提交过了,都已经写入了,对吧?这个没问题,那如果中间没有保存检查点的那些数据呢?他的那个事故是不是要撤回啊,它也就没有写入,重放就可以了,对不对啊,所以这个过程是完全没有问题的。同样在这个弗link系统里边也给我们提供了一个接口,这个接口叫做to face commit think function,就是所谓的两阶段提交think function这样的一个接口啊,这个接口呢,其实在源码里边大家也可以看到啊,之前我们在做这个flink卡夫卡的连接的时候,大家其实是能看到这个效果的。就这里边,呃,我们做这个s test的时候,当时是你有了一个flink卡夫卡PRODUCER011对吧?诶,那这个东西大家还记得里边是什么吗?
09:05
它本身。上面我们看它的这个类啊。这个类。是不是继承字?To face commit think function啊,哎,这就是我们所说的啊,就实现了这样一个东西,当然这本身是一个本,本身是一个抽象类对吧?啊,所以它是一个继承的关系啊,然后它实现的接口是checkpoint的方式和checkpoint checkpoint listener对吧,它实现的是这两个接口,然后在这个to face commit think方式里边,大家会看到有很多自己需要去实现的方法,比如说你看begin transaction开启一个事物对吧?啊,比方说你看有commit,还有这个正式提交对吧?还有有正式提交,当然有预提交,预提交是什么pre对吧?啊,就这里边该有的这个方法都有啊。呃,然后大家也知道这个think function里边有一个,呃,最基本的必须要实现的一个方法,是不是叫invoke啊,这个也有对吧?所以大家看这里边的这个方法特别的多啊,感兴趣的同学也可以了解一下,所以整体思路就是把当前的这个事物的提交。
10:14
嗯,Checkpoint的完成绑定在一起了,这样的话就实现了exactly once精确一次,呃精呃精确呃一次的状态一致性保证啊,那么然后这里面我们给大家总结一下这个两阶段提交,既然说这个两阶段提交这么好是吧,那是不是我们就干脆呃所有的地方都直接用这两卷提交呢?没那么简单,因为大家想首先你看到那个实要实现那个接口啊,就是想要去继承那个抽象类,其实不容易,里边要重写的方法很多很多,对吧?呃,你要构建事务,要确定这个预提交操作,正式提交操作,各种各样的定义啊,然后对外部系统其实也有要求,大家想到首先是不是必须得支持事务啊,对吧,你外边根本就不支持事务提交,那你白瞎嘛,对吧,你像这个文件系统,你你说直接要去两阶段提交,没办法。
11:07
啊,那另外还有就是说,呃,在checkpoint的间隔期间内,两个checkpoint之间,我们是不是相当于必须要能开启一个事务,这这主要是说我们这个呃,Thinkin这个任务的啊,他要开启一个事物,然后接下来来的所有的数据,是不是就相当于是用这个事物来做提交的,对吧?然后那个外部系统就是也是用这个事物来接收对应的数据啊,这是这样的一个特点,还有要求就是说checkpoint在完成通知之前,这个失误呢,必须得是一个等待提交或者是一个预提交的状态,那大家知道这种状态下是不是相当于数据还不能直接消费啊。对吧,这里边其实不能不不能做的啊,所以外部系统必须要对这个做一个限制,就比方说像卡夫卡里边是不是有对应的那个隔离级别的限制啊,对吧,比方说这个read committee的对吧?啊,就是要只能读取这个已已确认已提交的这个数据,你不能读取这个未提交的数据啊,那另外还有就是说故障恢复的时候呢?呃,在这个等待接收拆point的完成通知的时候,可能要花一点时间对不对,就这个预提交时间有可能很长,对吧,事物一直要开着,大家想如果要是说达到一定的时间限度的话,是不是就会超时直接就关闭失误啊。
12:27
哎,所以大家要注意,在这个think任务里边,它是有一个失误的超时时间的,我们的checkpoint是不是也有一个超时时间啊?呃,那另外还有就是外部系统think也有一个超超时时间啊,假如说这三个超时时间不匹配的话,会出现一个什么问题呢?大家想就是,呃,有可能就是说,呃,我们那个外部的那个数据啊,我那边还开着那个,呃,事物在在等待提交呢,呃,然后接下来这个checkpoint也在继续做,对吧,Checkpoint还在继续完成,做着做着做着呢,发现这个事物已经到时了,超时了,那是不是事物就关闭了呀,我认为相当于你这个拆point的没完成,对吧,我就认为这个数据没提交,我只有等着重放就完了,但是呢,Checkpoint的超时时间长,他还等着呢。
13:18
后面呢,结果拆炮还等,真等到它完成了。那大家想是不是就相当于这个数据是不是丢了呀,对吧,状态里边保存了,结果后边没有失误,没有提交掉,那后面你即使是回滚状态是不是最后也不会重新重放这个数据了,诶所以大家要注意这里面有很多细节对吧,要考虑的东西特别多,然后另外还有就是说必须能在进程失败之后还能恢复失误,另外就是提交失误操作呢,必须得是密等操作。大家就会发现,那这么说的话,这个两阶段提交是不是对我们这个系统要求太高了呀,对吧,那实现起来又那么麻烦对吧,你这个这这个有必要这么去做吗?啊,当然这就是还是看我们具体的项目需求,在有一些场景下,你如果对它的状态一致性要求就是容错啊,故障恢复的这个容错要求非常高的话,那是不是这个就非常值得呀?然后另外大家想,这里边对于外部系统的要求,大家看是不是大部分都是一些配置的要求啊。
14:19
他执行起来效率低吗?其实执行起来大家发现效率不低,对吧,他我们这里边做的这个操作,主要干了件什么事。本来这里边一个一个任务一个任务来,对吧,前面这个source读进来,后边做参transform,最后一步做think,然后比方说这里边我直接写入到对应的外部系统里面去,比方说这里边一个消息队列对吧,那大家想到啊,我这做这个checkpoint,做这个两阶段提交的时候,他其实就是说我当前所有的这个数据来了之候,来了一个数据,是不是我还是照常往里边写一个数据啊。只不过就是假如说我这有一个checkpoint的这个barrier收到barrier了,那大家想我现在是不是要得开启一个事物啊,对吧,我这得基于一个事务去做这样的一个提交,所以我这里边唯一的操作是不是就相当于要通知外部系统做一个事务开启啊。
15:20
然后另外是不是就还有一个是是不是要关闭啊,关闭的节点是不是收到job manager那边通知当前的checkpoint已经完成了,收到那个通知信息的时候,我这里边是不是就把这个当前的事物做一个关闭。那大家想这个过程里边,我需要像之前那个预写日志一批在在给他写数据吗。之前是不是来一个都已经写入进去了呀,我现在是不是只要向呃当前的这个外部系统发一个消息就够了,我就告诉他,好,之前我的那个TRANSCTION1那个事物关闭吧,正式提交了对吧,我数据都已经写完了,所以大家看这里边是不是就相当于只要发一个开启事物和关闭事务的消息就够了。
16:03
所以它的这个运转机制其实是搭载了flink内部的checkpoint机制,所以对于性能的影响还是很低的啊,这也是在设计上比较精巧的一点啊,大家可以下来之后再把这个过程再好好的梳理一下啊。然后最后我们再总结一下,那不同的S和think之间如果搭配起来的话,最终我们能保证的状态一致性级别是什么样的呢?哎,通过这样的一个总结,大家会发现这要按照S和think做一个划分了,首先S这边分成两类,是不是可重置偏移量的是一类,不可重置的是另外一类,对吧?那如果不可重置的话,这还有必要讨论吗?啊,不可重置,那肯定数据就丢了,对吧?你后面再怎么保证,你再怎么两阶段提交没用了,数据前面丢了嘛,不重放了,那所以就是最多就是atmo ones了,对吧?那如果要是它可重置的话,对,大家发现这里边是不是它至少能保证at least once,呃,至少保证不丢数据,所以任意的think这里面都能保证at least one,对,然后接下来如果密等提交的话,我可以保证exactly,但是大家注意故障恢复的时候是不是有可能出现短暂不一致啊,对吧?那个好像是状态回滚的那个状态啊,啊,这个短暂不一致,然后预写日志的话,大家注意这里面写的状态一致性是at least once,为什么是至少一次呢?
17:29
这就是因为我们说的,你并不能保证往外部系统写的时候,这个操作是不是事务性的,对吧,你万一一批写入进去的时候,它是写一半,一半成功了,一半失败了,那怎么办呢?啊,你如果要不能回滚的话,那就容易出现重复写入,所以它最多保证的是at least one啊,那然后这个两阶段提交to PC,它是一个完美的实现,对吧,最终能够保证真正的one。啊,这就是我们最后给大家总结出来的状态一致性保证,那在具体使用的过程当中呢,其实这几种方式啊都有都有应用,那你像这个预写日志,他这SD4S也有必要吗?啊,它其实是有用的啊,因为大家想你在提交的这一批提交的过程当中,中间刚好挂了。
18:15
这个概率其实比较小,对不对?哎,所以说这个它其实在可以说90%多的情况下都能保证exactly ones极个别的那种场景下才才出现这种不一致,对吧?啊,那所以整体来讲它其实还不错,另外它的特点就是简单啊,所有系统对系统的要求,所有系统都能用这种先预存存存成这个状态对吧,存成预写日志,然后一批搞定啊,所以它这个实现还是有应用场景的啊,那另外就是密等,密等的话就是你可以只要外部数据库啊,数据系统是支持这样的一个写入的,那用密等其实实现也很简单。那两阶段提交的话,就是最完美但是可能最复杂的一种实现。
我来说两句