00:00
我们已经了解了flink底层对于检查点保存的一个具体的实现,那就是引入了所谓的检查点分界,现开point bery,有了这个概念之后,诶,那整个这个处理流程其实就非常简单了,就是我们所说的啊,就是数据流里边要进行快照保存的那个数据,后边追加了一个barri,那数据流我们处理的时候就是看到数据就处理数据,看到barrier就进行状态的快照保存。那前面我们也提到了,对于flink这样的一个分布式集群而言,最重要的还是要考虑有多个分区子任务的时候,并行执行的时候到底应该怎么样去处理,哎,这个是我们要处理的一个关键问题,前面我们提到啊,如果下游有多个分区子任务的话,诶,那这个很简单,Bar瑞尔来了之后,我们就相当于之前所有的数据都已经处理完了嘛,那就直接广播出去就可以了。这个很简单。
01:00
这是下游有分区子任务,那假如说当前的某一个任务,它的上游也有并行的分区子任务呢?这个就麻烦很多了,因为上游如果也是并行不不唯一的话,那就说明对于当前这个分区下游的一个并行子任务而言,有可能会接收到。不同的上游分区传递来的Barry,因为它要广播嘛,那上游不同的分区要传递的话,就有可能不同步,它是先后到来的,那我到底应该以谁为准呢?总有个先来后到吧,是接收到一个Barry,一个分区传递来这个分界线之后,我就要做快照保存吗?还是说等两个分区都到来之后我才做快照保存呢?这就是我们要解决的关键的问题,那对于flink而言,它其实底层是有这样的一个检查点算法的啊,那我们把它叫做。基于昌迪兰波特算法的一个分布式快照算法。
02:04
具体的名称,那就是叫做。异步分界线快照算法,哎,这个算法的核心呢,其实就是两个原则啊,主要就是考虑我们如果有多个并行子任务的时候,到底怎么处理,那多个并行子任务的话,就有两种情况,一个是下游有多个并行子任务,这个比较简单啊,上游向下游的多个并行子任务发送Barry分界线的时候呢,直接广播就可以了。那另外还有一种情况比较麻烦,那就是当有多个上游并行子任务向同一个下游任务传递分界线的时候,那又应该以什么样的规则来触发它的状态的快照保存呢?诶,那就是需要下游任务执行一个分界线对齐操作,也就是说需要等到所有的并行分区的barrier它的分界线都到齐了之后,才可以开始状态的快照保存啊其实这个也非常的容易理解,因为我们说分界线代表的是在他之前的所有数据。
03:05
对应的那个状态保存都要进入当前的检查点里边嘛,哎,那所以如果说我们有多个并行的分区的话啊,那很显然应该等所有分区之前的数据都已经到齐了,都已经处理完了我们当前任务才能够确认现在可以做状态保存了。啊,那这个听起来稍微还是有一点绕啊,为了更加清晰的了解这个分布式快照算法,我们还是用一个例子来进行一个讲解,哎,那我们看一下这张图,这其实就是之前work count的那个代码里边做了一个并行度的扩展,之前的话我们看只有一条流的这个处理的过程啊,只是后边这个kba做了一个按键分组啊,那我们可以认为之前做的操作是并行度为一,现在呢,我们把并行度调大变成二。啊,接下来我们就看到了啊,前面的SS任务,它就是从两个并行的数据源里边去读取数据,那分别这里面我们输入的数据啊,还是按照之前我们的规则都是hello word hello flink,一个一个这样交错去读取,然后接下来呢,那就对应的有两个S任务,他们分别保持着自己的一个读取的偏移量。
04:17
后边呢,Map也同样有两个并行的子任务了啊,做一个二元组的转换在后边,那就是并行的两个sum任务啊,那这里的两个sum任务的话,我们就相当于简单一点啊,就按照当前的K把它做了一个分组,Hello,这样一个K对应的分到第一个分区啊,那word这个K呢,对应的数据分到第二个分区。所以我们可以看一下现在这张图所表示的状态是什么?哎,这个状态呢,我们直接看SS任务这里很显然上面的这个第一个分区的S任务,我们把它叫做S1吧,SS1这里边读取了三个数据,偏移量是三,也就是hello word hello,跟之前我们考察的那个并行度为一的时候的那个保存的状态其实是一样的啊,然后另外一个第二个分区,我们把它叫做SS2,这里只读取了一个数据,也就是只读了一个hello进来。
05:14
啊,那所以后边我们要做的这个统计就会看到。应该目前是已经读入了四条数据,其中有三个哈,一个word啊,那当然了,这个处理的过程前后上下游任务其实并不是同步的,并不是同时会处理完一个数据啊,所以我们看到啊,前面SS任务已经读入了四条数据,而后边的map转换和sum的处理呢,那并没有跟上。当前的map步转换,诶,那是已经转换了两个哈,和一个word,然后后边的第三个哈,诶这个时候还在路上。在下一步的萨进行统计计算呢,诶,我们看到这里面其实只统计了一个哈,所以只输出了一个哈一,然后第二个哈和另外一个word呢,还在路上,还在处理的过程当中,这就是当前整个系统的一个处理状态。
06:10
啊,这就是我们当前基准的一个状态,接下来呢,哎,那就要做的一个操作就是做一个检查点的保存了,啊,那这时首先由drop manager向所有的task manager发出一条消息,周期性的发送,啊,发出这个消息就告诉他们需要做一个检查点保存,而且带着当前的检查点ID,比方比方说ID就是一,那就是一号检查点要进行保存了,那么每一个task manager上的S任务就会在当前数据流里边插入一个ID为一的分界线Barry。当然了,首先SS任务需要把自己的状态,也就是这个偏移量先做一个保存,哎,这就是checkpoint里边的一部分。所以我们看到这一时刻插入了检查点分界线barrier之后,相当于我们现在要去保存的状态。是什么时候的状态呢?就是并行的两个分区读取数据源,第一个分区读取了三个数据,第二个数据,第二个分区读取了一个数据的时候,总共读取了四个数据的时候,我们现在要把这个状态做一个快照保存。
07:20
好,那SS任务在这里啊,我们进行快照保存,插入bar瑞尔分界线,在这个过程当中,后边的map和sum任务并没闲着,他们该干什么干什么啊,所以我们看啊,比方说这个map任务这里没有太多的变化,那sum这里呢?诶,Sum这里我们看到这个word已经被处理了,已经输出了一个WORD1啊,那当然了,后边的这个第二个和第三个hello还在路上,这个我们后边再去一个一个处理,它总是有一个时间先后的嘛。好,这是第一步操作,就是触发检查点的保存,然后接下来。SS任务把当前的状态做了快照保存之后,那当然分界线这个Barry就要向下游去进行传递了,当然它如果保存完毕之后呢,还需要向job manager去做一个确认,告诉job manager我这边的状态已经保存完毕了,那接下来Barry朝下游广播出去。
08:17
啊,当然了,其实我们知道啊,South和map这两步操作之间,他们的传输关系其实是一对一的啊,是went to one forward的这样的一个直传的方式啊,啊本质上来讲,他们应该是要合并这个operator chi算子链,他们他们其实就是一个任务,哎,那所以我们看到这里呢,其实就没有出现交叉广播的这种情况,直接Barry传递给当前分区的map就可以了啊,然后map要进行自己状态的一个快照保存啊,我们知道map本身没有状态,那就什么都不做处理了,诶直接朝下游再去进行一个广播传递啊,那当然了,这个过程当中,Sum这里该干什么还要继续干啊,所以我们看到萨姆这里面哈,就已经变成。
09:00
哈喽二统计了两个哈,输出了。与此同时,已经做完快照保存的SS任务,他也没闲着,继续读取后续的数据,我们看到SS第一个分区的S1已经读取了第四个数据,有一个link读进来了啊,那同样第二个分区的S2啊,它也读取了第二个数据,Word也已经读进来了,所以我们看到啊,就是每一个任务相当于啊,他们都不考虑别的任务到底现在在干什么,只考虑自己就行了,只看自己接收到的数据。如果是普通数据的话,那就处理,如果是bar的话,就做快照保存。接下来就是非常关键的一步操作了,因为bar瑞尔要传递到了第三步,也就是sum去进行聚合计算,我们知道啊,萨之前是要进行K败做一个按键分组传输的,所以呢,每一个萨的并行子任务都有可能会接收到来自上游不同的map任务传递来的这个分界线Barry,那所以这里边我们以谁传过来的为准呢?那就要执行分界线对齐操作,也就是说必须。
10:11
上游的两个map子任务传过来的barrier都到了,我才能触发当前这个萨冰行子任务的状态,快照保存。我们可以看到,在这张图当中第二个分区,我们把它叫做萨姆二,它已经接收到了来自上游。两个maps任务传递来的barrier分界线,哎,所以这个时候已经对齐了,那就可以把自己当前的状态做一个快照保存了啊,所以当前统计出来的是WORD1有一个word。而上面的第一个分区呢,SOME1这里就只接收到了来自第二个分区传递来的分界线,哎,那所以接下来他还要继续等待,哎,那在这个等待的过程当中呢,他其实还是有可能会接收到新的数据的,那如果接收到数据又应该怎么办呢?之前我们说原则就是接收到数据就处理数据,接收到分界线就保存当前的状态快照啊,那现在我在等待另外一个分区的分界线,那这个时候接收到数据是不是继续处理呢?
11:19
注意,现在要做一个区分对待了。因为我们现在做检查点快照保存的时候,原则是在当前分界线之前的所有数据都应该对应的状态要保存在检查点里面,那之后所有的数据的状态不能保存在这个检查点里面。那所以呢,如果是我们看第二个分区这个Barry已经来了,哎,那如果是第二个分区继续传来了新的数据,那这个时候就不能处理。因为之后来的数据对应的状态改变啊,不应该保存在当前检查点里面,而如果是Barry还没有到的第一个分区又有数据来了,哎,这里比方说啊,这个哈一终于来了,这个时候我要正常处理,因为对应分区的bar瑞尔没来,之前的所有数据对应的状态改变要体现在当前的。
12:13
检查点里面。啊,所以我们看到啊,这就是进行检查点对齐操作的一个关键,那就是如果检查点在等待对齐的过程当中,已经有一个分区的检查点,分界线已经到来了,那其他有些分区还没有到来,这个时候如果来了数据该怎么办?那就是已经到来分界线,已经到来的分区对应的数据,那是要先缓存起来,而没有到来分界线的那些分区的数据。直接进行处理,改变状态。所以我们把这些都处理完了之后,接下来当然就是等到了对齐的分界线啊,把所有的分界线都已经收集齐了,就可以把状态持久化进行保存到checkpoint里面了,所以我们看最后保存的checkpoint应该就是HELLO3 word1,这就是之前我们说的啊,第一个SS分区读取了三个数据,第二个SS分区读取了一个数据,处理了四个数据之后,统计出来的sum结果是有三个hello和一个word。
13:22
这就是我们所说的保存的时间点是所有任务都处理完同一个数据,如果有多个分区的时候,那就是同时处理完多个分区的同一个数据,在这个过程当中,前边的所有任务,那还可以照旧去读取,照旧去处理。我们看到现在的SS1已经处理到了第五个数据,SS2已经处理到了第三个数据。那每一个任务做完了当前自己的快照保存之后呢,都会向job manager发送一个确认信息啊,那如果说job manager收集到了所有任务传递来的确认信息啊,那就自己可以知道了当前的检查点成功保存,它可以确认我们已经有了一个成功保存的检查点,如果发生故障,那就可以从当前检查点进行恢复了。
14:12
这就是在分布式快照算法下边检查点进行保存的过程,哎,那所有的检查点保存完了之后,接下来的处理又应该怎么处理呢?诶,之前我们说因为。等待分界线对齐的过程当中,有可能会缓冲数据嘛,把一些分界线已经到达的那些分区对应的数据先要缓存起来,接下来如果继续处理的时候怎么办呢?哎,当然是按照顺序先来后到嘛,先把缓冲区缓存的数据先进行一个处理,处理完了之后再正常的读取后续的数据。所以整个的这个处理流程我们就完全梳理清楚了,能够保证数据正常的处理顺序。但是在这个过程当中,其实我们也发现了分界线对齐这个操作,那是要求我们要做一个等待的分界线,已经到达的那个分区,如果又有数据到来的时候,我们是先把它缓存起来了,没有及时去进行处理,那这对于实时流处理来讲,就相当于引入了一个延迟。
15:15
在实际处理的过程当中,假如出现了所谓的被压啊,就是这个back pressure的时候,那下游任务呢,就有可能会堆积大量的缓存数据啊,这个时候对我们性能的影响可能就非常严重了,一个检查点可能要等好久才能够保存完毕啊,那为了应对这种情况啊,为了平衡实时性,我们处理的效率和容错性,那flink11:11之后提供了一个新的特性,那就是不对齐的检查点保存方式,也就是说我们对于当前这个检查点的保存呢,可以不进行分界线的对齐。只要收到了一个分界线,直接就可以进行状态的保存了,诶但是我们就知道这有问题啊,那另外一个没分界线没到的一个分区对应的那些数据,我们相当于就还没到齐啊,这时候怎么办呢?诶没关系,我把当前的上下文缓冲区里面的一些数据也都同时作为状态保存下来。
16:13
所以这种方式呢,其实是。以保存更多的上下文信息数据作为代价,换来了更低的延迟啊,我们做检查点保存的时候就可以更快了。这个不对齐的检查点保存方式其实是引入的一个非常重要的改进的功能啊啊,所以在后续的弗link版本当中,这个功能也变得越来越完善了啊,这就是关于弗link里的检查点算法,关键点就在于是一个基于昌笛兰伯特算法的分布式快照机制。
我来说两句