00:01
有了分界线barrier,那接下来我们就知道每一个任务都是在接收到barrier的时候去保存自己的状态,我们就可以明确的知道检查点到底什么时候去保存啊,那在同一条流里边,我们知道这个bar瑞尔和数据的相对位置它是固定的,那在他之前所有的数据都应该要先处理,因为我们是流处理,来一个处理一个嘛,之前所有的数据都应该先处理完,然后才能轮到barrier,那这样的话,之前所有数据的。对应的状态改变就会保存在里边了啊,这也是我们说它可以按照barrier,按照检查点把前后两部分的数据分隔开,我们把它叫做检查点的分界线。但是我们知道在flink里边本身它是一个。分布式的流处理框架,那所有的任务都应该是并行的。
01:03
如果是并行的话,那我们就会发现这个流的结构就不会是那么的简单了,不会是按照顺序依次执行依次去处理了,有可能会出现乱序,这就跟我们之前讲到的水位线ma是一样的。那假如说这个时候考虑到分布式执行的时候,又得考虑哪些要点呢?Barrier的处理又有哪些细节需要去考虑呢?这就是我们接下来要去讲的。分布式快照算法。呃,可以首先回忆一下水位线的处理,因为我们在前面的了解过程当中都已经发现了,Barrier和walmark其实是有很多共通之处的,他们都是插入到当前数据里边的一个特殊的数据结构。那bar瑞尔在进行并行任务之间,上下游之间传递的时候,它的规则是什么呢?诶,那就是上游的任务向多个并行下游子任务传递water rockck的时候,那是直接广播出去的。
02:12
而如果有多个上游任务向同一个下游任务去传递watermark的时候,那又应该怎么样呢?诶,那我们说当时walmark的意义,它的含义主要是要表示当前的事件时间啊,那如果说我们能够正确处理乱序数据的话,那当前的事件时间,事件时如果进展到了某一个时刻,那就应该表示在这之前所有的数据都已经齐了,哎,那所以这个W呢,那是应该取一个。每一,每一个分区,Water mark,最小的那个。我们设置不同的分区,Water mark最小的那个作为当前任务的事件。所以我们就想到了在barrier的并行数据流里面,传输的时候是不是也应该有类似的规则设置呢?确实是这样。
03:07
对于barriry而而言,它指示的是之前的所有数据,它的状态改变应该要保存到当前检查点里,而那他不是说之前的所有数据都应该处理完了,都应该到齐吗?所以他们表示的含义都是一个截止时间。所以他们是非常类似。那具体实现上呢,其实flink的。检查点算法是昌迪兰伯特算法的一种变体啊,被叫做异步分界线快照算法,它的核心其实就是两个原则,一个就是上游任务向多个下游任务并行的子任务发送barrier的时候,那还是直接广播出去。那如果是多个上游并行的子任务向同一个下游任务传递barri的时候,那应该怎么办呢?
04:00
诶,之前我们是water mark里边是直接取最小,那现在没有小不小说了,现在的barrier它只就是一个标志,它表示当前要进行检查点的保存,哎,那我们的要求就是必须等到。不同分区的所有的标志,所有的barrier。都来了的时候,我才能保证所有分区之前的数据都已经处理完了啊,那这个时候我才去保存当前任务的状态,所以这个操作叫做分界线对齐。也就是说,等到所有上游的并行子分区barrier都不全部到齐,这个时候才开始保存。啊,那接下来我们还是针对之前的。这个应用事例来详细的解释一下检查点算法的原理啊,那为了更加的明确说明它分布式快照的这个算法,我们把它做一个扩展,那就是扩展成并行度为二的场景,那这个时候的话就不像之前我们只有一条牛的输入了,我们现在并行度为二,那应该有两个并行的SS任务同时在读取当前的数据。
05:16
那这里我们拥有了两条流啊,或者是我们认为这是从同一条流,或者说同一个数据源里边划分出来的不同分区,我们用并行的两个S任务去进行读取,那这里的数据呢,我们认为它都是一样的,跟之前的设定是一样的,都是一个hello,一个word,然后hello flink hello word hello flink,这样交错开。然后接下来我们看一看,后边也就同样有了两个并行的map任务,还有两个并行的丧任务,那我们接下来看一下现在的状态变成了什么样子呢?因为我们知道后边经过K败之后,那就有可能。第一个分区SS任务读进来的数据有可能会分到下边的这个分区来,而第二个分区读取进来的数据呢,也有可能K之后会分到上边的第一个分区来啊,我们现在的话就假如哈,这个K就分到了第一个分区,Word这个K分到了第二个分区,我们看一下现在的状态是什么样。
06:19
现在我们会看到,呃,我们就为了方便描述的话,就把这个叫SS1版,下面这个叫S2吧,我们会看到现在SS1。偏移量是三,S2偏移量是一,所以S1跟前面一样,已经读取了hello word hello3个数据。而S2呢,只读取了一个数据,就是一个哈。那我们知道本身flink里边的不同的任务,他们处理数据是有先后的,而且他们是并行执行的,互相之间各不干扰,所以S任务读取到了S1读取到了三,S2读取到了一,后边的数据呢,后边的任务呢,并不一定是处理完了这两个数据,所以我们看到map任务,哎,这个时候MAP1。
07:07
他其实是处理完了第一个哈和后边的word,第二个哈还没有处理,还正在处理中,而第二个map呢,Map这里是把。第二个SS2读进来的哈,已经处理完了。然后接下来看some some这里边上面的第一个分区K,哈哈,是只处理了一个,这是来自于上边第一个分区SS一传过来的浅色大楼。而第二个分区的深色的哈呢还在处理过程中。前面还有一个数据,那是word word这个是分到了第二个分区,现在也还没有处理,所以这里边word是零,所以我们看到上面这个第一个任务啊,第一个算子S,这里S1处理了三个数据,S2处理了一个数据,但是这四个数据呢,其实最后在萨任务这里只处理了一个。
08:09
那剩下的一些数据。有一些是还在some任务这里正处理边的啊,一个是这个word,另外一个是SOUTH2这里的哈,他们俩是在some任务这里处理的,那第二个哈呢,浅色的这个哈,还在map任务这里处理。他们当前处理的状态,当前的这个时间点是不同的。那接下来就是检查点保存的具体的算法了。那首先我们应该想到由job manager要发出一个指令,触发检查点的保存啊,那这个时候它会向所有的S任务去发出指令。SS任务接到之后就会。插入一个分界线,在当前数据流里边直接插入一个分界线,所以我们发现了针对不同的流,或者是同一个流的不同分区读入的这个不同的分区,我们不需要要求当前SS1和SS2它的偏移量一定要一致,因为这个就没关系嘛,他我第一个S可能读的快一点,读到了三个数据,第二个S呢,我就只读到一个数据,这个是没问题的。
09:20
我们只要最后保存的时候把并行的任务。对应的偏移量同时都保存下来就可以了啊,所以当前SS任务接收到了Barry的时候,那就把当前的偏移量作为状态保存下。具体在执行的过程当中,其实我们知道manager肯定是向task manager发出的指令,它是周期性的,向每个task manager发送一条当前的呃,带有新检查点ID的这样一个消息啊,那当前的这个barrier里边包含的信息其实就一个,就是当前的检查点ID,比方说这是一号检查点啊,我们就可以把它放在这里。
10:03
接下接下来这个每一个task manager上就检查每一个是S任务,就在S任务里边插入检查点,然后保存状态。而在SS任务保存状态的过程中,我们会发现这个时候后面的任务并不受影响,它完全并行,而且各干各的,互不干涉,所以后边我们会看到sum任务这里,诶,终于把这个word处理完了,它也输出了一个WORD1现在状态word这里。改成了一变,变化成了一,而后边的两个哈呢,还都没有处理完,所以还在路上。接下来我们就继续看。Barry在不同的任务之间流动的过程,以及后续任务对它的保存处理的过程。接下来第二步啊,那我们就会看到。第一个SS任务状态如果已经保存完毕的话,那分界线就要向下游去传递,传递的规则那应该是。
11:06
直接向下游广播出去啊,那这里面我们会发现这个,呃,在真正划分了。真正合并了算子链之后,哎,那其实第一个S任务和map任务其实应该是合在一起的啊,他们之间是one one的关系吧,而且并行度又相同,所以这个过程当中其实是不存在广播的这个过程,我们可以认为它直接就过来了。那关键是要到后边才要做这样的一个广播啊,那在这个过程当中,在前面这个map任务进行传输的过程当中,后边同样不受影响,萨任务还在继续处理,我们看到现现在接下来。S2,这里深色的这个哈也处理完毕了,所以又输出了一个哈二啊,那当然了。浅色的上面S1处理的第二个哈,还在路上,还没有处理完啊,后续我们才会去进行处理,而且我们知道当前的S任务如果已经把它的状态全部保存完成之后,已经在持久化存储空间里边保存完毕了,那这个时候呢,他会向manager发一个确认信息,告诉manager我的快照已经保存完毕了。
12:24
然后接下来那瑞就朝下游传递。这里面会涉及到另外一个问题。就是照manager这个时候是不是就可以确认当前的检查点已经完成了呢?呃,我们知道当前检查点其实只完成了一部分,只是S任务完成了,这并不代表检查点已经完成,因为之后我们是要靠这一个检查点去从故障恢复状态,那你如果直接把这个拿出来恢复的话,后边的状态都没有啊,就显然不能够从故障里边恢复啊,那当前的manager不能直接确认检查点完成,而是要等到所有的任务都确认保存完毕的时候才能够。
13:08
确认检查点真正的完成。接下来当然就是Barry尔向下游任务传递的过程了啊,Map任务我们知道没有状态,所以它会继续向下游传递,我们这里边下游是进行了KBY分区,所以我们知道下游有两个并行的子任务,这这里面我们认为是并行度为二,然后刚好两个K分到了两个并行子任务里面,所以barrier当然就要广播到下游并行的两个子任务里面去,这里需要注意的是。经过并行传输,我们分布式传输之后,Barrier其实向下游传递的过程。也是有快有慢。那就是第一条流,就是像我们这个上面SS1,这里SS1MAP1和SS2MAP2,他们的barrier,像some任务传递的过程呢,可能是有先有后的,比如说。
14:07
现在我们就是当前是二二这里的。Barrier第第二条流里的barrier先到达了上面的sum衣这个子任务,哎,那这个时候我们就会发现当前sum任务需要去直接保存当前的状态吗?这个时候是不对的,我们知道现在萨姆这里边哈,判断当前的状态是。本身在之前啊,这里面本身应该是二,那这个时候我不能直接就做保存,因为。SOUTH1SOUTH2MAP2这里的哈都已经处理完了,在在这个检查点分界线之前的哈都处理完了,因为我收到他的barri了,但是。S1MAP1这条流里边的哈,还不能确定它都处理完了啊,所以这个时候呢,我们还应该继续等待,等待什么呢,把S1。
15:07
MAP1这里的哈也全处理完,那怎么样能处理完呢?当然就是要等到对应的那个barrier也到来,这就是barrier对齐的一个过程。当然了,下边的第二个分区word也是一样,如果说诶,我们这里边是先来了S1MAP1这里的bar的话,那同样。第二个分区的萨姆任务,萨姆二也不能直接保存自己的状态,即使现在我们看到这个就应该它的状态就应该是一,这也不能直接保存,而是要等到两个BARRY2都对齐都到来的时候,这个时候才能进行状态的保存。啊,所以我们会看到啊,接下来我们要做的就是barrier对齐,等到所有上游分区的Barry全部到。那如果全部到来之后,接下来我们就知道了,可以把当前的状态进行保存了,保存的话,我们保存到的就是HELLO3WORD1。
16:08
那大家如果仔细看的话,可能会发现,诶,对于这个第二个分区SUM2来讲,好像看起来我们这个WORD1很早就都已经处理完了呀,后边的这个等待的这个过程是不是就没有必要呢?诶并不是这样,这主要是因为我们要判断的这个时间节点,保存的时间节点是什么呢?其实是。S1处理完三个数据,SS2处理完一个数据的那个时间节点,诶,所以我。只处理完一个WORD1的时候,这个时候我并不能确定已经第一条流处理完三个数据,第二条流处理完一个数据,我必须要等到两个barrier到齐,这个时候我们才能够确认这一点啊,所以这就是分界线对齐它的意义所在。
17:02
那当然了,在分界线对齐的这个过程当中,是有可能还会收到新的数据的啊,那这个时候当然我们的状态是不能直接保存的,但是我们知道当前如果收到新的数据的话,继续做计算,那这个状态就会改变。是不是所有的数据都是来者不拒,来了之后就做计算呢?这里需要做一个区分啊。那就是如果对于当前的萨一来讲,如果他接收到的数据首先来到的是SOUTH1MAP1,也就是浅色的第一条留分区里边到来的数据的话,诶,那当然对应的这个barrier还没来嘛,所以之前的数据当然是要处理的,我们就正常处理,把它状态的改变叠加在里边,然后输出数据就可以了。但是假如说假如说这个第二条流里边,这个分区里边处理的速度特别快。
18:02
当前他的Barry来了之后呢,紧跟着后边的这个word哈就都已经来了,那我们知道word是分到了第二个分区,那哈呢,又分到了上面这个分区,这个时候就已经来了,那这个哈还要再加一吗。这里就不能再去加一了,因为当前这个分区的barrier已经到来了,它之后的数据就不应该状态改变,保保存在当前的checkpoint里面,所以那接下来来的深色的哈,那就应该要缓存起来,不能正常处理。而浅色的哈,就是Barry没有到来的那个分区的哈,就可以正常处理状态都叠加在里面啊,所以这个过程当中我们可能还要把一些数据缓存起来。那当我们把当前的状态都已经保存完毕之后啊,那所有任务都已经完成了,就会通知到job manager job manager就会确认当前的检查点已经保存成功,那如果发生故障的话,就从这个检查点去读取状态进行恢复就可以了。
19:10
然后接下来所有的任务就可以正常继续执行了,我们所说的正常执行并不是说要把之前的阻塞住,我们看到在前面处理后,后续这个sum任务保存的时候,SS任务就直接正常去读取了。因为它的任务该。它的状态该保存什么,其实都已经保存完毕了,所以就正常读后面的数据,也就是因为前面能够继继续的正常处理数据,所以后边我们再做检查点。保存,在做这个barrier对齐的过程当中,才有可能需要去缓存一些处理的比较快的数据啊,那接下来如果当前萨任务都已经保存状态完毕了,检查点都已经保存成功了,那这个时候它的缓存数据就可以继续正常处理了。而且我们要注意,为了保证我们所有数据的先后顺序,应该先把之前缓存的数据一个一个拿出来处理,处理完了之后再去正常读取接下来的数据。
20:16
那之后的顺序就完全跟我们正常处理的过程完全一致了。还要注意的另外一点就是因为分界线对齐,它是要求先到达的分区,是得做缓存等待的,哎,那所以在这个过程当中,先到达的这个分区相当于数据就要堆积起来了,哎,那如果说我们之前的这个任务,它处理数据的速度特别快的话,我们这里数据就会越堆越多,诶那就有可能会出现所谓的反压或者叫被压bad pressure,那出现这种情况。就会导致我们一层一层递推回去,我们的任务进行检查点保存的时候可能要保存很久,而且堆积了大量的缓冲数据,我们整个系统的性能就会大大的降低啊,那为了应对这种情况呢?呃,Flink01:11版本之后就提供了不对的检查点的保存方式,那所谓的不对齐的保存方式呢?那就是它可以让我们在收到一个。
21:30
分区对应的那个barrier的时候,就直接把当前的状态做一个保存,诶那我们可能会想到,那你如果不做对齐,来了一个barrier就直接保存,那是不是就会导致另外其他的分区有些数据的状态就没有保存进来呢?呃,确实是这样的,所以它的代价就是在保存做这个状态保存的同时,也要把未处理的缓冲数据,也就是说已经接收到的其他的一些数据,类似于当前的一些上下文信息,也要保存到检查点里边来。
22:05
啊,那这样的话,我们只要遇到一个分区bar的时候,不需要等待,直接就可以启动,保存就会更快,延迟更低,但是可能我们每一个检查点就要保存更多的信息。这就是检查点的具体算法。
我来说两句