00:00
那接下来我们就来给大家具体的讲解一下,诶,什么是密等写入,什么是事物写入对吧?他们到底是怎么样保证我们这里边的这个think这一端就不会重复写入数据呢?哎,这里边首先我们来看一下幂等写入啊,幂等写入这个叫做I adaptant rights,这个大家可能也听说过,它主要说的操作是什么呢?幂等这个其实这是一个数学上的概念啊啊,所以这里边给大家举一个数学上的例子,大家看这个写法,这是这是幂,大家知道是其实是几次方的那个概念吧,这里边我们写了一个这个小括号表示的这个方,呃,这个一个一个N,一个次数啊,这其实说的是什么呢?是对这个函数求导,求高阶导数对吧?呃,求N次导,这个我们知道E的X次这个指数函数有一个非常的特性,就是它求导之后还是自己啊,所以大家会发现求导这个操作对于一的X次这个函数而言,就。
01:00
相当于是一个密等操作对吧?那就是含义就是说,哎,你随便求吧,你求多少次倒最后我得到的还是自己对吧?那就是任凭风吹浪打,我自岿然不动是吧?这里边就是呃,完全最后还保持的是本来的面目,你做多次操作和做了一次操作之后的效果是一模一样的啊,当然了,这里边是它最后保持的是原样,就跟没做操作是一样的这个结果对吧?它比较特殊一点,所以我们这里面对于幂等操作的定义指的就是说可以执行很多次这个操作,但是呢,只导致一次结果的更改,也就是说改了一次之后,你做一次和做两次做三次,后面的更改就不起作用了,对吧?啊,后面的那个操作来了之后,跟只做一次操作是效果一样的,所以它本质其实并不是说就我们说的这个只执行一次,对吧,只做。
02:00
一次写入而是相当于什么呢?只是可以写多次,只是我们相当于保证了它,你写多次最后的效果跟写一次还是一样的啊,这在具体实现来讲,在计算机系统里边什么样的写入就类似于这样的一个一种呢?呃,这这样的一种方式呢?大家回忆一下这个哈希map,哈希map是一个什么样的保存方式呢?我们写入哈希map的时候,那是不是一个键值,对对吧?Key value,那K大家知道是基于它的那个哈希去做的一个重新分配,呃,不是重新分配啊,基于哈希值去做的一个当前这个地址的一个选取,对吧?保存在哪里是基于那个哈希值去决定的,所以如果你当前的这个K相同的话,那当前它保存的位置就不变,然后呢,呃,对应的这个value,那当然就是我们当前的那个地址对应的那个位置去保存的那个里边填入的那个数据了,对吧,所以你看假如说我们当。
03:00
当前的这个KY6,这个KY6对是同一同一个KY6对的话,你同时把一个值去写多次,那是不是相当于我当前的这个地值也是同一个位置,大大家知道这挂的是一个一个这个呃,链表这样的一个状态,对吧?那然后接下来呢,里边的值又是同一个值,那是不是相当于你重新写一遍,还是一样的状态呀,对吧?所有里边的这个地址和值都一样嘛,那当然就是最后结果都完全一样。啊,所以这就是一个典型的幂等写入的操作啊,那大家也可以回忆一下,之前我们把这个输出的结果啊,写入到MYSQL,呃,和写入到这个red的时候,我们以当前的ID作为K去做保存,然后把一个温度值去做一个更新,这个时候是不是就相当于你假如说把同一个温度连续写两次,最后的效果是不是也是完全一样的呀?对吧?啊,所以在这种场景下,其实可以认为我们是实现了一个幂等写入的啊,那它就能保证我们的这个状态一致性,对吧?呃,你就相当于我后边这个你要做这个偶数求和,要做计算了啊,那就是六这个已经之前处理完了,已经变成了这个12,然后后边我再去叠加恢复到五对吧,恢复到五的时候,然后再去加六,再加一遍六又输出一次12,这边的这个结果还是一样的,还是不变的,对吧?呃,后面你再去加七啊,最后就相当于还是一个一个在之前的正确的结果上,往后叠加不会出现任何的问题,所以这种情况就没有没有问题,但是你如果说不是这种密等操作,你像之前我们那个,呃,直接写到文件里边,或者说像我们那个ES输出的时候,我们没有指定K,然后直接那条数据在后面追加,那你会看到这种情况下,你多输出一次,它就多多了一条记录,对吧?哎,所以外部系统消费的时候,那可能就会引起这个错误的结果了啊。
04:58
所以这是这个密等写入的这一部分内容,当然它是真正的实现了我们完全的exactly one吗?呃,其实也不是特别的完美,它在什么场景下有点问题呢?就像我们刚才说的,哎,你做那个我们保存那个温度值啊,呃,就是把它是可以去更新,就多次写入是跟之前写一次是一样的,但是有可能出现什么情况呢?哎,我们的这个数据它有可能来的很密啊,对吧?啊,有可能我们在两次point之间有可能隔了很多数,对吧?这里边我做了一次checkpoint,然后后边三个数我都处理完了,结果这会儿挂了,哎,那会导致什么什么问题呢?我们读取那个温度值,比方说之前我读到这儿的时候,这里的温度值是十度,然后接下来怎么样呢?诶,后边这个温度值是十五二十二十五,就一个一个就上升了,那我后边写入到这个release里边,我就会看到啊,假如说我有一个。
05:58
它这个,呃,大屏监控当前这个我直接显示出来,我就会看到这个数值不停的跳变对吧,十十五,呃,这个二十二十五就已经在上升了,然后接下来如果这个时候挂了之后恢复到之前的这个状态,那会怎么样呢?我会发现哎,他突然25之后又突然跳变到了十,然后再恢复到15,二十二十五对吧?呃,再再恢复到这这样的情况,当然当他如果追上我们之前的这个25之后啊,再往后走,那就跟没出过故障之后的那个状态是一模一样的,但是中间的这个过程是不是相当于有点奇怪啊,对吧?就这个过程看起来有点像是突然短暂的回到过去,重复了之前的一段一段这个历史,然后再追上这个历史之后,然后后面我们再去做操作,那这个时候如果说你的这个大屏监控,我们想看的是什么想。
06:58
看的是温度连续上升报警的话,是不是就有可能出现,有可能出现问题啊,对吧,在这中间突然一下回去了,我们那个连续温度连续上升,不就直接把这个就中断了吗?对吧?定时器流取消了,那这个结果就不对了啊,所以在某些场景下可能会导致我们处理的结果不正确啊,这是密等写入,然后接下来再给大家说一下这个事物写入,事物写入的话,呃,其实整体来讲大家就更加熟悉一点了,Transactional rights吧,那那事物整体来讲的话,这是我们在讲这个关系数据库里边啊,呃,事物的特性,呃,正确执行的四个要素,Acid大家都很熟悉对吧?那最主要的其实就是它里边必须要有原子性,也就是说如果我们构建一个事物的话,其实是这个事物里边就是一系列的操作,这里面的每一步操作,要不全部成功,要不一个都不做,对吧,那就是如果成功,那就所有的都成功,状态都更改,如果不成功的话。
07:58
啊,所有操作都取消,呃,都全部都一个都没完成,那像之前我们比较熟悉的就是银行转账,我们知道在数据库里边实现的话,相当于其实就是两步操作嘛,转账的话,你给别人账户里边转100块钱,其实就是自己的账户要减100,数字减100对吧,别人的账户那边要加100,其实就是这么简单一个操作,但这两个操作呢,必须捆绑在一起,对吧,你不能说诶我在数据库在做这个减100操作成功了,然后还没做加100的时候直接挂掉了,然后就就直接恢复之后呢,你就以为就是这个减100成功之后的那个状态,那就相当于平白无故我这边少了100块钱,结果别人那边没有收到吗?哎,所以必须是要不转账成功,减和加同时都都更新成功,要不的话就相当于这笔操作回滚对吧,交易没成功,你重新转了啊,所以这就是我们所说的这个原子性啊,那在这个checkpoint。
08:58
呃,我我们在这个状态一致,保证状态一致性的过程当中啊,对于这个think写入到外部系统,想要去保证它的这个事务写入这个怎么样去做操作呢?啊,其实我们能想到,呃,能做的这个操作是什么?就是在写入到外部系统,Think到外部系统的时候构建一个事物,对吧?那这个事物到底是就是哪些操哪些数据写入的时候我要包装成一个事物呢?哎,一个简单的想法就是我跟捆绑在一起啊,就是你现在不是现在不同的数据,他的这个状态保存会保存在不同的这个里面吗?对吧?这里面来了一个bar,大家知道,诶这这个bar之前的这些数据状态引起的状态更改就都保存在这个bar这个一里了,对吧?然后后边来的这些数据呢,它的状态改变都会保存在二里面。
09:58
所以接下来我可以怎么怎么去做操作呢?哎,我可以就是你当前这个checkpoint没完成的时候,我不要着急把这些数据都都写入到外部系统嘛,对吧?诶我怎么样做操作呢?等到这个checkpoint真正完成的时候,我再把这些数据全部写入到外部系统里面去跟这个CHECKPOINT11对应,那这个效果就是什么?这个check checkpoint完成的时候,就是这些数据操作我内部状态都已经做完的时候,对吧?都已经完成的时候,而且也是什么时候呢?也是外部操作系,外部的这个think系统也都写入的那个状态,而且刚好是后边的数据一个都没做,操作状态没更改,而且也一个都没写入到外部系统,对吧?这样如果去一一对应的话,我后边如果假如说中间这里边挂了对吧,这挂了,那这两个数据怎么样呢?这两个数据没写入外部系统啊,只是内部状态做了更改,我做。
10:58
恢复之后是不是又恢复过来了,那外部系统呢,我我是要等到这个事务,是要等到这个下一个checkpoint完成的时候,我才提交这个事务啊,那这样的话是不是就可以保证写入到外部系统,哎,永远都是跟这个checkpoint状态是完全一致的,哎,这就是这个基本的想法,好,那它的具体实现方式呢,又有两种,这里边给大家提出这个概念,一个叫做预写日志,另外一个叫做两阶段提交。
11:27
啊,那具体给大家还是稍微说一说这个概念吧,啊,就是一个是预写日志,预写日志叫做write aheadlo,大家有些地方可能也见过啊,缩写叫waal啊,著名的wal啊,那这个预写日志非趁理解它的含义,就是说你不是要跟那个拆point的对应吗?哎,那我就把当前你要写入到we部think系统当中的这些数据直接做一个缓存,我直接把它当成状态缓存保存起来,然后接下来怎么怎么样呢?先去做那个,先等那个,对吧,等到做完收到manager发出的那个呃,Checkpoint的完成通知的时候,我再把这个缓存里边的数据拿出来,一次性的批量写入s think系统里面去啊,就相当于我最后这个写度包写入到外部系统,是做一个批处理,对吧?一批写入啊,那它的好处其实是比较比较显而易见。
12:28
啊,简单易于实现对不对?因为这里边我做缓存嘛,然后在一批写入,这个其实就是无论什么样的think系统啊,就是我们说这个flink,有些给我们有官方的连接器,对吧?有些没有,我们要自己去实现,那你自己实现的话,怎么保证它的这个状态一致性呢?哎,你用这个预写日志方式非常简单,那我先把它缓存起来,最后再一批写入就完事了嘛,啊一批搞定对吧?这个其实非常简单,但是这种方式是肯定是有问题的,对吧?它有什么问题呢?首先就是你后面做批处理了,批处理的方式,这个一批写入信系统,当然这个外部系统如果消费这个数据的话,实时性就降低了,对吧,你最后前面你处理再快再流逝,最后我消费这个数据的时候,看到的还是隔一段时间一批,当然这一批呢,呃,你可以把这个checkpoint那个时间间隔调小一点,对吧?呃,我默认不是那个两,呃就是500毫秒嘛,你调小一点,那可能我这个这个平。
13:28
男的做这个checkpoint,那相当于也可以做到就是呃几百毫秒或者说呃几十毫秒这个这个级别的响应,当然你不要太小,太小了的话,那我们所有时间都做check point了,因为我们说这个默认它是一个要阻塞我们当前那个任务进程的嘛,呃,你所有时间都去做checkpoint不干正事了,这肯定是不行的啊,所以说这是啊这个一种方式,呃,但是它会带来我们这个延迟的增大,时效性的减醒。另外还有一个问题是,这个其实也不是特别的完美,为什么呢?因为大家想到你在做这个提交的时候,一批搞定对吧?啊,那有一些这个外部系统,它会有一个什么问题呢?有可能你一批搞定的时候,做到一半挂掉的时候啊,就你如果要是一个文件写入的话,大家可能会想到,那我打开一个文件,呃,写一半挂掉之后,那最后它相当于应该要完全回滚对吧,这个文件相当于没有写入,没有保存,那我下一次就相当于呃,重新再写就可以了,这个。
14:28
好像是可以做到这个失误回滚的,那有一些系统其实是我在做这个批量,再去做写入的时候,它相当于还是一条一条写,对吧?有可能我这边的批处理,呃,你比方说我要往那个MYSQL里面写写数的时候,那是不是相当于还是一条一条往里插啊,那那就有可能出现,假如说我那个MYSQL里边你没有再去构建事物对吧?你当然Mexico里面可以去构建事物,但假如说你没有做这件事的话,那就会导致呃,就最后就相当于还是呃这里边就是。里面的这个写入有一半是写入的,那另一半没有写入,那你最后这个拆你的恢复的时候,你怎么办呢?你这里面有一半数据已经写过了,有一半数据是没写,那你这个数据是要重放还是重放呢?对吧?这一组数据你要么全重放,要不要不全部重放,因为这是以它为准的嘛,所以这里边是有这样的一个问题的,所以在这种语义下,那你最后就只能是全重放,那就会导致有些数据重复写入,对吧?最后就相当于还是一个at least ones了啊,你如果想要就是真正把它实现的话,那是不是最后你还得在这个我们的这个think系统里边再去实现一个啊,一个事物的提交啊,对吧?啊,所以这里边就是说这个预写日志它是简单易做,但是最后的效果可能不是那么的完美啊啊,那当然data three apilink里边给我们提供了一个模板类啊,就是叫做generic right head s,你如果实现了。
15:58
这个类的话,就可以实现这个预写日志的书性提交了,这是这一部分,然后另外还有一个非常重要的提交方式是什么呢?那就是啊,就是有前面我们发现就是不管是密等提交还是预写日志好像都不完美,对吧?诶,那什么样的提交方式比较完美呢?这就是我们说的两阶段提交啊,两阶段提交就是说two face commit啊,Two PC啊,或者说我们叫2PC啊,它的含义就是什么呢?啊,就是我们说的每个checkpoint里边,我们要启动think任务,要启动一个事务,接下来呢,所有的接收到的这个数据要添加到这个事物里面去,就是刚才我们说的那个你自己要去实现的这这样一套机制,对吧?那具体的实现提交的这个机制是什么呢?呃,这里大家注意啊,就是它的这个还有一个好处是什么,我可以不要直接就是说直。
16:58
要先攒着,就是等到这个最后拆point完成的时候再一批提交,而是可以怎么样呢?诶,这里边数据来了之后,我还是来一个就处理一个,然后think任务就就写写入一个,写入到外部系统,但注意现在我是启动了一个事物,对吧,我是基于这个事物去做写入的,所以呢,诶这里边的这个这个写入当前事务,如果没关闭的话,是不是有可能之后我会撤销啊,诶所以说这里边的这个事物就对应着我们这里面的check,所以接下来来一个数据就写入一个,来一个数据就写入一个,来一个事物就写入一个,遇到check的这个barrier,然后我做保存,保存全部完成之后,然后怎么样呢?诶就去关闭,就去提交当前的这个事物,提交当前的这个transaction,然后接下来我这个就正式把它提交正式可以消费了,对吧。
17:55
所以大家看用这种方式就相当于,哎,我们这个过程相当于是什么呢?前面的这个提交,它是一个预提交,对吧,就每来一个数据就直接写入,就直接提交了,这是一个预提交,然后呢,等到真正拆矿品完成的时候才真正提交,把这个事物关闭,这样就保证了我们所有数据是真正一致的,那在这个两阶段提交的过程当中,大家还会发现它避免了我们那种批处理一次性写入,对吧,那这个性能就好很多了,我这个数据其实是已经一个一个都写进去了,只不过是这个事物还没有关闭对吧,只不过是说这个事物还有可能全回滚。
18:35
只是这样一个状态而已,所以这实现这就是最好的一种实现,它在时效性上和最后我们在做这个事物处理的过程当中,而且你会发现就是假如说后边来的这个我们我们中间这个挂了对吧,挂了就挂了呗,那挂了的话,你会想到后边我们这个事务没有提交,他是不是就会全部回滚啊,对吧?呃,回滚的话,那当然接下来这个就很很容易做这个操作了,对吧,跟这里的状态就完全一样啊,这就是这个两阶段提交的这个含义啊,所以说这种方式真正实现了exactly ones啊,那当然如果说我们用flink系统里边给我们提供的接口去实现的话,那实现一个什么呢?实现一个to face commit think function就可以了啊,之前其实大家在这个代码里边也见到过啊,就是卡夫卡的那个连接器,其实就给我们实现了这样一个接口,呃,然后这里边呃,大家还要注意一下,就是两阶段提交啊,大家看它很看起来很完美对吧,看。
19:35
起来很好,但是他对外部think系统是有要求的,首先就是刚才我们说的外部系统是不是必须得提供事务支持啊,对吧?你像这个呃卡,我们可以供这个事务支持,或者说呃,你像这个Mexico,对吧,我可以去创建这个事务,可以去提供这样的事务支持。另外还有一个什么呢?呃,就是在这个间隔期间内,我们必须当前的这个think任务可以开启一个事务,对吧?呃,然后我们的这个外部系统可以在这个事物里边持续的接收这个数据的写入,这是必须能做到的这个状态,然后呢,你要实现的过程当中,必须保证在checkpoint完成之前,当前这个事务必须是预提交或者说等待提交的状态,对吧?啊,这里边就有一个问题,就是说在故障恢复的情况下,就是这可能需要一些时间,就是说你如果现在挂了,然后再去重新恢复,那这个预提交的过程,我们那个外部think系统那边。
20:35
开启的这个事物啊,它等待的这个过程可能就会很长,对吧,因为你那边相当于又又重新启动,又重新做拆F放去了嘛,那这个就等很长时间,那假如说这个时候外部系统因为超时关闭了,那是不是相当于这个没提交的数据就真的丢了呀啊对吧?啊,这个时候部系统我认为你超时了,你这个没做完,但是呢,后面我们接下来这个又会正常做完,对吧,我认为这个数据内部状态认为它处理完了,但是外部系统没有写进去,那最后就相当于丢失了嘛,啊所以这个是一定要注意的,那另外还有一个就是说,如果说挂了之后,我得能把状态那个事物还能恢复出来,对吧,我不光是能恢复状态,我得把当时创建的那个事物也得恢复出来,然后才能基于事物去提交嘛,啊这个就是要求非常非常多,另外还有就是提交事务必须是密等操作,哎,为什么呢?因为之前有可能我已经提交过一次这个事务,那假如说我恢复之后,我这个。
21:35
数据重新提交一遍,还是基于这个事务去提交,那这个会不会有问题呢?不能有问题对吧,就是同样的一个事物,如果提交多次的话,它应该是一个密等的操作,所以这个要求其实对外部系统要求还是挺高的啊,那有同学可能就会想这个是不是,哎,这个是不是要求太严格了呢?那这个flink这个对这个to PC实现啊,要求这么高,那是不是实现代价太大,我们对性能会有比较高的影响呢?
22:05
啊,当然就是说,呃,确实它实现的这个代价是比较高的,但是这个代价主要体现在哪里呢?是我们程序员你需要去实现的东西比较多,大家看这个对吧?就尽管这个flink给我们提供了两阶段提交,To,呃,To PC呃,To face commit think function呃这样的一个接口,但是里边的这个到底怎么样跟checkpoint的捆绑对吧?呃,然后呃,你怎么样去这个创建事物,怎么样去提交事物,然后你怎么样去配置,让他不要去超过外部的那个超时时间,啊这些东西都是需要你自己去自己去写,自己去配置,自己去管理的,所以其实是程序员需要去做的事情比较多,但是对于这个flink运行而言,它的代价还真不大,为什么呢?你看这里边它其实是直接捆绑在我们的那个checkpoint基础上了,对吧,在做操作的过程当中,其实他还是来一个think一个出去,来一个think一个出去,我们这里边额外的。
23:05
开销是什么呢?就是开启了一个事务,然后怎么样呢?等待checkpoint的结束的那个完成的通知,然后关闭事务,你看是不是就是做这样的两个操作啊,所以这里面其实并没有额外的太多开销,所以整体来讲这个机制还是非常的,就是对于性能的影响还是非常小的,如果你对整个系统容错性啊,状态一致性的要求非常高,用这种方式就是最好的实现。
我来说两句