00:00
这里还有另外一个问题,就是我们是周期性的去做保存,那假如说当前到了需要去保存检查点的这一时刻,那是不是照manager发出指令之后,所有task manager、所有任务接到指令就立刻。同一时刻马上去保存当前状态的那个值呢?诶,这个看起来好像是很合理,这就像我们平常拍照一样嘛,就直接把所有人,假如说我们想给一个班的同学去拍一张合照的话,那就直接通知大家,好,现在我们要拍合照了,把手头的事情都停下来,然后到下边去集合123茄子,然后咔嚓一张拍一张合照,这样的话我们就可以知道现在的状态。但是这样有问题,就是因为如果我们现在每个人的状态并不一样的话,有同学学习到了第三章,有同学学习到了第五章,这个时候我们本身当前的这个状态处理都不一致,那我们在保存每个同学当前状态的同时,还应该把他的上下文也保存下来。
01:12
具体到。Flink的一个程序里边的话,那就是,呃,假如说当前我们在做一个算子计算的时候,啊,呃,某一步计算有可能定义了一个value。Value。保存一个值嘛,哎,那这个值呢,有可能我们这个在代码的处理流程是在某种情况下,诶,遇到某个情况了,我们要把这个值要加一啊,然后更新update当前这一个。Value。做了一个更新,然后后边做了某些操作之后呢,后面又要再加一,然后又做一个update。那这个程序处理的流程这就不以我们的意志为转移了。当前如果说要保存。状态的时候,我就不能确定它到底是程序执行到了哪一步,做了这样一个保存,诶,所以当前的这个value state有可能。
02:07
是在程序一开始当前数据,这就相当于还没处理的时候,直接就做了一个保存,那就相当于当前数据完全没有影响。而同时呢,也有可能是在前面做了一部分的时候,加一了之后,然后做了一个保存,那就相当于当前数据对它有影响,已经更新了,已经加一了,而另外还有可能是当前数据彻底已经已经计算完毕了,那是做了两次update啊,所以我们当前到底执行到了哪一步,其实是不知道。如果说我们不能精确的定位到当前代码执行到哪一步的话,那其实就不知道发生故障了,如果从这个检查点恢复之后,我们接下来该怎么办?那就是假如说我们当年这个状态值是十。呃,那我怎么能知道当前这个数据到底还要重新?
03:00
计算还是不重新计算呢?我并不知道它是在执行的过程当中,是一开始就已经保存了,还是等到全执行完了的那个那个时刻才做的保存,甚至还有可能我是在执行中间的时候做的保存,那这个就更麻烦了,那当前的这个数据我得把数据保存下来,要重放,而且呢,还得从中间开始重放,执行了一半的时候开始重放。这个就太麻烦了。那有没有更好的方法呢?哎,更好的方法当然是不要把所有的节点。我们都都截取当前的这一个状,一个时刻,同一时刻,而是让当前的节节点,当前的任务都把手头正在处理的这个数据都处理完,诶这样的话很明显就会更好一点,哎,我们当前这个数据明确他已经处理完了,那接下来就不要再去再去处理了,如果说当前数据就没有处理完的话,那我们保存的数据就不包保存的状态就不包括它嘛,那接下来。
04:07
发生故障重新恢复之后,就应该把这个数据要做一次重放。这是我们能够想到的一个一种方法。而且这样还有一个好处就是如果说我们是直接全所有的这个任务都暂停的话,那很明显就相当于是把这个程序的执执行流程中间打断了,我们就必须等到所有的状态都保存完毕之后,才能继续去处理当前的数据。这相当于是我们直接按下暂停键了,这种方式其实是不太合理的,相当于我们人为的引入了一个非常大的延迟,就像我们直接拍合照一样,手头所有的任务就都停下了,只能下去去排队,然后拍照,拍完了照之后才能上来进行计算。在对于实施性要求非常高的流处理里边,有时候这个代价是不可接受的啊,那所以我们现在的方法就就想到了,那就不应该让所有的任务同时停下,然而且打断当前流处理的这个过程,去执行当前的保存快照的这个过程。
05:17
那怎么样去处理呢?比较好的方法就是。按照数据处理完之后的那个状态去进行保存。这样还有一个好处就是我不需要让所有的并行子任务在同一时刻去保存数据了,我只要看当前的数据处理完成就可以了。啊,那另外呢,我们还可以从一开始的原任务S任务开始保存一个当前的偏移量,因为我们知道之后有些数据如果要是没有保存,它的处理结果没有保存到状态里面的话,还需要重放,那怎么知道一个数据到底?
06:01
需要重放还是不需要重放呢?那最简单的方法,当然我们可以在后边每一个任务这里都去保存当前哪些数据处理呢?哪些数据没有处理,但是这个代价就比较高,而且还有一些有可能是数据在在这个传递过程当中呢,呃,在网络传输的这个时候,我们当前要保存这个状态,那当前这个数据到底它是要保存的还是没没保存的呢?对于前一个状态而言,它是已经处理完了,我们可以保存了,但是对于后一个状态而言,他还没收到啊,根本不知道这个状态到底要不要保存,所以这个就比较麻烦,我们干脆就从头开始,从第一个S任务开始。把当前处理到哪个数据了,当前的位置偏移量作一个保存,作为状态保存起来。这样的话,我们就知道了,SS任务已经处理到了某一个数据,这个数据呢。
07:00
如果要做当前保存点,保存一个检查点的话,做一个快照的话,那就应该是等这个数据在后边的任务全部都处理完毕,到这个时候我们就可以去拍一张快照。所以在flink它的保存检查点的算法里边,它的思路其实是让所有任务都处理完同一个同一条数据的时候,找到这个时间点去做一份快照的保存。所以这样的话,这就可以让我们在整个架构上,代码上带来很多的方便。在这个过程当中,我们也发现了对于flink重放数据发生故障需要重放数据这个特点。那还需要。数据源那边能够支持数据的重放啊,这就是我们之前说的,比如说像卡夫卡啊,卡夫卡那边的话,我们可以重新提交偏移量,重新消费数据,这样的话就可以满足这样的一个要求,如果说数据源那边本身就不能重放数据的话,那很显然后边我们就没有办法把。
08:14
两次检查点保存之间处理的那些数据,再进行重放进行。重新的计算啊,那所以这个过程当中,我们就会发现啊,每一个数据保存的时候呢,就是要不就是完整的被整个。所有的任务,我们当前应用链条里边的所有的任务都处理完了,这个时候才做了一个保存,要么呢,就是在中间没有处理完,没有处理完我们就不会保存,就会直接把这个数据相当于它的状态都丢弃,那接下来我们就会发生故障的时候,就会重放这条数据了。所以我们会发现,这相当于是构建了一个事物transaction。就是把当前数据的处理和它最终进行检查点的保存状态的快照构建成了一个事物,如果当前所有的数据,所有的任务都把数据处理完成的话,我们才会去进行检查点的状态里边进行保存。那如果说。
09:20
处理的中间发生了故障。对应的状态没有保存的话,回滚之后啊,当发生故障之后,我们重新读取状态,就会相当于把所有的处理过程都回滚,这就类似于事物的撤回,事物的回滚了。那接下来我们再来介绍一下检查点具体的保存流程,我们可以用一个例子来看得更加的清楚一点啊,那所以我们会发现检查点保存最关键的就是要把同一个数据让所有任务都处理完毕才能够保存。我们看一下work com的这个例子,假如说我们这个代码之前都已经实现的非常的清楚了啊,那就是把这个数据输入进来之后,接下来呢任务,然后是先做了一个map,把它转换成了二元组,一个word,一个一,然后接下来。
10:14
K做分组,然后直接调萨some做了一个求和聚合这样一个计算啊,后面的话如果要打印这个我们就不放在里面做分析了,我们关键是看前面的这几步。假如说现在我们输入的数据非常简单,就是hello word hello flink hello hello flink啊,就是这样交错开的一组数据,接下来我们看一看整个处理流程是什么样的,如果要做状态保存的话,到底应该怎么保存。首先我们看一下当前的状态。当前这个效果呢,我们会看到数据源这边,呃,最前面。我们是从左到右每一个数据依次的进入flink系统啊,那所以前面的右边的右侧的是最前面的数据,后边左边的呢,是后边的数据啊,那首先前面我们有一个SS任务,原任务从。
11:11
数据源里面去读取,然后现在我们看到它的偏移量是三,所以其实已经读取了三条数据了。按照我们前面的定义的话,那就是hello word hello,所以接下来下一个是flink,然后hello嘛,呃,那后面跟着的数据当然就是flink hello,前面已经读取了两个一个。但是我们知道,对于flink而言,它是一个流式处理的过程,这是一个数据流在任务之间流动,那当前源任务已经读取了三个数据,三条数据是不是代表后边所有的数据就都已经处理完了呢?啊,当然是有可能处理完的,我们看现在的状态就已经是处理完了,因为我们已经统计出来哈,这这里做萨进行K之后哈,分到了一个分区啊,Word分到了一个分区,然后接下来呢,呃,这里的。
12:07
哈的个数是二,而word的个数是一,那我们知道sum本身它是一个简单的聚合算子,那简单的聚合操作其实本质里边都有一个k state,它的K当然就是当前的求和之后的那个值了啊,也就是我们当前的count值嘛,每来一个就加一,所以当前的二和一其实就是上算子。分区之后,他们分别的状态啊,对于这个哈而言,对应的状态是二,Word对应的状态是一啊。后面如果我们要输出的话,当然就是有一个哈一,然后WORD1接下来又碰到一个hello,是HELLO2,这是我们当前的一个状态。然后接下来如果说我们现在要做一个检查点保存是什么意思呢?那就是要等整个这个数据,前三条数据在整个这个任务处理的链条里边都处理完毕了,然后接下来就可以进行保存了。
13:13
啊,也就是说,假如说我们前面最后一个这个哈,有可能我们第一个哈和第二个word都已经处理完了,那这个时候其实上面。哈的对应的这个值应该是一,好,那第二个哈有可能还在路上。这个时候前面的原任务S任务这里已经偏移量是三了,假如这个时候我们收到了job manager发来的要去进行检查点保存的指令的话,是要同时停止去保存吗?不是这样的,我不能把这里的一和一进行保存,而是要等到当前最后一个数据,也就是hello word哈,第二个哈,完全处理完成之后才能去保存。也就是当前如果说我们都已经把它统计出来了,这个二和一是可以保存的。
14:05
那我们就知道了,当前保存的话。我们现在中间的这个map任务是没有状态的啊,所以它不用去保存,那S任务有一个状态是偏移亮三,那后面这个sum任务有两个状态,其实我们知道对于这个萨而言。Kid state底层应该是一个类似于哈希表的结构,是k value的结构,所以应该是一个哈,一个二,一个word,一个一,把这个当成整个的状态保存下。这就是我们当前的一个检查点。啊,可以把它放到外部持久化的存储系统当中。这就是我们所说的检查点保存的原理和整个的过程,关键就在于需要让同一个数据全部处理完成之后,所有的任务才保存当前的状态,保存到检查点中。
我来说两句