00:00
我们现在已经知道了flink里边的检查点机制,也知道了它的算法里边的一些运作的细节,那接下来我们就看看代码里边到底怎么样开启这样的一个checkpoint这样的一一个过程,对吧?啊,我直接就在这个之前的state这个这个test里边给大家加一段代码好了,因为之前在状态编程这里不是给大家配过一些跟状态后端相关的嘛,啊,当然这个是注掉了啊,呃,就是后面如果要是你在这里想去开启检查点相关的一些配置的话,也可以在这里去去做一个配置啊,那给大家看一眼这个。配置的时候怎么样去配,这里面大家看到有一个方法叫做,因为环境里边有一个方法叫做。Get checkpoint con啊,所以我们接下来要配的东西全部都是在这个checkpoint里面去配的。好,那那接下来我们看一下这里面到底有什么啊呃,当然你这里边可以去传一个这个configuration对吧,就是完整的这个configuration,然后下面大家看到有各种各样的get,就是get它的各种配置项,对吧?那如果说我们要去配的话,当然就应该是配set了,我们看看set能set哪些。
01:17
啊,这里边首先要跟大家说的是,呃,在这个,呃,我我们做这个checkpoint的配置的时候啊,首先应该在环境里边,在在get这个就是checkpoint conflict之前,首先应该做一个事情叫做enable checkpoint。因为在flink代码里边默认情况下,Checkpoint其实是不启用的啊,所以这个大家要注意一下啊,如果说你当前的这个job,你要给他启用checkpoint的话,那必须在这里边写一个enable checkpoint。啊,当然这里边大家看这个,这个方法已经被弃用了,那现在应该要怎么用这个方法呢?就是在里边你不能不传参,只说启用必须要传参,对吧?呃,这里边大家看这个,呃,在在这里边其实你能能够看到它其实底层调用的时候,他给的这个默认的时间间隔是多少。
02:17
啊,大家看这里还set了一个这个checkpoint interval对吧,默认的时间间隔给了一个500毫秒,这个都是好秒数啊,跟我们之前那个watermark那个那个间隔是一样的,所以大家看它默认的这个时间呢,其实就是呃,也不要太长,对吧,你不要隔的时间太长,导致我们的这个呃出现故障之后,恢复之后实时性受到很大的影响,然后呢,也不要太短,呃之前我们说那个周期性生成watermark默认的时间间隔是200毫秒嘛,这个比那个要大,对吧?啊,所以这里边就是说默认处理数据的话,数据那边的触发肯定要更快一点,而这个做保存呢,呃,间隔可以比那个大一点,这边是500毫秒啊,所以这里面当然后面我们可以用这个checkpoint conig去set这个这个interval对吧?啊,这个当然是可以这样去做的,另外还可以怎么样呢。
03:10
在这里边我们看一下别的调用的方式啊,Enable。诶,大家看enable checkpointing除了不传参的这种方式默认500毫秒之外啊,另外更常见的是传一个长整型,那如果说这里边你传一个长整形的话,这就是直接把interval传进来了,对吧?啊这就是相当于直接在启用checkpoint的时候,这里边就把这个对应的呃,间隔的时间触发拆point间隔的时间直接带进来。这里边都是毫秒做单位啊,这个大家要搞清楚,然后我们其实看到它里边其实调用的又是什么呢?又是下面的这个方法对吧?啊,就是这个传两个参数的这个方法,所以你在直接enable这个checkpoint的时候呢,还可以给一个参数,给一个mode,给一个模式。啊,那这个mode又是什么呢?刚才大家其实。
04:03
在这里边应该也看到了啊。大家看到这里边给的mode叫exactly one,然后所以这里面的mode其实指的就是说我们确定的那个状态一致性的级别。给不同的模式。默认情况下,你如果不配的话,默认就是exactly one对吧?我们说呃,这个弗link是能达到这个这个语义的,那除了这个之外还有什么呢?呃,Checkpoint mode这里边大家看它是个枚举类型,里边除了exactly one之外还有at least ones啊,这个到底是什么意思啊,大家知道这是至少一次对吧?啊,后面我们讲到状态一致性的时候,再给大家展开来把这个说一说,到底这个是什么意思啊?呃,那呃,我我们在这里就是给大家一般情况的配置啊,就在这里边直接给一个比方说,呃,往往你如果要是说那个不用默认的500毫秒的话啊,有可能就想让这个时间间隔长一点,对吧,不要占用太多的这个,呃我们这个处理的时间,所以比方说我给一个1000啊,这个是完全可以的,就给一个1000毫秒,也就是一秒钟触发一次拆point的计算。
05:11
啊,那后边你可以直接跟上那个moded,如果不给的话默认。或者你这里边不去,在这里边给其实也是可以的啊,因为我们前面不是看到那个env可以get checkpoint con之后可以set很多很多东西吗?哎,大家看前两个,这就是可以配置项啊,首先这个interval,那你前面已经配了,这就不用再set了,对吧?然后后面你想重新set也可以啊,然后上面这个check框mode啊,这就是刚才大家看到的那个。对吧,这这里边要传的一个就是check mode啊,这这里如果说你直接写的话,直接就把这个可以copy过来,对吧,比方说我这里边给一个这个把这个引入啊,Stream API check mode这里边如果不给exactly ones的话,你想给at least ones也可以,对吧,因为默认是ones嘛,那这里边区别其实就是一个exactly ones是精确一次嘛,这个状态一致性保证当然会更强。
06:11
所以那at least one一般什么时候用呢?就是你如果对状态一致性啊,对这个正确性要求不是很高,然后呢,就又要要求它快的时候,哎,这个时候你就相当于把这个级别调低,它所耗费的性能就少了。啊,就是在对实时性要求很高的场景下,那可能下面你也不要用那个,呃,事件时间对吧,这里也不要用这个exactly one这个语义,你就直接给at least one这些都是在做这个,呃实际项目的时候优化的一些点啊,就根据具体性能来做优化,也不见得说哪个配置就一定更好。好,那我们再看看别的。别的同样还是要get checkpoint con之后,然后再去set,对吧,我们继续看下边,下边又有一个叫做checkpoint timeout。啊,这个其实从名字一看就看看明白是超时时间嘛,对吧?所以它的含义就是说我们执行一个checkpoint的时候,从job manager那边发出一个一个消息,让每个source任务去添加那个barrier,对吧?从这个时间节点开始,然后到这个最终结束,我们知道最终结束是所有任务都要完成当前状态的保存,呃,交给那个,呃,Drop manager一个一个一个响应,对吧,一个回复,那么照那边通知大家说好,我现在完成了这个时间,那我不能永远等下去,对吧?如果说当前这个checkpoint一直在做,一直在做,耗费的资源太多,但还没有完成的话,啊,这我我得给一个超时时间,超过这个时间我不做了,对吧?这没完成的这个这个checkpoint直接给它丢弃掉,About啊,这就是这个timeout时间的一个含义,那当然这里边大家看到它要传进来的参数也是一个长整性,所以也。
07:59
是一个毫秒数,那比方说呃,这个时间一般给到什么级别呢?一般给到分钟级别差不多对吧?呃,比方说我给一个6万,那大家知道这就是这就是等一分钟嘛,每一个checkpoint做一分钟,如果超过就丢弃。
08:15
然后接下来啊,有有同学可能就想到了啊,在这里边我做这个,呃,Checkpoint的时候,你难道说在做这一分钟的过程当中,我就一直在那儿等着,等着他做吗?就不能干事儿了吗?那这个大家回忆一下,之前我们在做这个状态后端配置的时候,大家还记得吗?这个,呃,File system state back end,这个里边它有一个可选的参数叫做。叫做异步快照,大家还记得吧?啊,就是这里边,如果说我们给了这个,呃,就是异步异步快照的这个。就是这个这个机制的话,那就相当于我这里边在写这个checkpoint的时候,保存checkpoint的时候,那不是说当前我这个任务就不能干活了,而是它是同步进行的。
09:07
啊,那有的同学可能想你这个同步进行怎么同步进行对吧,你这边一边在在保存那边一边改吗?啊当然不能这么同步,对吧,它的做法就是我先因为大家想到这个我们做拆point最大的时间耗费应该在哪里呢?它其实就是朝远程系统的一个保存嘛,保存数据嘛,所以它最大的耗费其实在呃这个网络传输对吧,序列化反序列化在做这个呃数据传输。向远程写入,这个过程是最复杂的,所以这里边的异步checkpoint,它其实是把当前的这个所有状态先COPY1份副本,Copy在内存里边的一个区域。然后接下来呢,我们本身的那个那呃状态接下来就可以更改了,对吧,之后你就是写那个checkpoint状态的时候,就用这个副本里边的状态去写,而真正的那个状态呢,后边继续处理可以去更改了啊,这样的话就并行不变,相当于可以是同时进行了。
10:08
这是这个做异步这个checkpoint的这个过程啊。呃,那那当然了,这个在平常在做这个处理的过程当中,假如你考虑到这个时间太长的话,对吧,就假如说这个,呃拆point,一个拆point就耗费的时间很长,而且我又没有配配置异步,它就同步,呃,那那这个我前面不是还配置了它的那个触发时间嘛,啊你想着我假如说一秒钟这个就触发一次checkpoint,然后一次拆point做一分钟。那要这么说的话,我就不用干活了是吧,而且是同步的话,同步的含义就是说,呃,就是当前我这个任务必须要自己保存完这个checkpoint之后,保存完状态之后才能够继续干别的事儿,呃,那那假如说我当前的这个任务就是保存checkpoint就得花费一秒以上,那你到时候这不就相当于没有时间干活了吗?呃,当前这个。
11:06
这个刚刚把这个做完对吧,然后很快后边的那个checkpoint就又来了啊,那那这怎么办呢?啊,这当然大家知道真实的这个checkpoint呢,是在原那边south那边插入了一个barrier,其实在后面做处理的过程当中,它其实就是不是说。呃,后边每一个任务都是隔一秒钟马上就能就能去做保存的,对吧?他必须是得等到那个checkpoint bar瑞尔来的时候看到的时候才能去做,所以这里边其实我们的这个间隔时间啊,大家注意,并不是所有的checkpoint保存就是所有任务开始保存拆炮的时间,大家不要认为这是他们同时保存的,对吧,他们各自都是都是并行的,这个时间是John manager给S任务触发这个checkpoint的时间间隔。那至于后面,后面他有可能处理不过来啊,处理不过来那就慢了,呃,所以是有有可能出现这种情况,那但是我有时候还还希望这个做一个什么,呃事情呢,就是你即使是后边会这个相当于这就类似于这个所谓的反压被压了,对吧,Back pressure,就是后边如果要是数据堆积的话,我前面肯定就慢了嘛,但假如说。
12:23
我我其实是不想让他出现这种类似于被压的这种情况,我其实想让怎么样的,就是你总得不管怎么样,总得给我留下一段时间去做这个正常的执行任务的。要做这个操作对吧,你得给我留下这个空余时间,你不能把所有的时间都做了checkpoint啊,那这里面有没有这样的一些设置呢?啊,给大家看一下这个,接下来再再配一些别的啊,上面几个这个是比较比较常见的配置,一般情况你打开这几个也就差不多够了啊,然后我们接下来再给大家看一下,我继续set,还可以set一个叫做max concurrent checkpoint。
13:04
啊,这maps,呃,Concurrent checkpoint什么意思呢?一看就是最大嘛,最大什么呢?Concurrent是并行对吧,同时出现的checkpoint,也就是说我最多允许同时出现几个checkpoint,呃,同时在在进行,同时在做对吧?你不要说到时候那个这个前面一个拆炮的还还还没整完,后面一个又来了,对吧,你这个整的就是,呃,最后我们这个就没时间去去真正干活了嘛。所以这个里边如果说默认情况下啊,如果不配的话,那这个就是只允许一个啊,你这里边如果要是如果配了的话,比方说我这里边可以给一个整数,对吧,最大允许两个,那就有可能我前一个还没还没完成,还在做的过程当中,下一个又触发又来了啊可以这个是完全可以的。诶,有同学可能会说,你前面不是说这个checkpoint本身在这个过程当中,默认正常来讲是那个这个同步进行的嘛,对吧?嗯,那你要这么说的话,我一个任务这里边同步进行,前面这个这个checkpoint还没做完呢,那怎么会又有第二个checkpoint又会又会开始做呢。
14:15
这个大家注意啊,Checkpoint的概念跟我们一个任务,接收到barrier之后,保存自己状态。这是,这是两回事,这是不同的概念,尽管他们有关系,对吧?他们的关系是什么?是每一个任务把自己的这个状态都保存完了之后,所有的状态合起来的那张合照,这个叫checkpoint。所以说每对于每一个任务而言,他自己可能做自己本地状态的这个保存可能很快啊,这个可能你这个就很快就做完了,对吧,但是整个checkpoint这个就没准儿了,你要等所有任务都保存完了,这个才才算数啊啊所以这个说的是假如说当前的这个checkpoint s这边肯定是已经做完了嘛,S因为它就一个偏移量,这个很快啊,那假如说后边任务还都没做完。
15:07
还在这个进行保存自己状态的这个过程当中,甚至还没接收到Barry还在处理前面的数据,那这个时候假如说我又到了这个一秒钟时间了,按道理draw manager又应该触发这个呃,这个checkpoint的操作了,对吧?那我现在要不要去出发呢?哎,这里边就是如果说我配置了这个允许它同时有多个的话,这个就可以去触发,如果我不允许只能是一个的话,那就还得等,对吧,你到了一秒钟也不能出发,继续等,等上至少等上一个做完之后我才能出发。啊,这是这样的一个机制。然后另外再给大家说一个,这个配置叫做大家看下面这个啊,下面这个叫做set main pas between checkpoint,但这个其实跟上面这个相关,就是也是处理这个要给我们留下,呃,就是至少有一些执行任务的间隔时间,那这个到底说的是什么呢?这就是个最小,对吧?上面是最大,这是最小,这难道是最小的那个同时的拆point吗?不是啊,这是说的最小的间歇时间。
16:11
最小的间隔时间,对吧,在两次checkpoint之间的最小间隔时间。诶,有同学可能想了,你这个还用说吗?这我们之前不就是隔一秒钟触发一次吗?那不就是一秒钟吗。要注意这个不一样啊,这个前面触发的时间,因为大家想呃,我我们前面说了啊,整个checkpoint保存的一个过程,它是所有任务全部保存状态完成之后,这个才叫。保存成功对吧,这个才叫一个完整的checkpoint的过程,这个过程可能耗费的时间比一秒钟要要长,那你假如说它它耗费的这个时间超过了一秒钟的话,呃,那你说。我现在有可能出现什么情况呢?呃,大家看啊,我我这个就不画数据了啊,比方说我现在是说这里我触发了一次这个,呃,这个拆框的操作对吧?招manager那边触发,我这里边插入了一个barrier。
17:04
然后接下来我做checkpoint呢。做了。这么长时间。哎,假如说。正常情况下,我们我们觉得是这个时间应该比一秒要小,对吧,你这边就做完了,做完之后,诶这不是同步执行吗?你后边这个每个任务还是啊该该处理数据,处理数据,哎,然后等下隔了一秒钟之后,这这是第一次barrier对吧,BARRIER1,然后这里是BARRIER2。二这里面再触发,诶,然后再做这么长时间。然后在空闲一段时间之后,然后再做下一次处罚啊,这是我们正常情况下,哎,那假如说假如说这个我们这里边这个这个时间啊,一秒钟就是本身做这个的时间耗费的非常长。我得耗这么长时间,超过一秒钟。那这个时候怎么办呢?呃,就默认情况下,如果我不允许同时进行这个呃,Checkpoint的话,那就相当于得往后推移了,对吧?呃,当前这个一秒钟到的时候不能做,等到他这里边真正做完上一次的时候,才能接着做下一个,这就有点像滚动窗口的那种感觉了。
18:17
啊,那如果说我要是配了这个同时执行并行执行的话,哎,那可以说我在这里边就是在这个一秒钟的这个节点上,就触发下一次拆point的计算,这个就有点儿像那个滑动窗口有重叠的那种状态了,对吧?啊,这是这几种不同的情况啊,那这里边的这个就是最小间隔时间between point说的又是什么呢?说的是中间这段时间。也就是说在这里啊,就是我们这个Barry前面说的那个触发的间隔时间啊,指的是。呃,就是触发drop manager,触发checkpoint的时间点,也就是说对于一个checkpoint这个保存的过程当中,相当于这是它的头到头的时间是一秒钟。
19:06
而现在这个main pas between checkpoint指的是什么呢?是前一个的尾,也就是保存完成到下一个的头之间的时间至少要多少?哎,所以说比方说这里边啊,我给一个,这里面至少给一个也是毫秒数啊,我给个500,那500毫秒的话,那代表什么意思呢?就是说前一个跟下一个之间至少要500毫秒。哎,那有同学就就想了啊,那假如说本来的这个空空,呃,就是空余的时间超过了500,那怎么办呢?啊,它这个是至少嘛,超过的话,你就还按照当前的这个,就是一秒钟一次的这个来出发,这个就没问题了。那假如说这个要比500毫秒小呢,大家看这个好像已经超过一半了,对吧,一秒钟这个可能不够500毫秒了,那怎么办呢?那就是下一次往后推移。往后推一段时间。
20:01
最后保证这里边的头和尾之间的间隔至少得是这个500毫秒。哎,从这开始做下一次checkpoint。大家看它是这个意思,就是保证两次之间至少有500的间隔。那下面这个同样啊,下面这个,如果你要是配了这个至少间隔500毫秒的话,那下边就你即使是超过一秒钟了,这个直接占满了,对吧,后边还得隔开500毫秒。隔开500毫秒,500毫秒之后才可以从这开始做下一个框。啊,这是这个最小间隔的这个含义啊,所以大家其实也会发现啊,我这里边如果配了这个最小间隔的话,那其实上面配这个就没用了。好,就是下面这个配置会把上面覆盖掉,你如果要是至少他们之间都要隔开一段距离的话,那它还能同时发生吗?肯定不能同时发生了吧啊,所以一旦配置了这个最小间隔,上边默认的这个同时的checkpoint就是肯定是一之前你配多少都没用。
21:04
好,这是呃这一部分啊呃,然后这里还需要给大家再把剩下的这个也稍微说一下。大家看还有什么啊,这个还有两个,这个稍微简单一点,一个叫做呃,Prefer checkpoint for recover recovery,然后这里边传的是一个布尔类型。哎,那这里边这个假如说我给处的话,这个啥意思呢?我们看一下啊,这个布尔类型指的是是否就是他prefer嘛,就是你是否更倾向于用checkpoint做故障恢复。哎,这听起来好像莫名其妙对吧,我们说这个flink的故障恢复机制,它不就是用checkpoint做故障恢复吗?它的这个prefer的含义是说,假如说除了checkpoint还有什么可以做恢复呢?Savepoint吗?我们前面不是讲了savepoint,那如果说我当前有一个s point比。
22:01
最近一次成功保存的checkpoint还要离得近,诶,那这个时候你用什么对吧?啊,就是所以这里边可以用这个布尔类型来做一个判断判判断,如果说你这里边给了处的话,那我就是说c point我不管,我默认的就是要用checkpoint对吧,即使是c point更近,我也用checkpoint,就是这个意思啊。这个布尔类型默认是是false啊,大家看一下这里边默认是false对吧?啊,就是默认是拆放的,跟C不一样吗?对吧,哪个进用哪个啊,默认是这样,但是一般情况你手动那个我们这边一秒钟一次对吧,你你刚好就能,呃这个就是到它要发生故障的时候,你能刚好能就是呃插在两次那个那个中间刚好这个离得更近,这个其实还是还是挺难的啊呃,就除非就是你刚刚手动保存了C,它马上就故障啊,这种这种情况比较少见啊,所以说这个其实不是特别重要,一般不配也可以。
23:01
另外再给大家说,最后还有一个。这里面大家看还有一个叫做呃,To tolerable checkpoint failure number啊对吧,这里面还有一个这个参数,这个参数它指的是什么呢?呃,这个它说的就是你到底要容忍多少次checkpoint失败。好,就是说呃,这个checkpoint的失败大家知道,就是说如果说在做这个checkpoint的过程当中,本身checkpoint的挂,诶这里面就有一个问题,就是我们自己的这个任务本身是没问题的啊,啊这个是正常在执行的,但是checkpoint挂了,那你说这个这个算不算挂了呢?啊,这就是这个问题啊,就是说,呃,那其实之前还有一个一个选项,大家看下面这个布尔类型的啊,现在被弃用了,这个叫什么,这个看的清楚吗?Set few on checkpointing errors。他的意思这个波尔类型就是说如果说你这里给处的话,那就是如果checkpoint挂了的话,我也认为当前的这个drop挂了。
24:04
啊,那如果这里边给false的话,那就是如果这个呃,Check checkpoint是这个失败的话,没关系,对吧,我就我就直接丢弃掉它,然后该怎么做怎么做不要重启,因为你假如说是当前drop挂了的话,那那相当于它是会重启的呀,啊所以这个其实是比较麻烦的是吧?呃,这个是看看这个,呃,就是你自己具体的这个需求了啊,那现在这个弃用了,弃用什么意思呢?就是这一个。大家看他的这个说法,就是你给一个int放在这里的话,如果说它是零,默认是零,那就表示什么呢?那就是表示不容忍任何的checkpoint费,这个费如果说一旦出现checkpoint失败,我也认为是drop失败,直接重启。啊,所以这个是比较强,就是比较强硬的一种方式哈,就是拆point的失败也是失败啊,那如果说你这里边给一个数字的话啊,那就相当于就是说,哎,可以可以多重启几次对吧?啊就是按照这个上限是这个次数,那之前的那个就是set f那个是啥意思呢?那其实就相当于是这里边儿int类型直接给了一个最大值,就如果说你那个set fill on,那个checkpoint failure,如果给了一个false的话,那就相当于是。
25:21
它挂了之后不重启嘛啊对吧,所以那个int就是你可以这个容忍它很多很多次啊,到达上限的一个数字,所以现在这个就用这个参数直接把那个覆盖掉了,一般情况不用再配那个了啊,比方说这里边我容忍三次五次这个都是可以的。好,这就是给大家讲一讲这个在实际应用当中拆po的相关的一些配置啊,呃,当然这里边就是说我们在呃开发环境里边,你没必要去这么去配,对吧,开发环境这个这个并不重要,那在生产环境里边呢,呃,这个每一个任务肯定是你要针对当前这个任务的一些特点啊,最重要的就是配这个前三项。
26:03
啊,就是当前的这个checkpoint的时间间隔,然后配这个当前的mode,还有一个就是说,呃,设置一个这个超时时间,一般情况把这几个配好了之后啊,最多有可能再配一个这个就是你要保证它至少间间间隔多长时间,对吧?啊再配一个这个基本上也就差不多了。这就是呃,在弗林克做这个相关的一些配置。
我来说两句