00:00
我们已经了解了算子状态的基本概念和类型,那接下来呢,就可以看一看在代码当中到底怎么样来使用算子状态来实现一个具体的应用案例了。那关于代码中的使用呢,前面我们也提了,算子状态可以看成就是一个本地变量啊,其实差不多,那关键呢,就是它还要做一个持久化的保存,还要满足我们的容错性要求,那所以呢,关键就在于需要去实现一个checkpoint方式接口。那关于这样一个接口呢,我们也可以看一下啊,它到底是个什么东西,Checkpoint的方式,这个接口里边最重要的就是实现这两个抽象方法,那一个叫做。Snapshot state,另外一个叫做initialize state,那所以从名字上我们就可以看得出来啊,一个就是要对当前的状态做一个快照,就是要把状态持久化保存起来啊,那其实对于这个状态的持久化保存啊,这个机制就叫做检查点,就叫做check。
01:07
这个后面我会展开去讲解,那另外还有一个呢,Initialize state,这说的是状态的初始化,那什么时候会做状态的初始化呢?当然就是我们启动一个应用的时候,诶当前这个状态就会初始化一下,那我们会知道,那初始化的时候状态不都是就是默值嘛,就像我们前面那个k state啊,如果是一个长整型的value state的话,那默认就是零嘛,那如果要是event类型的value state的话,当然默认就是空,就是闹,哎,那我们这里边如果说是第一次运行的话,哎,那其实就是初始化一个默认值,那如果我们现在不是第一次运行的话啊,那就是我们所说的啊,发生故障之后重启的时候。这个时候就会涉及到需要从检查点里边去读取之前已经保存好的状态了啊,那怎么样从检查点里边去读取状态,再把它恢复到当前的本地变量里边,这就是我们要解决的一个问题,所以这两个方法啊,它调用的时间ize state,这是发生故障之后重启或者是应用第一次运行的时候去调用这个方法,它主要的作用就是从状态里边把我们想要的数据恢复出来,诶,那这个状态到底怎么样去获取呢?我们看到这里边有一个function initialization contact,哎,我们可以点进去看一下啊,这是一个接口,它继承自manage initialization contact,我们可以看到它里边有restore的方法,这就是判断是否是重启的应用,也就是说如果发生故障的话,这个返回就是true。
02:51
啊,那如果说是第一次运行的话,这个返回就是false,这可以判断就是当前是否是从检查点里边去重新读取状态的。
03:00
那另外还有两个方法,一个叫get operator state store,另外一个叫get key state store,哎,所以我们看到这就是获取当前的算子状态,它的那个存储空间,还有就是获取k state按键分区状态的对应的存储空间,啊,那利用这样一个上下文,我们就可以获取到当前所有的算子状态和按键分区状态。啊,从这一点上我们也可以看得出来啊,对于checkpoint function这个接口而言,它其实是一个非常底层的接口,因为它是可以获取到所有的状态,其实是无所谓operator state还是k state的,它其实都可以获取得到啊,就是像这个k state,我们前面是没有调用到这样的最底层的接口而已,本质上我们知道flink框架肯定底层帮我们实现的也是一样的东西啊。啊,那这是关于initialize state这个方法,那snapshot state这个方法什么时候调用呢?那当然就是要去保存状态,把它保存快照放到checkpoint里边的时候,就去调用这样的一个方法。
04:10
就相当于,哎,我们现在隔一段时间就要做一个存盘了,那就调这个方法,那这个方法里边我们要干什么事儿呢?那很显然就是要把我们当前的一些数据要写入到状态里面,然后flink底层就会自动的把算子状态写到checkpoint,那这里也有一个上下文,点进去我们可以看到,诶,它也是继承了manage snapshot context,那这里边的方法呢,就只能获取get checkpoint ID,就是获取当前检查点的ID,另外还有get checkpoint time sta检查点的时间戳,它就不能去获取当前的状态,只能获取到检查点的相关信息。啊,那所以接下来我们还是举一个具体的例子,看一看到底怎么样去实现这样一个算子状态的应用吧,那这个例子呢,呃,因为前面我们说了啊,一般在业务逻辑里边,我们使用的都是k state,都是按照某个键啊去进行划分,去创建对应的状态实例的,那什么时候会用到这个算子状态呢?那就是输入输出的时候用的比较多,所以接下来呢,哎,我们就使用算子状态来进行一个数据缓存,然后再think function,我们自定义一个think function。
05:26
缓存一段数据之后,然后统一发送到外部啊,那当然这里所说的下游其实就是外部的存储系统啊,我们如果要做测试的话,直接在控制台打印也是一样的,那对于这个需求而言,最简单的方式当然就是要输出的数据我们直接放在一个列表里面啊,先做一个缓存嘛,哎,那如果说想要去发送的时候,按照我们的个数,或者说按照我们的时间,缓存已经满了,已经需要去触发发送操作了,那这个时候我们就统一做一个输出啊,所以接下来我们可以在代码里边具体去测试一下。
06:05
接下来我们还是直接去创建一个scla的object啊,那现在我们是要做输出的缓冲啊,我们就叫做buffering s。Example。啊,那方法先写出来,整个处理流程还是类似stream execution environment get,这个我们叫做env。啊,那上面还是把下划线先引入。然后全局的并行度设成一,方便我们测试输出,然后接下来呢,直接ADD source new,一个click source,我们自定义的测试数据源,然后去。提取时间戳。生成水位线。啊,那读取数据源之后,接下来我们也不用做其他的转换操作了,直接I think。把它做一个输出就完了,当然了,这里面我们需要去做一个自定义的实现,我们把它叫做buffer s。
07:02
这里我们可以指定一个参数,那就是到底缓冲几个数就做一个输出啊,类似于一个技术窗口一样的感觉啊,那所以这里边我们可以给一个比方说十个数就输出一次,好啊,那at think之后我们当然就不需要再去控制台打印了,哎,那所以这里边就env执行起来,这就是整个处理流程,我们已经把这个框架写出来了,接下来就是实现自定义的。S function。Class buffer s,那这里的这个I啊,传进来的这个参数属性其实就是一个阈值嘛,整个缓冲的阈值,所以我们可以叫做thhol,把它改成这个名字,呃,那么当前我们要实现的是一个think方式。这里需要有当前输出的数据类型,我们现在当然就是了。另外,我们还需要去实现一个checkpoint的function接口。
08:05
啊,那所以我们需要with checkpoint的方式,然后接下来我们看一下必须要实现的方法,哎,这是checkpoint的方式所要求的,一个叫snapshot state,另外一个叫initial state。这主要就是要让我们去实现想要去保存状态的时候,诶,那把我们当前这个任务实例里边的什么信息要存放到状态里,那如果是initialize data的时候呢?那就是从检查点里边获取出来的状态到底要恢复给哪些数据,好在我们当前这个任务实例里边又要恢复哪些信息?所以这里如果我们想要对要输出的数据做一个缓冲的话,那很显然应该是要设置一个列表状态,一个list state。我们定义。列表。状态。保存要缓冲的数据,哎,所以这里边我们可以比方说啊,直接用外来定义一个变量啊,那我们就把它叫做。
09:11
Buffer的state。它的类型当然就是一个list state。我们可以看到当前在定义算子状态的时候,我们用到的list state跟之前t state里边的list state是完全一样的啊,他们其实就是一回事啊,那里边需要有一个泛型,我们当前保存的当然就是even的类型了,直接这样的一个数据,然后初始默认的啊,给一个空的下划线,然后接下来呢,为了方便我们后边在代码中更容易的去处理这个列表里面的数据啊哎,我们可以定义一个本地变量,一个list啊,这样的话就可以把list state里边的数据啊全部转换放到这个,比方说一个list buffer里边,这样的话对应的一个本地变量,本地变量如果想要。进行快照保存的时候,我们再把list buffer本地变量里边的数据写到对应的list state就可以了,哎,那如果恢复的时候呢?当然如果说是发生故障之后恢复这个buffer state,从检查点里边能读出来数据的话,也要把它恢复到对应的这个本地变量里面,哎,这样的话就实现了一个算子状态和本地变量的一个对应关系,正常情况下,我们每个数据来的时候,只要操作本地变量的这个缓冲就可以了。
10:30
哎,那所以这里边我们还可以。定义一个。本地。变量。列表哎,主要就是用来做我们本地的这些快速的操作,所以这里我们可以直接定义一个VL,这个我们不需要是VR啊。我们可以把它叫做buffer的list啊,那它的类型的话,其实就是一个list buffer了,我们可以直接使用list buffer的诞生对象把它创建出来啊,那里边给一个泛型是英文的类型。
11:05
声明好了这些,然后接下来我们就得考虑了,这里的算子状态,我们只是把它定义了这样一个变量,那它真正意义上的获取状态,我们知道在之前k state里边,那是要get time contact,然后调用get list的方法才能够把这个算子状态获取到,那现在呢?诶,现在我们好像没有open生命周期,那怎么办呢?这里有initialize state,这就是一个初始化的方法嘛,所以在这个里边同样可以去获取当前的算子状态的控制距离,那这个获取的方法就是。我们现在不能直接去get runtime context了,没有这样的一个方法调用了,不是负函数类,哎,那所以怎么办呢?那就只能使用当前的context,当前的上下文,那这个上下文里边,前面我们看到有get operator state store这个方法可以获取当前所有的算子状态,当然了,如果我们想获取k state也能获取到啊,我们现在用的是operator state。
12:09
然后诶,这里面就有对应的,我们看到get list state get union list state get broadcast state等等等等,我们现在是get list state里边同样要传的就是一个list state script,一个描述器。哎,那所以我们把这个描述器要写进来,里边的参数当然还是一个名称,比方说我们现在这个就叫buffer的list,另外还有一个就是泛型类型,那是event,哎,这样的话我们就获取到当前状态的控制句柄了,就可以用它了,哎,那另外因为我们把这个状态跟本地的这个列表完全关联在一起,那如果是这样的话,我们接下来还得判断。就是说如果我们当前是第一次运行这个应用的话,Initialize哎,那初始化这个没什么,直接把它拿出来,然后我们直接使用那个默认值初始值就可以了,那如果要是故障恢复呢,从故障当中恢复出来的时候,这里的状态值那是要从。
13:12
检查点里边去进行读取的,那读取出来之后,接下来我们接着去处理的过程呢,这个八分的例子就不能是初始的空值了啊,那就一定要从buffer state里边把对应的数据还要恢复到这个list里边才行,哎,所以这个本地变量跟buffer state一定是一一对应的关系,所以这里边我们得判断一下啊,判断。如果是从。故障中恢复。哎,那么。就将状态中的数据添加到局部变量中。本地变量中,哎,那所以我们现在要做的判断,那怎么样才能知道它是从故障中恢复呢?哎,那这个之前我们说contact里边有一个方法吧,叫做easy restore,如果它为true的话,就说明当前是从故障当中恢复的,哎,那所以接下来我们就是一个for循环,把状态变量当中的所有数据。
14:16
每一个data或者是一个element,直接把它全部恢复到list buffer里面,哎,那我们当前的这个叫做buffer state,我们调一个点get方法,然后后边,哎,那这里我们看到这还报错,需要去引入一个影视转换的类,哎,那是SC拉点collection点。convert.implicit conversions把这个引入,然后接下来,哎,那就是buffer的list。把当前的data添加进来,这就是我们完整的这个初始化的过程。啊,那有了初始化的过程,那另外一个snaphot state做快照的这个过程又是什么样呢?其实我们想到这个过程应该就跟初始化刚好反过来,就是初始化的时候是从检查点里边读取状态,然后如果是故障恢复的话,要把状态恢复到本地的列表里,那如果要是做快照呢,那显然就是把列表里面的数据。
15:17
保存到状态不就完了吗?哎,那所以因为我们在每个数据到来的时候啊,默认都是操作这个本地变量的啊,这个会更快一点,所以我们这里边就是snapshot的时候,就是要遍历当前的列表for。每一个列表中的数据。Buffer的list。然后里边要做的事情就是全部添加到buffer的state里面,去掉它的点at方法。所以这个过程可以认为是跟这里完全对称。然后接下来我们还必须去实现的一个方法,那就是。Invoke方法,这就是我们说的每一条数据到来的时候,就要调用到这里的invoke方法来,来表示我们到底应该做什么样的处理流程,我们这里做的操作非常简单了,就是缓冲数据嘛。
16:13
缓冲数据我们用到的其实就是buffer list,每来一个数据就直接把它添加到buffer list里面,那这里是把value直接加进来就可以了。然后呢,哎,另外我们还得判断一下。判断是否达到了阈值。就是我们设定的缓冲的上限,如果达到的话,那就直接要做一个向外部系统的写入操作,所以这里面我们再判断一下if。那只要判断buffer the list的size,它的大小是否已经等于我们传进来的thhold就可以了,那如果说已经要等于的话,诶,那这里边我们就把buff分list里边所有的数据提取出来,写入到外部系统,所以这里输出到外部系统。
17:05
这里我们就用。打印到。控制台模拟一下就可以了,这里我们没有对应的外部系统啊,啊,所以这里呢,我们就直接buffer list去调一个for each啊,那这里呢,就是每一个数据。直接做一个打印输出。啊,那当然了,如果为了方便我们看的更清楚一点的话,每一次输出完毕,我们可以加这样的一个分割线,里边我们可以写上当前一次输出完毕。啊,这样的话就用控制台的打印输出模拟了一次,像外部系统的写入。另外输出完了之后,我们还需要注意,就是接下来我们还需要每一个数据到来之后调用这里的一个方法,所以接下来如果我们继续缓冲数据的话,那其实是在之前的buffer list的基础上继续做了缓冲,哎,那显然我们一旦要输出之后啊,接下来我们就应该把缓冲要清空了,所以接下来我们要做的是清空。
18:10
缓冲啊,那所以buffer list就直接可以调一个K2方法把它清除掉,接下来就又可以重新开始去缓冲数据了,还是到十个之后,那就输出一次,接下来继续清空。这就是关于我们整个的处理流程啊,当然了,如果说我们考虑到这一个需要清空buffer list的话,那另外还有一个就是snapshot state这个方法里边,我们是把buffer list里边的所有数据直接追加到了buffer state里边。那假如说之前我们buffer state里面就有数据呢,因为buffer state我们是从来没有做清空操作的,诶那这个时候的话就相当于又是一个追加操作,我们其实想要的不是这样,我们只是把当前还没有输出到外部系统的数据做一个缓存,然后。
19:01
写入到当前的这个状态对应的checkpoint里边就可以了,所以这个时候我们在做快照的时候呢,也先要做一个操作,那就是清空当前的列表状态,算子状态。清空状态,哎,那直接把buff分的state也要调一个clear方法啊,这样的话接下来我们就完整的啊,先清空,然后再把buffer list里边的数据全部放进去,就可以保证buffer list和buffer state永远都是一样的状态啊,所以我们看到这里边的核心逻辑啊,我们的in work方法里边就没有涉及到buffer state的使用,我们都是用一个本地变量来做快速的本地操作的,哎,那只有在想要保存状态,想要做快照的时候,才把这个数据拿出来放到了buffer state里面。那对应的啊,如果要想要从检查点里边发生故障去恢复状态的时候呢,也要把状态里边的所有数据拿出来放到本地变量buffer list,这就是我们对于算子状态的一个应用实例,接下来我们可以运行一下,看一看效果怎么样。
20:13
现在我们还是每十个数据才会输出一个结果。所以我们可以看一下。应该是要十秒钟的时间啊,每秒钟我们产生一个数据,诶,所以我们看到等十秒钟之后,可以看到有十条数据一次性的输出,输出完毕啊,这就有点像我们做了一个批量的缓存,然后一次性输出,这就是所谓的算子状态的使用。
我来说两句