00:00
了解了算子状态的基本概念,接下来我们就可以看一看在代码当中到底应该怎么使用算子状态。我们已经知道了,对于flink而言,状态其实本质上来讲就是一个本地变量,就是当前算子并行子任务实例上一个比较特殊的本地变量。那它的特殊点主要就在于是由flink来帮我们统一管理起来的,那为什么需要flink进行一的管理呢?之前我们介绍过,首先最重要的就是我们都是内存计算,那假如发生故障的话,当前所有的计算结果,所有的状态就会丢掉,那怎么能保证我们有良好的容错性,发生故障之后还能马上恢复从之前的计算结果继续去做进一步的计算呢?那具体的考量当然就是要把之前的状态。
01:01
随时的做一个持久化的保存,这就涉及到了所有的状态数据,需要去进行序列化,反序列化啊,就是既然做了持久化保存之后发生故障,当然还要能够反序列化,把它读取出来,恢复到内存里面继续计算整个这套机制,这是需要弗link帮我们完整的去做处理的。另外还需要考虑的一个是我们很多操作都是。K败之后去做的聚合,去做的开窗,那对于这些操作而言,状态其实是针对某一个K单独生效,而并不是针对当前我们每一个并行子任务,所以需要对于数据而言,要按照K把状态进行隔离进行划分,啊,那这个需求,这就是之前我们所说的kid sit的最主要的特点,它其实就是针对做了一个隔离。这些通通都是弗link底层它的统一的状态管理机制帮我们实现。
02:04
而对于算子状态而言呢,它就比较简单一些。不需要考虑K的影响,跟K没关系,所以我们主要就是只要能知道当前保存的状态是什么,它是什么样的类型,然后能够做一个。持久化的存储,然后发生故障的时候,能把它反序列化读取出来,恢复出来就可以了。但是这个过程当中还是会有另外一个问题,那就是我们会发现发生故障,重启应用之后。并行度是有可能发生变化,有可能并行度做了缩放,增大和减小都有可能,那在这种情况下状态会发生重组,那我们数据同一个数据就有可能会被发往不同的分区了。对于Kate而言,其实不存在这个问题,因为我们的重组调整的策略和整个这个K,它进行K基于哈的分区的策略应该是一致的,只要都是基于K去做这样的调整,那就没有任何的问题。
03:13
但是在算子状态里边没有K这一说,那所以接下来我们是轮询发送,那对于当前的每一个算子定型子任务而言,其实是不能确定还能够收到跟之前同样的数据的,如果后续我们有一些操作是跟当前数据真的有关联,跟数据的某一个特征真的有关联的话。必须让他应该要发送到,就是相同的数据应该要发送到同一个分区的话,如果有这样要求的话,那我们其实就处理不了。所以对于算子状态而言,它不是由flink统一在底层帮我们全部包装起来,全部搞定的,而是。暴露了更加底层的接口,让我们自定义去实现它。怎么样去。
04:06
持久化保存当前的状态,然后发生故障的时候,怎么样从持久化的所谓的检查点里边再把这个状态恢复出来。啊,那所以这里面我们关键要实现一个接口,就叫做checkpoint的方式,这个所谓的checkpoint检查点,其实就是我们可以认为flink里边所有任务状态进行持久化保存之后的一个快照。这就是一个检查点啊,那所以我们如果要想要自定义当前状态的一些操作的话,就必须要实现这样一个接口。我们可以看到这个接口在源码里面的定义呢,主要就是有两个抽象方法,一个叫做snapshot state,另外一个叫做initial state,从字面上其实也非常的容易理解,一个就是保存当前的状态到一个检查点里面。
05:04
就是当前我应该要这个方法的时候,就是要去保存一个检查点,持久化保存的时候。而另外还有一个是initial state,也就是初始化状态的时候。自然想到了,那之前我们对于k set都是在open生命周期里边去获取状态,控制距离,那是因为我们并不需要去考虑状态本身的,初始化的底层帮我们都搞定了,我们只要确定当前这个类的实力已经创建出来,能够获取到运行上下文,就可以去调用它。而现在呢,现在我们就要自己去初始化状态,这个状态那就应该是。有可能当前真的是初始,初始的话,那就直接把状状态创建出来就可以了,那另外也有可能是发生故障,恢复之后的情况也会调用到这个方法,那应该要怎么样呢?那就需要判断一下,然后从checkpoint里边拿出对应的数据,然后把它恢复成我们想要的状态。
06:13
就是这两个方法所做的事情。呃,那在这个过程当中,我们需要注意的是这两个方法呢,都有一个contact参数,一个上下文,但是这两个上下文其实是不同的。我们可以看到,在snapshot state这个方法里边,它所谓的上下文是一个function snapshot contact,而下边的initialize初始化的时候呢,拿到的是一个。Function initialization。所以对于snapshot state这个方法而言,这里的上下文啊,其实是进行检查点快照时候的上下文。本质上来讲,这跟我们之前的运行是上下文是两回事。
07:01
它并不能直接当做之前我们介绍的运行式上下文来使用。可以提供当前检查点的相关信息,但是没有办法获取到我们状态的控制距离。而下面的这一个function initialization context,它本身拿到的就是函数,函数类进行初始化时候的上下文啊,那这就是我们真正的运行时上下。在这里边就可以获取到当前的状态,那这个context这个上下文呢,可以调用两个方法,一个叫做。Get oper operator state store这样的一个方法啊,那另外呢,还可以去get state store,也就是说可以去获取当前算子状态的存储,也可以获取按键分区状态的存储,那所以在这里边每一个store,如果我们获取到之后,就可以进一步的获取我们定义出来的状态。
08:08
那这里面。算子状态的注册和使用的过程其实跟k set也是非常的类似的,也需要去定义对应的一个描述器,里边传入的当然也就是当前的名称和类型了,我们接下来可以在具体的代码里边去做一个判断啊,那这里边可以看到当前的checkpoint function这个接口其实是一个非常的接口,那它的这种使用其实是为有状态的流提供了非常的方法。它跟前面我们介绍过的函数类里边定义k states的这种方式其实并不矛盾,就是如果说我们是从这里的。Context直接去get key state store的话从。按键分区状态存储里边获取对应状态的话,拿出来其实就是一个K,这跟负函数类里边获取到的是一模一样。
09:08
这就是关于。算子状态的代码当中的基本使用,那接下来我们就在具体的事例当中来看一看它的应用。啊,这个例子呢,其实也是比较经典,因为我们说算子状态其实应用并没有k state应用那么广泛,它一般都用在根本就跟key无关的场景,那一般就是S和S任务了,所以接下来我们介绍的就是使用算子状态去做数据缓存输出的这样的一个应用。诶,那这个过程主要是考虑到我们很多场景下。得到的最终结果,计算的结果,其实要做输出的话,是要输出到外部系统的,可能是一些数据库,也有可能是文件,那在做外部系统输出的时候,这个操作可能是比较耗费时间的,因为要跟外部系统去进行连接,需要进行网络传输啊,可能需要进行序列化各种各样的操作,这个过程如果说我们是每来一个计算结果就去输出一次的话,很显然它的代价会比较高,所以我们就可以考虑,那可以做一个缓存嘛,缓存一段数据之后,一部分数据之后,然后。
10:29
按照一定的标准啊,然后统一发送到下游就可以了,所以接下来我们要实现的就是这样的一段代码。接下来,我们可以在代码当中新建一个class。这个我们就叫做缓存输出buffer。Example。那么当前的主方法其实跟前面的测试非常的类似,我们也还是借用click里边自动生成的event数据,然后来做一个具体的输出测试啊,那这个过程我们就直接使用。
11:08
前面已经。非常熟悉的这段代码。直接copy过来就可以了。上面我们还是throw exception。下边。Env execute执行起来。整个的框架是一样的,然后接下来我们其实主要是要做一个批量缓存输出了。批量缓存输出,那当然就是什么操作都不要做了,要测的就是输出,所以就直接ADD think好了。这里我们当然就要做一个自定义的方式了。Buffy s。这里面我们可以传一个参数,因为缓缓存输出的话,那相当于这个批量到底是多少,达到多少的时候我们就直接输出,这应该是有一个标准的,比方说我们攒够十条数据就做一个输出,这里面可以传一个参数,十。
我来说两句