00:00
我们讲了这个检查点算法,那我们下来看一看,这个在实际应用当中。关于。这个checkpoint到底有哪些需要注意的地方,就怎么去用这个checkpoint,呃,一个大家已经知道了,就是我们知道在呃程序里边如果要开启这个检查点的话,那需要调这个对吧?Env.enable checkpoint这个是开启,那大家看到这里边传一个值,这个指的是对大家看到这是隔多少秒,就是所谓的一个时间间隔,隔多长时间去触发一次checkpoint的操作啊,那这里其实大家看到除了这个之外啊,后面其实还可以跟一个东西啊,这里大家可能看不清楚,我们直接点到这里来看啊。大家看还可以传一个checkpointing mode啊,这里还可以传这个东西,诶,那这个东西又是个什么呢?诶,这里边大家又看到这里可以传不同的mode,可以是什么mode呢?可以是exactly one啊,我们说的这个精确一次对不对?除了这个之外还能传什么呢?对,还能传at least once至少一次,诶,那这又分别都是什么呢?哎,这是代表了不同的。
01:21
对状态一致性的语义,状态一致性的级别啊,具体我们下节课就是下一部分再给大家详细讲啊,大家看到这里边可以设这些东西,呃,那除了在这个直接开启checkpoint的这个语句里边调用的时候,直接把这个设定,除这个之外还可以怎么样呢?还可以在这个EV里边我先去。大家看可以拿到这个checkpoint config,然后再去做set,大家看这里边set是不是就可以set checkpoint interval啊,所以就是上边大家看本身这个其实不传也可以对吧?呃,就是如果不传的话,下边我还可以单独再设,然后另外我是不是在这里也可以去set这个checkpoint mode啊,对吧?所以在这里我可以写check。
02:15
Pointing。Mode点,呃,比方说这里边,呃,大家会看到,就是默认应该是什么呢?它默认的状态一致性的这个级别是exactly once,如果大家想改的话,也可以给它改成at least once,至少一次,对吧。然后另外大家看一下还可以设什么,同样还是在这一个get checkpoint conflict下边,然后再去做set,大家看还可以set什么?对,还可以设置超时时间timeout,这个timeout又代表什么呢?啊,这个timeout其实很简单啊,就是说我们做那个checkpoint的时候,大家会想到既然是保存到状态后端嘛,有可能哎,这个涉及到IO操作,或者说网络传输,可能耗时会比较长,所以这里边我们可以设置一个超时时间,超过这个时间就表示哎就不做了,对吧,当前这个checkpoint就直接丢弃了啊这样的一个状态,比方说我们大家看它又是一个浪类型啊,比方说我们设置一个这个10万秒啊,10万秒这个大概是啊,不,不是10万毫秒啊,大概是100秒,100秒的样子,对吧?啊,这个大家可以按照自己的实际生产。
03:37
当中的需求去做设置,另外我们继续看还可以设置什么。设置feel on checkpointing errors,这是什么意思啊,字面上理解的话就是就是失败的时候,诶是诶,就是checkpoint出现错误的时候失败对吧?
04:02
哎,所以它其实要设置的是什么呢?其实是如果检查点出现异常的时候,报错的时候,我是不是要整个把我们当前的这个drop给fail掉。啊,那大家会想到为什么它会有这么一个配置呢?啊,因为有时候大家可能会觉得checkpoint发生异常,其实并不代表我整个这个作业和任务出现问题了,对吧?啊,这只是有可能我在保存的时候,有可能这个外部系统啊,或者是什么的出了一点故障,或者是超时啊什么的,那有可能我觉得它并不影响我正常的运转,所以我是不是可以不让它停止我把我整个这个任务给fail掉啊,啊所以这里边其实可以配置这样的一个级别的,当然这里边默认它其实是这里边它传的就是一个布尔对吧?呃,默认传的是触,呃,就是正常情况下它是要如果checkpointpoint失败就算是fail了,那当然大家如果要是觉得我不想把它fail的话,可以传一个false对吧。
05:05
呃,继续大家看还有什么,这就一个一个给大家说一下啊,这里边max concurrent checkpoints,字面上理解这是什么?最大当前最大同时进行的checkpoint的数量啊,它其实就是代表什么呢?代表啊,我们可以同时指定,就是同时运行这几个checkpoint,诶大家想为什么会出现这种情况呢?为什么会出现同时进行几个沉矿的情况呢?哦,就是你那个间隔设的太小,它一个又又要花费很长时间,对吧?啊,所以这里边是不是就有可能有可能你一个没完一个又起来了,对吧?啊,这种情况有可能就同时会有好几个,所以如果同时有好几个checkpoint在做操作的话,大家会想到这是不是对系统性能可能会有一些影响啊,对吧,你在不停的做checkpoint都都玩不了,所以这里边可能要设置一个最大值啊,就是当然这里边默认是一啊,大家也可以设别的。
06:14
啊,另外大家看还可以设最小的。Mean pause between checkpoint。这是什么呢?这个其实这个其实指的就是说两次checkpoint之间的最小时间间隔对吧?啊,所以大家看这一个和上面这个是不是稍微有点冲突啊,大家想啊,确实是有点儿冲突,就是如果我这里边设置了两次checkpoint之间的最小时间间隔的话,那上边设置个最大的并发的并并行的checkpoint设计都没用。就相当于我最多你你都两者之间必须得有间隔了,那是不是最多同时只能有一个啊对,所以这两个大家可以就是知道他们俩其实是会有会有冲突的这个状态的啊,然后还有这个,大家看这个其实checkpoint的配置项很多啊呃,我们继续大家会看到,当然你可以去get一些东西啊,我们这里边主要是做配置set,还可以set什么呢。
07:24
到大家看还有一个叫enable externalized的checkpoint,字面上理解这是什么?开启一个外部的checkpoint对吧?所以它其实是开启checkpoint的外部持久化,诶,那可能我们会有疑问,说你这个拆point不本来就是做了一个持久化吗?外部的持久化吗?呃,这里大家要注意啊,就是本身这个checkpoint啊,它在持久化到外部之后,假如说drop如果如果挂掉的时后,它其实是会被自动清理掉的,就如果drop直接fail掉之后。
08:08
Checkpoint其实是会外部的那个checkpoint是会被清理的,如果最终判定这个drop失败的话,它会被清理的,所以这里边如果我们开启了这个外部的持久化的话,就能表示甲,即使是。Drop失败,那么它也不会自动清理掉拆point的,对吧?啊,就是如果我们把这个开启的话,它也必须要手动去清理state才能够把这个东西是。呃,才能够把把这个东西清掉,那这里面大家看到还需要传参数啊,这里边传参数传的是一个,大家大家看externalized checkpoint cleanup传的是一个这么个东西。那么大家看这里边有什么选项呢?有两个选项,一个叫做delete on cancellation,一个叫retain on cancellation。
09:11
啊,所以这里定义它可以选择的是什么呢?可以选择的是当drop被手动取消的时候,我们还要不要保存它的这些外部的checkpoint。呃,所以就是说假如说是delete on cancellation的话,也就是说如果我是手动取消了这个job,那就不要存了,对吧,你直接就清掉就好了,那如果这里边是ta on cancellation的话,那就相当于是即使我手动取消也给我留着啊,所有的东西都存下了,所以大家看这就有不同的这些配置项啊,啊就是可以大家在具体在做这个应用的配置的过程当中,大家可以把这些东西做一些调整啊,跟我们实际的应用场景会相关。呃,除了这些之外呢,大家其实看到在文档里边。
10:03
我们后边在这个选择状态后端的时候,这部分代码其实还有一个操作啊,这里边我们就是选择了一个状态后端,这是file system状态后端,然后呢,我们开启了这个checkpointing啊,没做其他的那些配置,但基本上开启了之后就可以正常去去做检查脸了,对吧,可以有这个状态恢复了,但是还差一部分,还差什么呢。还需要去配置一个重启策略,诶,所以大家看我们还可以去在env里边。做一个操作叫set啊,大家看之前我们做过这个set,呃,Stay back对吧,Set并行度,Set时间语义,我们这里边可以去set restart strategy,重启策略,这里边可以传什么东西呢?呃,当然就是说这个重启策略可以有两种方式啊,我们可以在这个yamo文件里边去配置当前的这个restart strategy,配配这个字段可以有主要的可以配的这个重启策略方式有哪几种呢?呃,最常见的一种叫做fixed restart,就是大家看到在这这里边啊,Fixed delay restart这样一个东西,顾名思义这个是叫什么呀,固定延迟重启对吧?啊,所以我们这里边把这个写出来啊,Restart strategy。
11:40
大家看这里边我们可以主要啊,经常可以用到的啊,就一个是fixed delay restart,它这里边的参数需要传什么呢。首先需要传一个int类型,这个表示的是尝试,对要尝试的次数,重启要尝试的次数,你到底尝试几次,然后第二个一般要配置的一个参数,大家看还有一个time类型对不对?
12:11
大家可以直接到这里边来看一下啊啊啊,这里面后面是一个浪啊,这个浪表示就是两次尝试之间的那个延迟,对不对啊,所以在这里边如果已经配好的话,比方说我们配500毫秒,那其实代表的就是我在费用之后出现故障之后,去对做三次最大三次的尝试重启,每次重启之间间隔500毫秒。每隔500毫秒重启一次,对吧?啊,这就是这样的一个操作啊,那当然这里边大家刚才看到除了这个fixed delay restart之外,还有一个,呃,比较常见的叫什么叫failure rate restart,这又是什么意思呢?对,这个叫失败率重启,失败率重启又是一个什么呢?大家看它要传三个参数,一个是failure rate,就是失败率对吧?啊,它其实代表的就是指的是在每个时间间隔的,因为它是个int嘛,所以还是个整数,它代表的是在后边定义的那个。
13:19
失败失败率测量的时间间隔内最大,哎,就是能尝试几次,它是这样的一个一个意思,然后后边这个failure inter呢啊,那就是就是本身测量这个失败率本身你是多长时间内去测量对吧,就是在多长的范围内去做尝试。那至于这个delay internal呢?那就是你这里边要要尝试的这几次重启,每一次之间间隔多久对吧?啊,这是这样的一个定义,所以大家看这里边如果我们要定义的话,它后面是一个time类型了啊time类型比方说这里边我们这个它在。
14:03
比方说是50秒之内三次重启对吧?啊,50秒可能少了一点啊,这个这个就需要。比方说我们直接给一个300秒吧,五分钟之内重启三次,每次重启的时间间隔是。可以自己去定义,叭方说十秒钟的时间间隔对吧?啊,当然这里边这个说明这个time我们给错了啊,大家看这个类型不匹配对不对,这里边我们需要的是这个flink API common time.time。就是可能得得这么去用对不对。大家看这样就没有问题了,所以这是这个重启策略的一个配置啊,所以把这些东西都配完,我们整个这个容错机制和故障恢复的这个保证就都有了,这就是我们这一部分内容。
我来说两句