00:00
事物写入,看起来我们主要的思想就是要构建一个事物,那具体实现应该怎么样去实现呢?啊,又有两种方式,一种叫做预写日志,另外一种叫做两阶段提交。首先我们来说一下浴血日志,浴血日志其实本身比较简单,就是大名鼎鼎的wal,也就是说。我们当前其实可以直接把所有的数据先当成状态。在think任务这一环节。把它先缓存起来,保存起来。那就是不要着急的先把它写入到外部系统,我们不是说要构建这一个一个事物嘛,啊,要等到checkpoint完成的时候才把它写入嘛,所以我干脆我我在这儿就直接不写入了。先把它缓存起来,等到收到checkpoint完成通知的这个时候,再把它一次性的批量写入到外部系统当中。
01:00
啊,这个过程,所以为什么叫预写日志呢?就是说相当于我在这个缓存里边是做了一个类似于日志的一个预写啊,预先先把它保存到这个作为日志先保存起来。然后进行检查点保存的时候呢,所有这些日志数据也作为状态一起做这个持久化的保存,哎,那也就是说如果在这个过程当中挂了的话,没关系,我之之后去重新恢复状态,还可以从think任务的状态里边把它再读取出来,检查点里边可以读取出来啊,那就还可以继继续去往后去写入了。那在接收到检查点完成通知的时候呢,再把所有的任务一批批量写入啊,那它的特点就是类似于我们是构建这这个事物做了一个批处理啊,就是到了这儿的时候攒着不要来一个就处理一个,就直接输出出去,而是等到checkpoint完成的时候,再做一个批注,把它一下子写入进去。
02:06
它的优点当然就是说比较简单,我们的数据都是在状态后端里边,只要开辟一块内存空间啊,当成状态把它缓存起来就可以了,所以这个对于外部系统其实是没有任何要求的。我们可以外部系统甚至都可以不支持事务,我只要知道当前是跟checkpoint有关就可以了,只要做一个批批处理啊,一批直接搞定就可以了。而在flink this stream API里边呢,它也提供了类似的一个模板类,就是叫做generic right ahead think,这样的话就可以直接通过这样一个批量写入的think任务来实现预写日志这种事务性的think。啊,那它的缺点其实也非常的明显,首先就是这里边我们相当于最后一个环节,Think任务做的是一个批处理,那批处理的话当然延迟就会比较高,而且我们是攒了一大批数据,这个写入可能时间会非常的长啊,这个过程就不符合我们整个旅游处理的思路了。
03:12
而且需要注意的是,用预写日志wal这种处理方式呢,有可能是会写入失败的,那所以我们会发现他写入的这个时间点其实是当前checkpoint已经完成,接收到manager完成通知的时候,我才会去把当前的这一批数据去做一个写入。而这个时候如果说整个写入失败,那就会涉及到一个问题,我们当前写入的时候,这一批写入的时候,首先就有可能写入了一半,在这个外部系统里面写入了一半,那这一半儿我是应该啊应应该全部撤掉呢,全部删掉呢,还是应该重新做一个写入呢?诶这就涉及到还是得要求外部系统得支持事务,要不然的话,那写入的那一半就要重复写入了,我这边可以重复提交嘛,那就就只能是去做重复写入了。
04:12
那这里边还需要涉及到一点,就是说还需要跟John manager那边做一个再次的确认,为什么呢?因为之前我是所有的状态都已经在检查点那边保存完毕了,通知了装manager manager就认为我这边检查点保存完成了,但事实上。如果我们当前的这个批量写入没有真正成功的话,那其实我们当前这个检查点还是应该失效。因为如果真的这批数据就真的写不进去,总是写不进去,最后失败了,那相当于我们这一所有的这个检查点里面的数据最终没写到外部系统。那当然发生故障的话,我们是应该所有的数据重放,重新去做写入的,如果说我们这个检查点就已经保存,认为保存成功的话,那当然从这里恢复出来的数据就不包含,我们当前就不会再重放这里的所有数据了,那接下来这些数据就会相当于丢掉了。
05:15
所以我们还需要再次去向manager做一个确认,只有所有的这个数据真正写入外部系统的时候,才能够这边真正认为当前的检查点保存是成功的。那这里就又涉及到了另外一个问题,就是假如说我们在做确认的这个过程当中又发生了故障,那怎么办呢?哎,那就变成了照manager这边认为我这个checkpoint还没有没有保存成功,但是事实上我们已经写入进去了,我现在是要向专manager那边去做确认的,诶那这样的话最终相当于我这个拆point没有保存成功,那后边还要去回滚,还要去重放数据,那就相当于做了两次的写入。
06:05
所以本质上来讲,预写日志这种方式呢,在绝大多数场景下能够做到。精确一次,但是在这种特殊的场下,就是在做这个二次确认的这个时候发生故障的场景下,那它是有可能会做重复写入的,所以最坏情况他只能做,只能达到。啊,就是至少一次这样的一个级别。那是不是我们就没有真正能够实现的这样的方法呢?啊,其实还是有的,那就是最后一种要介绍的事务性的提交,那就是两阶段提交,To face commit。有时候把它简写叫做PC,或者叫PC啊,所说的就是两阶段提交。顾名思义,所谓的两阶段提交。它就是要把我们整个。
07:03
Think任务提交数据的这个过程。分成两个阶段啊,首先我们要先做一个预提交,然后呢,再做一个正式的提交,这样的两个阶段,那具体的步骤是什么呢?呃,具体步骤其实就是说要真正意义上的创建一个事物。我们每来一个数据处理完了之后呢,不要像预写日志那样先攒起来,然后做批处理,真的就是流失处理,来一个处理一个,就直接往外部系统去写。但是。写入的这个过程并不是直接就提交上去了,这个过程是预提交,是通过一个事物。做了预提交啊,那既然是通过事物做的预提交,很显然之后就有可能还要把它撤回啊,所以我们就会发现了,当前还是流逝的处理,但是呢,当前的写入就不再是唯一不可变的那种场景了,这之后有可能还能撤回来了。
08:07
那接下来我们就知道了,所有的数据来了之后。都是基于当前这一个事务去做了预提交,什么时候正式提交呢?那就是还是接收到checkpoint完成通知的时候,跟检查点捆绑在一起,现在完成了,我们就可以把当前的这个事物正式的提交给外部系统,那外部系统的话,这些数据就真正可用了,不会再撤回了。所以我们会发现,假如说已经提交上去之后,后边的数据处理的过程当中发生了故障,那我们就会回滚到之前的这个checkpoint,而这个checkpoint对应的状态,那就是所有数据真正通过事物已经提交到。外部系统里边去了,哎,那所以当然这些数据就不需要再回滚了,我们要回滚要重放的就是接下来的这些数据了,而接下来这些数据呢,很显然它就应该是一个新的事物在提交,而我还没有做checkpoint的时候,当前就发生了故障,那很显然这个事物就要撤回,所以之前我们提交到通过这个事物预提交到外部系统的这些数据,相当于也就不可用了,也就全部撤回来了。
09:27
啊,那接下来在重放数据,重新写入,就只会写入一次,这样的话我们才真正意义上的实现了所谓的精确一次,啊,那它的要求当然就会比较高,这个外部系统就必须能够支持事务,能够支持我们开启失误,而且通过事物去提交,而且去呃,能够发生状况的时候,能够会撤回当前的事物,所有的这些必须支持。而在flink里边呢,呃,当前我们的SK任务也必须要能做事务相关的这些操作,而且它是两阶段提交,所以flink里边专门提供了一个叫做to face commit function这样一个接口啊。
10:11
顾名思义,这就是一个两阶段提交的方式。那我们会发现在这个过程当中,两阶段提交是不是有点太复杂了呢?会不会影响我们整个系统的性能呢?其实是不会的,因为如果发生中间,中间发生故障的时候,我们就做这个事物的回滚,这个回滚的话,其实也只需要think任务向外部系统去提交一个消息就可以了。而如果说我们当前是正常情况下向外部系统去写入的话,这本身就是我们think系统think任务要做的操作嘛,同样他也不会等待,都是来一个处理一个,就往里边写入,区别只是在于要开启一个事务,然后提交一个事务,或者说回滚一个事务而已啊,那整个这个处理流程其实就是搭载了flink系统里边的checkpoint的机制,然后加上了一些事务性的处理,就完成了我们这样一个两阶段的提交啊,那它的。
11:18
这种方式其实是非常的精巧,非常的巧妙的。看起来对于性能也没有太大的影响,但是当然它也是有代价啊,它其实对外部系统是有很高的要求,它有什么样的要求呢?那简单来说有这么一些啊,首先外部系统必须提供事物的支持,或者think任务能够模拟外部系统上的一个事物。另外就是在检查点的间隔期间内,必须能够开启一个事务,而且去接收数据的持续的写入。然后当我们接收到检查点完成通知之前,事务必须是一个等待提交的状态,诶这里面有一个问题,因为当前我们等待提交的话,那相当于是预提交嘛,这个事物里边所有数据是预提交的,在发生故障恢复的时候呢,这个过程可能是比较长的。
12:14
这就造成一个问题,这个事物等待的时间有可能就会很长,那我们知道事物是有一个timeout时间的。超过这个时间的话,事物自动的就相当于撤销了,就回滚了,那如果在发生这种情况的话,而我们这个checkpoint呢,它还在正常的去保存,正常去做处理,后面如果check认为我是正常保存了,而我们的事物给撤了,那就相当于这部分未提交的数据就丢掉了,所以这个我们是一定要注意的。啊,那解决这个问题呢,主要就是要配置一下相关think系统的一些超时时间,我们只要保证它不会在这个呃,还没有提交checkpoint的时候就直接超过了超市时间,只要能保证这一点就可以。
13:05
不会出现数据的丢失。而另外还需要就是think任务,如果当前进程失败发生故障之后呢,还能够恢复失误,那这样的话我们才能把之前没有提交的数据也恢复出来,另外提交事物也必须得是密等操作,诶,这个过程我们就会发现了,尽管它的性能要求可能对性能的影响并不那么的高,但是在实际应用的时候,对因为对外部系统的要求非常的高,所以实际应用还是会受到比较大的限制。我们在实际项目当中的选型,最终还是要考虑具体的处理性能,实际外部系统,Think系统的具体的场景啊,还有就是一致性级别,最终我们是要做一个权衡,然后去选择我们最终使用的方式。那最后我们可以总结一下,当S和S两端连接的外部系统有不同的特点的时候。
14:06
我们最终整个端到端的状态一致性能够达到什么样的一致性级别呢?啊,那我们主要就是区分S和SK了,内部flink的话,我们当然是要开启checkpoint检查,点开启之后就能做到精确一次啊。那对于S而言,关键划分就是可重置和不可重置,如果S本身偏移量不可重置的话,很明显数据就有可能要丢啊,那这个最多就是atmos了。而如果S可以重置偏移量的话,能够重放数据,那至少就能够保证数据不丢。所以当前。至少已经是at,不管think是什么样的外部输出,我们当前数据不会丢at。而如果think能够做到密等性的写入的话,那么整体来讲我们最终端到端可以做到exactly once精确一次,但是呢,发生故障恢复的时候,前面我们说外部消费数据的话,有可能会出现一个短暂的不一致。
15:13
而如果使用事务性的写入的话,又区分了预写日志和两阶段提交wa写日志的方式。那么在使用。可重置数据源的前提下,我们可以保证它的数据不丢,大多数情况下是可以做到exactly one的,但是在有一些特殊的情况下是做不到,那就只能达到at least,而对于两阶段提交to PC呢?诶,那我们说它是真正意义上可以做到,真正意义上实现了事物性的提交,这就是我们所说的端到端的状态一致性的保证。
我来说两句