00:00
接下来我们介绍一下检查点的配置,其实在前面第九章的时候,我们已经介绍过检查点的一些相关配置了,因为我们知道在默认情况下,Flink程序它是禁用检查点的,所以如果说我们想要开启检查点功能的话,自动去保存快照的话,那是需要在环境里边显示的,去调用这样一个enable checkpoint,然后里边要传入一个参数,就是当前。隔多长时间去自动的做一次检查点保存,哎,这是之前我们都已经介绍过。而对于这个enable checking呢,如果说我们不传对应的参数,其实也是可以的啊,那当然了,这个方法现在已经被弃用了,如果我们不传的话,那默认的时间间隔是500毫秒,500毫秒做一次检查点的保存啊。这里边我们就会发现涉及到一个问题,就是这一个时间到底是大了好还是小了好呢。其实它是。
01:03
根据我们实际项目的需求来进行设置的,因为我们在实际项目当中可能希望当前的容错性要非常的高,非常的好,而且就是说如果发生故障进行恢复之后呢,要求延迟一定要比较低,因为我们知道这个时间间隔相当于是进行两次检查点保存的时间间隔,那也就是说最坏情况下有可能。如果说这个时间间隔是500毫秒的话。最坏情况下就有可能保存了一个检查点之后。500毫秒这个时间点,下一个检查点刚刚触发还没有保存好的时候,我当前就出现了故障,那如果要进行故障恢复的话,当然就要回退到500毫秒之前的状态了,所以这里边我们的延迟就会达到500毫秒啊,就是如果发生故障的时候,当前的处理的延迟就会达到500毫秒,所以如果说我们对于这个时效性啊,实时性要求非常高,对于容错性也要求非常高的话,那可能这个时间就要配置的稍微的短一些。
02:13
而如果说我们对于这个时间延迟要求不是那么的高,而对于当前的系统性能要求比较高的话,我们知道对于检查点的保存肯定是要耗费每一个任务它本身处理的性能的,是要耗费系统资源的,要占据我们处理数据的时间的,肯定我们不希望把所有的时间都浪费在这个做自动存盘上,肯定还是希望啊,正常情况下我们大部分的时间还是用来做数据处理的,那存盘只是为了防止意外,所以这个时间就需要做一个。合理的设计了,根据我们具体的项目需求来进行合理的设置。当然了,在检查点的功能里边,不仅仅只有这样的一个间隔时间的设置,还有很多其他的设置,我们接下来就讲一讲检查点的配置。
03:08
对于检查点来说,最重要的一个配置其实就是检查点位置的存放,存放位置啊,那这个存放位置我们之前说过,在。配置文件里边,整个集群的压迫配置文件里边是可以直接去定义的,那这里呢,我们介绍的是在代码里边也可以去定义,因为有时候我们可能针对不同的应用想要给它配置不同的存放检查点的位置啊,那这个位置呢,取决于checkpoint storage它的设置。所以我们会看到在代码当中的设置呢,同样还是要基于en去做一个配置,这个en首先要get checkpoint con,所以这里面我们是需要做这样的一个操作,Get checkpoint con,然后去做一个get checkpoint。
04:02
啊,我们看到可以去设它的out,可以设其他的一些东西,我们现在先考虑的是,那这里边当然就可以去定义各种各样不同的storage了,默认情况下是什么呢?要注意默认情况下,因为我们推荐啊,一般情况要用的肯定是。分布式文件系统了,而默认情况下我们并没有定义文件文件系统的路径,所以默认情况下用的是一个叫做。Job manager check storage这样一种方式,那它是把当前的checkpoint其实不是存放到文件系统,是存放到了job manager的内存里面去。啊,所以我们会发现这种方式呢。比较好的一点是它的存放肯定就会比较简单,比较快速,而且读取也比较容易啊,因为它是在内存里边吧,但是呢,缺点也非常的明显,那就是如果当前的检查点非常大,存放的东西非常多,这manager内存也是有限的呀,那就放不下了。而且还有一个问题就是说内存。
05:10
同样它是不稳定的,它其实不是持久化的保存,如果掉电的话,那相当于我们要重就是装manager,如果要是掉电,如果要是挂了的话,那么整个我们的之前保存的检查点就全部丢掉了啊,所以在更多的情况下,我们其实是不使用这种方式的,那用什么方式呢?当然就是要保存到文件系统里面,就是所谓的file system checkpoint storage。啊,那当然你如果要是在这里配置的话,那需要去单独把这个出来,里边需要有一个字符串定义出来的checkpoint,这样的话我们就可以指定当前应用它的检查保存在哪个路径里。这就是关于检检查点存储位置的配置,当然更多的情况下,我们可能不需要对某一个应用去单独做这样的一个设计啊啊,那一般我们就直接在集群的配置文件里面把它设定好就可以了。
06:12
在实际生产应用当中,当然这个分布式的文件系统,一般我们定义的都是,呃,像HTS3这样的啊,就自带高可用的分布式的文件系统。那除了基本的这样的。间隔时间的定义,还有配置对应的checkpoint存储位置的这样的配置项之外,那。Flink还给我们提供了很多其他可选的高级的配置项啊,那所有的检查点的配置项都是通过所谓的check检查配置这样一个来进行设置,那所以我们首先还是要到前的check方式,就是基做。
07:00
我们可以直接把它拿到。当前直接就把它叫做,接下来我们分别去做一个简单的设置,来看一看到底怎么样去做这样的一个实现。首先我们会看到。我们直接去调用checkpoint对应的方法的时候,我们看到除了前面我们用过的set checkpoint storage存储位置的设置之外,还可以设置各种各样不同的东西,比如说我们这里可以直接设置checkpoint的。Timeout,那timeout的话,我们发现它其实所谓的就是超时时间,超时时间的话,那其实就是。超过这个时间,如果要是没有完成的话,就会被直接丢弃掉,诶,那所以这里边其实要传入的就是一个长整型的毫秒数,表示超时多长时间,超过多长时间的话,就直接丢弃这样一个,呃,这样一个配置,比方说我们这里可以直接给一个。
08:02
6万啊,那我们知道这其实就是60秒,就是一分钟了,超过一分钟的话,当前这个checkpoint就不要继续做了,因为太耽误事儿了,太耽误时间了,我干脆就直接把它抛弃掉,然后接下来继续做数据处理就可以了,这是为了性能考量做的这样一个超时时间设置。那另外。我们会看到还有这是一个模式的选择。模式检查点模式的话,它主要是要。保证检查点的一致性级别,我们可以看到这里要传入其实是一个checkpointing mode啊,那当然了,Checkpointing mode这本身是一个枚举类了啊,那它里边可以选择的。我们看到有一个叫做at least once,就是最少一次啊,那当然了,默认情况是什么呢?默认情况其实是。
09:01
我们可以看到其实是once啊,就这里边我们可以直接把它选择为精确一次,也可以是至少一次,至于它两者的区别,这涉及到了状态一致性的级别,我们会在后边继续做进一步的讲解。那除了这些之外,我们接下来还可以去做一些其他的配置,比如说我们可以去配置当前的。最小时间间隔m po between checkpoints,那这里面要传的也是一个长整型的毫秒数,哎,那这个很明确,也是一个时间了,那这个时间又是什么呢?什么叫做最小的时间间隔呢?它是两个。Betweenpoints,那就是前后两个检查点之间的最小间隔。这里可能有点奇怪,我们不是已经配置过检查点周期性的那个间隔了吗?呃,如果我们觉得这个十秒有点太长的话,呃,因为我们知道发生故障的话,这最大就会延迟十秒,那比方说这里我们给一个一秒,那假如说这里给了一秒的话,这里再给一个500毫秒。
10:14
他们俩会有什么区别呢?主要的区别就在于这里的一秒,这个本身检查点的时间间隔,它是检查点触发的时间间隔,也就是说,诶,我们这里边。每隔一秒钟。Manager就向所有的S任务发出一个命令,发出一个指令,制作当前的检查点。要做保存了,检查点就。当前的SS任务就在。数据流里面插入一个Barry。这之间的距离,时间间隔是一秒钟。而这里如果我们配置了500毫秒的话,那代表的是。前一个检查点,这个是它的触发时间,那我们知道触发了之后,接下来Barry要在所有的任务之间传递每个任务还要保存当前的状态进行持久化,呃,然后还要通知job manager。所有。
11:13
工作全部完成,当前的检查点才能够完成,哎,那所以呢,检保存检查点是要耗费时间的。如果说当前检查点本身就耗费了。200毫秒的话。那么这里的500毫秒指的就是说下一个检查点至少要跟前一个检查点的结束。他们之间要有500毫秒的时间间隔啊,那当然了,现在这个是200毫秒的话,这个间隔足够大,那我们还是按照周期性的隔一秒出发一次就可以了,那假如说当前这个前一个。Checkpoint制作的时间太长,已经达到了700毫秒。
12:00
这个时候怎么办呢?诶,那这个500毫秒就起作用了,就是必须在前一个checkpoint完成之后,间隔500毫秒。然后才能够触发下一个检查点的保存啊,这其实是给我们的数据处理留下了足够的时间,不要把数据处理的时间全部占满,都去做检查点保存了,这样的话就不符合我们的初衷,所以这其实是结合起来使用就可以让性能。整个的这个性能变得更好,就保证性能的前提下,然后再加上容错性的保证啊,这是我们整个这个配置的一个基本思路,就是做各种各样的拳头。除了这个配置项之外呢,呃,跟它有点类似的,另外还有一个叫做max concurrent,这里边要传入的就是一个类型的数量,什么意思呢?这就是最大并发的检查点数量,也就是说同时我当前正在处理正在保存的检查点,正在做检查点保存的,而能有几个检查点。
13:11
我们可能会有点奇怪,那检查点不是每隔一段时间一个一个去触发去保存的吗?诶,那在这种情况下,怎么还有可能去配置多个检查点呢?确实有可能出现这种情况,那就是当前的检查点保存时间很长的时候,就我们说的当前一个检间隔一秒钟去触发检查点保存,那第一个检查点都已经保存了。1200毫秒。还没有处理完,诶,那这个时候假如说我们没有配置前面的这个最小间隔的话,那隔一秒钟之后,接下来第二个检查点就继续要进行保存了,那这个时候我们同时就有了两个检查点在进行保存。因为它都是根据每一个任务接收到barrier去做时间点出发的嘛,啊,他们之间其实是没有影响的,接收到barrier就去保存,接收到就去保存,同时可以进行。
14:08
所以这里面我们可以去配置,当然了,我们并不希望同时做的检查点太多,因为你前一个还没保存完,下一个又开始了,很显然是没有意义的,所以往往这个我们要给一个最大的限制。而如果说前面我们已经配置了最小间隔的话,哎,那很明显这个最小间隔会比较。比较有优先级更高啊,那所以这里面必须要隔开500毫秒,那显然同时执行的并发的检查点数量就只能是一了。所以假如说我们前面已经配置了MS的话,那么现在最大的并发数量就只是一。然后接下来我们再看。还有其他的一些配置项,哎,那就是我们这里边可以去enable。On aligned checkpoint,这是什么意思呢?很明显这就是前面我们提到的可以去开启不不对齐的检查点操作,也就是不再去执行执行当前的barrier,一定要去做分界线对齐,只要接收到一个barri,直接就可以去保存了,这样的话,只要启用了这一个方法,那接下来就可以大大的减少在出现反压的时候检查点保存的时间啊。那这个设置呢,它是有要求的,要求当前的检查点必须得是once这样的模式,而且并发的数量只能是一,哎,这样的话我们才能够开启这个。
15:42
非对齐的Barry操作。除了这个之外。另外还有一些可以去进行开启或者说配置的项,那比如说可以去配置,我们看这里有一个enable eternalized checkpoint。这里面所谓的eternalizedpoint指的就是说是否开启检查点的外部持久化,我们可能会想到,诶,这个检查点本身它不就是做了一个持久化保存吗?哎,那为什么这里边还要做这个,呃。
16:17
开启持外部持久化的这样的一个检查呢,这里边主要就是因为默认在作业当前作业失败的时候,它是不会去清理我们当前的已经持久化的,已经做好的那些检查点checkpoint。那在有些场景下呢,如果说我们想要做这个释放空间的话,那是需要自己手动去进行清理的。因为默认情况下,如果整个drop整个作业失败的话,那检查点其实是不会自动清理的,想要释放空间的话,那是需要手动去进行清理,这里边要配置的呢,其实我们看到它要传入的是一个eternnalized checkpoint这样的一个参数,那这个又是一个枚举类型,它里边呢只有两个选项,一个叫做delete on cancellation,另外一个叫做re return on cancellation,所以它主要配置的是当一个作业被取消的时候。
17:18
也就是说不是发生故障,是我们在外边手动取消了,在这个web UI上点击了看L,这个时候当前它的检查点是否要去清除啊,那那我们自然就会想到了,在这个默认的情况下。默认情况下呢,是会在。进行看L取消作业的时候是会删除当前的保存点的,那如果说我们想要让它继续保留的话,那可以把它配置选择成啊,那所以这里面我们可以把这个配置成externalized checkpoint clean up.onation啊,这个是可以做这样一个操作。
18:00
我们还可以进一步去配置。当前是否要允许。检查点的失败,就假如说检查点失败的话会怎么样啊?当然了,在默认情况下是不允许检查点失败的,这个是通过tolerable checkpoint failure number来进行配置,那默认情况下这个数量其实是零,也就是说完全不允许检查点的失败,如果要检查点失败的话,当前就相当于整个作业也是失败了,那就相当于发生故障,然后退出。然后再进行重启了,那如果说我们不想让它失败的话,那这里边其实是可以去传入一个啊,我们看到这里边本身要传的是一个int值,那我们可以直接传入一个int的最大值,那这样的话,它就相当于是永远不会失败,那就是如果失败的话,相当于对于当前的作业就没有任何的影响,相当于是可以无限的容忍下去。
19:01
我们也可以去定义一个特定的数值,表示失败这么多次是可以允许的,那超过的话,当前的作业就要直接废掉。这就是具体的一些在代码当中对于检查点可以进行的配置。
我来说两句