00:00
我们已经了解了link当中检查点的概念和算法,那接下来呢,我们再来看一看在代码当中怎么样对检查点去进行相关的配置啊,其实之前我们已经提到过检查点在代码当中的启用啊,因为我们说默认情况下link程序是禁用检查点的,如果说想要开启这样一个自动的快照保存功能的话,那么需要在代码当中做一个显示的调用啊,调用方法其实之前我们都已经用过了,就是直接因去调一个enable checkpointing方法。这个方法本身是可以不传参的,但是这种方式要被弃用了啊,所以默认情况下应该要传入一个interval参数,这是一个以毫秒作为单位的时间间隔。就表示了我们当前。每隔一段时间就进行一次检查点保存的时间周期啊,那比方说我们这里面给了一个1万的话,那就相当于是每隔十秒钟保存一次检查点。如果这里不做配置的话,系统默认的检查点间隔时间是500毫秒啊,那所以这里面到底配多少的话,主要我们还是做一个权衡,如果说这个数字配的比较大的话。
01:11
那就会隔比较长的时间才去做一个检查点的保存,对我们处理数据的性能影响就会比较小啊,但是一旦要是发生了故障之后,恢复这个过程当中可能带来的延迟就会比较大啊,那同样如果说给的这个数字比较小的话,很短时间我们就做一次快照保存的话,那很显然就是如果发生故障的话,很快就可以追上之前的处理状态,但是带来的代价那就是对于系统的性能会带来一定的影响。那除了这个在代码当中去开启检查点功能之外呢,另外还有一个比较基本的配置,那就是去配置检查点的存储位置啊,其实之前这个我们也说过啊,如果说想要去配置检查点的。
02:00
存储位置的话,一般情况下,我们其实是在整个集群的配置文件里面去设置一个state.checkpoint.dr啊,设置一个文件保存的路径,因为我们说一般默认情况下,我们都会把这个检查点的存储位置设置成一个分布式的文件系统啊,就是HDFS啊,那其实如果说不做配置的话。默认情况下,系统的。检查点的存放位置其实是在join manager的对应内存当中。哎,所以对于。检查点本身的存放位置呢,也是有两种配置的选择的,一种就是默认的装manager,对内存,哎,那我们知道说是持久化,你如果把它放在内存里的话,这个掉电还是会丢吗?所以一般情况下我们需要做一个更改,把它改成一个分布式的文件系统。所以这就是我们所说的啊,检查点存储位置在底层是使用了checkpoint storage这样一个类来进行规定的啊,那那个给我们提供了两种,一种叫做managerpoint,另外一种叫做。
03:10
File system checkpoint story文件系统的检查点存储啊,那呃,一般情况下我们说啊,对于这个flink的配置而言,我们应该是在集群配置文件里边统一去设,当然了,如果说我们想要针对每一个作业去单独进行设置的话,也可以在代码当中去做一个设置,那在代码当中呢,就是直接基于因为去调一个。Get checkpoint config这样一个方法,获取到当前检查点的配置项,然后去调用set checkpoint storage这样一个方法啊,那里边的话当然需要传入的啊,我们可以看一下它的构造方法。那里边要传入的参数呢?当然默认就应该是一个checkpoint storage啊,刚才我们也看到了啊,也有其他的传参方式,比如说我们这里可以直接传一个pass,一个路径,或者直接传一个string类型的对应的路径名称,或者一个uri,那这样的话其实指都是指定了当前的一个分布式文件系统的对应的路径。
04:13
所以我们这里其实想要去实现的都是一个所谓的file system checkpoint storage啊,都是相当于你了它的一个对象啊,只要传入一个路径就可以了,那如果说我们不去设置成文件系统这样的一个检查点存储的话,哎,那其实另外一种情况就是所谓的job manager checkpoint storage啊,那这个里边当然就不需要有任何的参数了,直接出来就可以啊,那更加一般化的啊,我们一般在这里想要去配置的,其实就是file system check storage里边的话,给一个对应的路径就可以,好,那这里我们不做相关的配置的话,里边的路径也就可以不用去传入了。这就是我们对于检查点最主要的配置啊,其实一个就是开启检查点功能,传入一个时间间隔,另外就是配置一下检查点的存储位置,这两个是最常用的,然后除了这个之外呢,检查点还有一些比较高级的配置选项,这里我们可以简单的来提一下啊,那这些高级选项呢,其实都是基于我们获取到的checkpoint conflict诶,我们可以看到啊,拿到的这样一个。
05:23
Get checkpoint con这个方法得到的就是一个checkpoint con,在这个类里边对应的就有很多set方法在这里面我们都可以去调用。比如说我们看到,诶,这里有set checkpointing mode当前检查点的模式,哎,这个模式里边呢,要传的就是一个checkpointing mode了啊,这里所谓的模式其实主要就是两种类型,一个叫做exactly one,另外一个叫做at least one,哎,这就是我们所谓的状态一致性的级别啊,那可以选择精确一次的状态一致性保证,也可以选择至少一次的状态一致性保证。
06:06
这里如果不选择的话,默认的选项是exactly once啊,那关于状态一致性这一部分,我们还是放到后面去做进一步的介绍。除了set checkpoint mode之外呢,另外刚才我们也看到了啊,还可以。去set当前的interval,这个就不用说了啊,跟我们直接在开启检查点的时候传入的那个时间间隔是一样的,周期性触发检查点操作的时间。啊,然后接下来就是前面我们说过的这个set storage啊,设置一下当前检查点的存储路径,那后边我们继续看啊,有已经被弃用的一些方法,然后我们看没有被弃用的方法,这里面有一个set max concurrent checkpoints,指的是设置当前最大的。可以同时进行的检查点数量啊,就相当于是并发的检查点数量,呃,那我们可能会想到检查点,这不是在我们这个流式的处理里边插入的,可以认为是一个标记吗?这怎么还可以并发呢?它不应该是按照顺序,前面是一号检查点,后面是二号检查点,这个顺序永远不会错吗?
07:14
诶,但是我们会想到啊,一号检查点去做保存的时候,那下面我们所有的任务它是并行的呀,而且是上下游之间,每一个任务它进行检查点状态保存快照的时候,这个时间都是不一致的,所以从开始触发。Job manager发出这样一个命令,开始去进行一号检查点保存,到一号检查点彻底保存结束,Job manager那边收到所有的确认信息,这其实是需要一段时间的。所以在这一段时间等待的这一段时间过程中,就有可能已经到了第二个二号检查点要去触发的时间点,这个时间间隔可能已经过了啊,所以在这种情况下就有可能出现两个同时再进行保存的检查点啊,所以这里边这个max comp checkpoints这个数量就指的是最大能同时有几个相当于检查点的并发数量。
08:12
然后除了这个之外呢,另外我们看到还有一个,下边还有一个叫做set me pause between ten points,这指的又是什么呢?这指的是设置两个检查点之间最小的间隔。最小的间歇啊,那我们可能会有点奇怪啊,之前不都已经设了那个时间间隔吗?比方说默认情况下,哎,每隔500毫秒触发一次检查点操作,那这里面怎么又有了一个所谓的最小间隔呢?这就是前面我们说的啊,每一个检查点触发的时间间隔,这是我们之前配置那个interval,这是它触发的时间点,但是一号检查点。触发之后,它真正的保存结束,这是要持续一段时间的。
09:01
哎,那有可能他在我们这个时间间隔内已经保存完毕了,一号检查点结束了。但是也有可能呢。这个保存的时间持续了很长很长,甚至超出了我们设置的这个时间间隔。那这种情况下怎么办呢?已经到了这个第二个检查点需要去触发的这个时间点,这个时候怎么办呢?那就要看当前我们到底是否允许出现并发的检查点了。如果说这里边我们max啊,并发的这个数量就给了一的话,哎,那就相当于。当前只能存在一个检查点,前面一个没保存完的话,那第二个就不能去触发啊,那如果说我们这里允许出现两个的话,当然就可以处罚了。那这里边我们又设置一个main POS,这个最小时间间隔指的又是什么呢?这就是不光我们不能让检查点并发的去执行啊,就是如果第一个前一个还没有保存完毕的话,第二个就不能触发,而且呢,两个检查点之间还得空开一段时间。
10:04
啊,这就是我们说的啊,如果第一个检查点已经执行了这么长时间的话。那不光在中间到达时间间隔这个位置的时候,第二个检查点不能去触发,而且还得空开,我们这里指定的这个main PS,就至少之间得空开这么长时间,让我们去做正常的数据处理,不要所有的时间都用来做检查点的保存啊,这就是这样一个配置的意义,所以我们看得出来啊,如果设置了这个最小间隔的话。那就相当于检查点就一定不能并发去执行啊,所以如果设置了后面这个参数,那前面的max concurrentpoint这个参数配置几都没用,就相当于默认成一了。啊,这是关于这一部分。啊,然后接下来呢,我们再看最后这个。叫做set。Tolerable checkpoint failure number,哎,也就是说我们能够容忍的检查点失败的次数,这个说的是什么呢?哎,其实我们说检查点是为了容错保证,也就是发生故障的时候去进行状态恢复的,那假如说我们在进行检查点保存的时候发生了错误,这个时候算不算是我们整个应用整个作业发生故障了呢?啊,那正常情况下的话,我们应该认为它也是发生故障了,所以在这种情况下,默认情况下啊,这里给的这个整数其实就是零。
11:31
如果是零的话,那就是表示不容忍任何的检查点失败。那只要发生检查点的故障,检查点保存失败了,那就直接整个作业就废,就失败了,所以我们整个就开始重启恢复这样一套流程啊,那如果说对应的这个数字我们设的不为零,设大一点的话,那就表示。可以容忍对应次数的检查点失败啊,就检查点挂了,没关系,我们正常继续处理数据啊,当然了,这一个容忍是有一个上限的,不能超过这个次数。
12:03
这就是我们在checkpoint conf这个类里边看到的啊,可以去进行set,去进行配置的一些常见的可选项,诶,那这里还需要去强调一点的是,我们看到另外还有两个方法没有去做对应的介绍,其中一个就是我们整个敲了set这三个字母之后啊,看到所有可调用方法里边的第一个叫做set alignment timeout,这个方法说的就是去设置当前对齐的一个超时时间啊,因为我们知道默认的检查点算法是要进行分界线对齐这样一个操作的。那这样一个分界线对齐呢,我们说这就相当于带来了时间的延迟,要进行数据的缓存,那这个过程有可能会影响我们整个系统的性能啊,整个的实时性会降低,所以flink从01:11之后。
13:01
提供了所谓的非对齐的分界线机制啊,那对于这样的一种策略怎么样去配置呢?我们这里可以设置一个对齐的等待时间啊,也也就是说,假如说我默认情况下是执行分界线对齐的,那如果说我们等待的超过了这个时间,超时时间,这里边传入的是一个duration啊,一个时间间隔,时间段超过了这个时间之后,接下来就不等了,就开始执行非对齐的。分界线策略,这里还需要注意的就是在调用setment timeout这个方法之前,它必须要保证我们当前已经开启了非对齐的检查点策略,那就是要让这里边我们的一个属性参数啊,叫做unaigned checkpoints enabled,让它变成一个true,哎,那怎么样让它变成true呢?当然对应的也有一个方法调用,就叫做enable onigned checkpoint,那就是开启非对称的检查点策略啊,那这样的话,将当前的onaigned checkpoint enable的这个参数啊,把它指定成处之后,接下来我们可以给它配置一个timeout时间啊,就是等待的超时时间,超过这个时间范围,那就执行非对齐的检查点策略了,也就是分界线不再去等待对齐这样的操作,哎,所以这个对于性能的提升还是非常显著的。
14:32
那开启这个功能呢,还有一些额外的要求啊,就是要求当前并发的检查点的数量只能是一啊,不能有并行的,另外呢,当前的模式必须是exactly one啊,因为我们知道在这个过程当中,它还需要去保存当前缓冲区的很多数据啊,那所以不能有并行的检查点保存操作啊,这就是关于检查点的一些配置,这个我们可以在用到的时候去尝试的,调用checkpoint config这个类里面的方法就可以了。
我来说两句