00:00
呃,所以这是这个检查点Barry的一个提出,好,那接下来我们看一看这个,那Barry在具体的过程当中,它又是怎么样去传递,怎么样去触发我们的这个快照机制呢?呃,大家可能直观的想的话,这还不简单吗?那你既然是有这么一个barrier,那你就我们流失处理吗?一个任务一个任务往下传不就完事了吗?诶,这里边大家想的是非常简单的,好像就是一条流,里边都是顺序执行的一些任务,对不对?来大家看一下我们真正的分布式系统里边是什么样子。那是不是并行的,那大家看这里边不光是并行任务,这里还有还有多个圆对不对,那大看这里边我们跟刚才比的话,SS还多了一个,有两个S了,然后接下来大家看,如果两个S同样还是输入这个自然数构成的这个数据流啊,1234567,那么他们输入之后,接下来如果要按照基偶去分类,然后做sum的话,大家会想到这是不是就类似于一个kba,一个重分区的过程啊对吧?哎,我这里相当于是按照奇偶性去K了嘛,我可以先把它那个map成一个二元组,按照奇偶性map对不对啊,然后然后map成那个二元组,有一个就表示表示它是奇数还是偶数,然后再根据这个奇偶性去做K伴,所以大家看在这个过程当中,那是不是我这里边的一个sum任务,有可能会收到来自两个源发来的数据啊。
01:37
嗯。哎,在这种过程当中,那我们他的这个快照又怎么样去处理呢。好,首先我们先大家先熟悉一下现在的状态啊,先看一下现在的状态进行到哪里了。呃,我们看一下现在是哦,第一条流这是已经读到三了,对吧,读到三了,然后呃,剩下四五还没来,第二条流是读到四了,呃五六还没来,然后这里边诶大家看S这里边三已经读完了,所以三在路上了,对不对啊,三在这里了,那二是不是已经处理处理完了呀?呃,这里边some已经处理完了,所以大家看some的这里的结果是是五,五是代表一一对不对?哎,大家想到为什么是两个一呢?113对吧?对,因为这里面不是两条流嘛,对吧?所以这里边它这里边的这个五代表什么含义呢?对,我们看其实它是什么?它是这里边黄色的数据流里边先来了一个一,对吧,然后蓝色的这里边又来了一个一,是不是又来了,来了之后它这里。
02:53
元一加是不是加加出了一个二啊,然后二是不是就输出了一个蓝色的二对吧?然后接下来黄色的又来了一个三,他一加是不是加出了一个黄色的五啊啊,后面这输出的这个蓝色和黄色就表示它是当前是哪一条流来的数据导致它改变对不对?诶这里边它就出来出来一个黄色的五,所以当前如果蓝三还没来的话,这里边它的状态是不是就是一加一加三啊。
03:24
呃,当然这里大家知道是蓝衣黄衣黄三构成的这样的一个状态对吧?啊,那那同样这里大家看到一开始第一次进来的应该还有一个黄衣啊,怎怎么没有黄衣了呢?呃,大大家说是加掉了是吗?哎,你这里边sum,那当然sum完了之后黄奕也应该有输出啊。你来一个数据不是输出一个吗。发生了事哦对,是不是已经发到thinkink里面去了呀,那就写入外部外部系统了对吧?啊,所以黄衣就不在这里了啊,所以大家看到是这样的一个一个逻辑啊呃,所以大家看一下当前下面这条流状态是五,然后上面呢,上面这一部分,那蓝色的这一部分二是在路上的,那大家看这个偶数,它当前的和是和是二,那相当于只有一个对,只有一个黄二来过了,对吧?所以这里这就相当于黄色的是1234,二在上面,一和三在下边加过了对吧?四是不是还在路上啊,然后上边是1231已经来过了,加在下边了,二和三是不是都在路上啊,而且二是要给到我们上边这个偶数计算这条这这个分区的这个子任务的,那三是不是要给到下边奇数计算的这个子任务啊。
04:51
对吧,是这样的一个状态啊,这是当前的状态啊,这个就稍微有点复杂了,大家要好好的梳理一下,好,然后接下来大家看我们现在假如说要开始。
05:04
这个时候要触发checkpoint的话,那会怎么样做呢?Checkpoint之前我们在讲这个运行式架构,讲到flink里边的四个组件的时候,其实已经说了是谁来触发的checkpoint。谁对,是job manager对吧?啊,得是这个整个作业的管理者去触发这个操作,所以大家看是job manager出发启动这个checkpoint的过程,那么它会向每个source任务发送一条带有新的检查点ID的消息,那比方说这里边这里面的这个二表示什么呢?啊,这不是时间戳,或者说大家以为那个water mark,或者说什么数据啊,这表示是对当前的那个checkpoint ID对吧?第二个CHECKPOINT2号checkpoint,现在开始做二号checkpoint做快照,快照保存啊,然后大家会想到接下来。
06:01
这个这个标记是不是就会被S任务收到啊,那SS任务收到,诶大家看现在其实他发他的,其实后边这个任务没闲着对不对,对吧,后边任务干什么了?跟跟前面相对比,是不是234这里的234还在路上,后边这个五还在路上,是不是两个二写到那个think系统去了呀,对吧?所以大家看这个根本根本不影响前边做前边的后边继续做对吧?啊,甚至这个过程是他这儿这个在路上的时间有点长对吧,还没过来,其实甚至有可能他这儿都已经处理新数据了,这完全是有可能的啊,所以接下来会怎么样呢。接下来数据源把这个状态是不是这个SS任务。就会收到这一个检查点的这个消息之后,那么他这里边就会把自己的状态写入到checkpoint里边去,对吧,存起来,那当前这个S任务的这个状态是什么呢?就是当前读取数据S源那边的偏移量啊,所以大家看当前S1就把刚才的偏移量三存进去,存了一个蓝三,然后同样S2这个圆就把自己的这个偏移量四存进去,存了一个黄四,对吧?所以整个这这就是这个已经存在checkpoint里边的原任务的他们的状态,当他们把这个东西都存好之后,它会返回一个消息,大家看这里边有一个返回的通知,返回给job manager啊,告诉job manager,好举手,我这边已经搞定这个保存,我的快照已经存好了,那job manager。
07:57
廖师长好,您已经搞定了。那同时SS任务也不能闲着,他会干什么呢?诶,那大家会想他会继续处理处理这个处理这个任务吗?在在做这个保存的过程当中,他会直接处理任务吗?
08:14
处理后边的数据吗?不会对,他这时要暂停下来,暂停数据的发送和处理,所以它相当于是一旦收到呃,Draw就给他发过来的那个检查点的通知,他就相当于暂停从源里边去读取数据,然后做状态的保存,保存完了之后通知job manager,而且它会干什么,会向下游任务。发出一个带着检查点ID的barrier啊,这里就是真正我们所谓的这个barrier就朝下传递了,而且它发出barrier的方式是什么呢?是直接广播,大家注意啊,这里边是直接广播,为什么要广播,大家想一想。
09:04
因为后边的数据后,后边的这个任务,它这里边是要做重分区的,对吧,后边的任务是不是并不知道这个他还会不会给他发数据啊,如果说他不把这个呃,Checkpoint barrier发过来的话,那相当于我这边就并不知道你接下来给我发的数据到底是属于上一个检查点还是属于下一个检查点了,对吧?所以他一定要把这个东西发过来。广播出去,当然了,下面的这个,呃,黄色的SS2也是这样,对吧,把它这个广播出去,那大家看在这个过程当中,S在这么做的过程当中,后边的some任务和S任务是不是也没闲着呀,他们处理到哪了啊,大家看那个二和三是不是就都处理完了,所以上边这个大家看前面这里边是不是对,二应该是到这个偶数这里边来加,是不是就加成了四,而三应该加到这个奇数这里,是不是就加成了八啊,所以而且这个五大家看也已经已经到think里边了,对吧?所以后边都没闲着,都在处理过程当中。
10:19
好,然后接下来大家会想到barrier已经发出来了,那他接下来得在我们整个流里边,在各个任务之间是不是要传递啊,好,那接下来很很正常的,自然我们的sum任务就会先收到一个barrier,他收到这个barrier之后会干什么呢?哎,有同学说那一样嘛,你south任务拿到这么一个标记的时候,它会停止,因为得保证前面是一个checkpoint,后面是一个checkpoint,对不对,那是不是这里我拿到之后也应该停止处理,然后前面是一个checkpoint,后面是一个checkpoint呀,整体思路是这样的,但是这里边有一个问题。
11:02
你这里边既然它是广播出来的barrier,那是不是我下游的这个some任务就有可能会收到多个上游分区任务传来的barrier啊,那我到底是收到第一个,然后我就直接停止吗?我我就直接连下面的那个数据也不处理了吗。其实不是的,因为我收到第一个蓝色的二,蓝色的Barry,二收到的时候它代表是什么?是不是第一条流SS1这边。要做checkpoint的二号checkpoint的数据都已经来了呀。而黄黄色的流这边该做二号拆测范围的数据都来了吗?是不是没准啊,这个东西保证不了对不对?那什么时候才能确定黄色这条流里边该做二号拆矿的数据都来了呢?是不是得收到黄色的拆矿二才能这个barerer,才能表示他这边的数据该来的都来了,所以这里面大家注意some任务会等待所有它的输入分区的bar瑞尔都到达,然后才会去把当前的这个任务去做存储,把当前的状态去做保存,然后才会向它的下游继续去发送barrier。
12:27
哎,这是它的这个过程,诶,那大家会想到,那在这个过程当中,那那也就是说我只收到蓝色的这个二这个BARRY2,那黄色的这条流还在正常处理,对吧?那那如果要是蓝色的数据也来了呢。哎,这里大家注意,如果要是说蓝色的数据又来了的话,在这里不做处理,先缓存,大家想想为什么你这里要处理了的话,那是不是把下一个checkpoint的数据也存到当前这个checkpoint里面来了,对吧?这个状态就不对了,所以说这个得缓存,而这个黄色的三角这个barrier还没来,所以黄色的这个数据如果要是这个时候来了的话,继续正常处理更改状态,所以大家跟前面去对比,是不是这里边这个黄色是先来的呀。
13:21
大家看看是这样的状态,对吧,黄四先来了,所以是不是正常处理输出了一个八对吧,状态改成八,然后输出了一个黄色的八,而蓝色来了之后,是不是就直接缓存在这里了,哎,所以它是这样的一个状态。同样下边,呃,下边大家会看到这里边就是黄二直接来了对不对,黄二直接来了之后是不是还要,就相当于这个黄五来了之后就不能处理了,对吧,他要等蓝二也来,所以整个这个过程我们把它叫做Barry的对齐,就所有任务他都要等到所有输出分区的barrier对齐之后才能够啊,继续抽,继续把这个。
14:08
状态存储到checkpoint的里边,然后把barrier继续朝下传递啊,这是这样的一个过程啊,然后接下来看现在,诶,终于都对齐了,黄色的和蓝色的这个barrier每个任务都已经收集齐了,那接下来是不是就可以把当前的状态真正的做保存了,对吧?当前checkpoint的数据都已经到期了,所以。戴维对齐的时候,把当前状态存入checkpoint中。那大家看这里面就存了一个蓝色的八,又存了一个黄色的八,对不对啊,这里面表示这个some任务,两个分区子任务,他们的当前的状态。然后接下来干什么呢?哦,接下来他们需要把这个barrier继续向下游发送,还是广播对不对,如果要只有一个下游任务的话,当然就直接发送就可以了,所以大家看把这个蓝二这里边这个黄二都发送下去了,另外他还得干什么事情呢?接下来继续,接下来是不是要继续处理数据啊啊,所以接下来要做的是首先。
15:15
先把缓存的数据处理完啊,大家看是不是这样对吧?因为缓存的数据先来的呀,你要保证那个顺序,所以先是不是把这个蓝四先加进来啊,蓝四加进来是不是就变成了一个12对,是一个蓝12输出对吧?然后接下来是不是又来了一个黄六啊,黄六再加上是不是就变成18了对吧?而且输出了一个黄18啊,那同样下边这里边是黄五先来了对吧?黄五来了之后是不是加成黄13,然后蓝五再来了之后是不是加成了蓝18对,所以这就是接下来正常处理的这样一个过程,这就是这个flink的checkpoint算法的一个过程。
16:00
呃,那那当然了,就是说当我们这个每一个下游的任务完成之后,他也要向job manager,就是保存checkpoint完成之后,也要向checkpoint,呃,向draw manager去确认checkpoint完成,那当最后的这个think任务也是一样,当他把这一个呃收到这个,呃,我们这里边的这一个barer标记的时候,然后就去做自己状态的保存,呃,然后保存完了之后通知job manager,我已经完成了,那当job manager收到所有任务发来的确认通知的时候,好,你们都已经搞定了。那是不是draw manager就通知大家好,OK,我们当前这个checkpoint保存完成,接下来如果要出现问题的话,从这个checkpoint开始恢复,对吧?啊,所以这就是整个checkpoint保存的一个过程啊,呃,那除了这个checkpoint之外呢,在flink里边还提提供了另外的一个东西叫做c point啊,这个东西比较有意思啊,也是非常独特的一个一个功能,它其实从原理上非常的简单,它的原理跟checkpoint一样,几乎完全一样,这算法也是完全一样的,所以大家可以认为所谓的cpoint就是,呃,就是有了额外的有了一些原数据的一个checkpoint。
17:26
但是它的使用跟checkpoint完全不一样,因为checkpoint其实是不需要只要我们做了一些基本配置之后,不需要我们去特殊操作的拆point怎么样做做呢?就是我们只要在代码当中把它label打开,对吧?然后去设置一些基本的呃参数,呃设置比方说间隔多长时间做对吧,然后呃设置其他的一些参数,然后制定一个啊,就是我们所谓的这个重启策略,比方说隔多长如果挂了之后,对吧?呃就是隔多长时间重启一次啊,就是或者说在多长时间内重启几次,可以制定这样的一些策略,这是checkpoint的用途,主要是用于故障恢复,而c point呢?
18:11
C point主要不是用于故障恢复的,它是用来干什么呢?它是用户自定义的。所以说它不是自动去保存的,而是用户你必须明确的去触发,我要把当当前的状态保存成一个保存点,然后你要去从一个保存点启动的时候,也是在启动这个drop的时候,要去指定c point是什么。那这是same point的用法,那大家想想这个point有什么用途呢?到底干嘛呀啊,当然C画呢,也可以作为一个手动备备份,对吧?啊,作为一个备份,然后做这个故障恢复,当然也也是可以做这个的,那还可以做什么呢?呃,当然就是除了这个有计划的手动备份之外,还可以更新应用程序,这是为什么呢?因为flink它可以允许我们从保存点启动,完全不同,但是状态是兼容的这样的。
19:16
应用程序,那这个有一个好处就是什么,我代码都完全可以不一样了,对吧?言下之意就是说,诶,我改了一个bug,我的应用程序升级了没关系对吧,我线上该跑着还跑着,跑完了之后呢,诶我该暂停的时候,我停一下,存一个c point,然后我把代码一升级一更新,然后从c point重新来跑,诶继续又跑起来了,直接在线升级了,对吧?啊,当然这个不能说在线升级就是,但这个相当于这个时间就非常短,对不对啊,不需要就是那个整个数据回退啊或者怎么样,这个就非常非常的简单,非常容易啊,另外呢,这个就还可以做什么,还可以做AB测试,大家可能听说过就是呃,我线上在跑这一套系统,对吧?哦,稍微做了一些更改之后,诶,我再跑另外一套系统,看看这个不同的用户反馈,收集这个测试数据啊,可以做这样的一些事情,另外还可以干什么呢。
20:14
它还可以在这个不同的集群上启动同样的应用程序。这就代表我们可以干什么,可以直接升级fli版本,用这种方式升级flink版本,那就是之前我们跑的是这个,呃,1.7对吧,然后现在要跑1.8了,我直接存一个c po的状态存一下,然后把这个集群更换更新,然后直接从这个cpoint提起来,这样也是可以的。呃,另外就是还有很有意思的一些东西,就是说哎,我们就可以干什么呢,可以现在这个跑的任务太多了,有一个很重要的任务,特别靠资源。这个时候我把那些不必要的任务就先停一停,缓一缓,对吧,暂停,我把它存一个c point停下来就相当于释放资源了,对不对?哎,然后等那些要紧的任务给他,给他已经做完了之后,再从c point恢复出来啊,所以这个就就非常非常能做很多很多有意思的事情啊,这也是弗link给我们提供了一个比较强大的功能。
21:16
好,这就是我们给大家讲的这部分内容。
我来说两句