00:00
接下来就是自定义实现。Think function。那我们自然想到直接可以。我们可以想到直接public static plus。Buffering s。需要去implement一个think。我们可以加上当前的数据类型event,我们直接把这个event类型的数据直接输出就可以了。但是这里边会有另外的一个问题,就是如果说我们只是把它来了一段数据,我就把它输出来了一段就输出的话,呃,那在发生故障的时候,那就有可能出现我们之前已经来了很多数据,已经在这儿缓存了一批了。但是还没达到十个,那接下来如果要继续在之前的基础上继续执行的话,那相当于这一部分已经缓存的数据就彻底丢掉了,他没有写入外部系统,而且也没有做任何的保存,那当然就完全丢掉了呀。
01:14
为了保证我们的数据不丢,那自然就想到了我应该把它要做一个持久化的保存,这个持久化的保存就是所谓的检查点checkpoint,所以当前我们可以考虑到怎么样能够把它写入到检查点里面呢?弗Li给我们提供的接口就是check方式,所以这里面我们需要去实现接口,除了方式之外,还应该有一个check方式。那接下来我们会看到啊,必须要实现的方法有snapshot state和initial state这两个,这是ED function要求的,那另外我们这里边对于这个think function而言。
02:00
它本身有一个可以去实现的方法,就是invoke了,每来一条数据之后,要去做什么操作,都是在这里面去定义出来的。那对于这个例子而言,我们可以首先。先定义一个。定义。当前。类的属性。类的属性啊,那主要就是我们所谓的这个批量了啊。我们把它定义成一个浴池吧。Private。反了。一个int就可以叫做threshold。这是我们定义的一个阈值。然后接下来把这个阈值已经定义出来之后啊,那那当然了,必须得有。对应的constructor,先把这个写出来。然后我们这里直接传一个十进来,这样的话就没有任何问题了,然后接下来如果说我们想要把已经缓存出来的数据要。
03:03
写入持久化,写入到checkpoint的话,那我们自然就想到了,必须得有媒介,必须要把它作为算子状态保存起来,然后才能够把它写入到checkpoint里面,所以这里面我们定义的主要啊,首先应该是要有一个。有一个list。我们把它定义成private类型。就把它叫做buffer的elements。我们缓存起来的所有数据。因为我们知道当前其实就是要这样的一个本地变量,然后把已经到来的所有数据每来一个,我就放在这个例子里边缓存起来就可以了,那这个数据怎么样能保证不丢呢?诶,那就是还需要得用另外的一个。List state,一个列表状态,这是一个算子状态,把它把当前的这个buff elements啊,直接写入到这个算子状态里边。
04:01
然后这个算子状态fli底层就会就相当于啊,就可以让它直接持久化保存在point里面。如果发生故障的话,就可以从里边再恢复出来,当然具体恢复的这个逻辑还是需要我们自己去定义的,那这里面接下来我们需要去定义一个。算子状态。当然是一个。这个也可以定义成private。List state。Event。这里我们直接把它叫做checkpoint。State。跟前面buffer的elements就有所区别,但是我们知道它俩其实应该保存的是同样的东西,只不过当前的这个是一个本地变量,我们是每来一个数据event就应该直接缓存到这个列表里面去,而当前的这个Lisa state呢,那是我们想要去做对应的checkpoint的时候,才应该把buffer的element里边的数据塞到list。
05:14
这是我们基本的一个处理思路,然后接下来呢,就是呃,这里边在。构造方法里面啊,除了这个hold之外,我们应该把当前的这个buffer elements也做一个初始化啊。我们这点。Buffer elements。应该要一个release出来。放在这儿空的一个列表,然后后边invoook里边,那就可以直接去添加了。每来一个数据,直接添加到当前的列表里进行缓存。缓存到列表。啊,那这里面我们需要做一个判断,就是如果说当前这个缓存的列表已经满了的话,达到这个阈值的话,当然我们现在就应该输出到外部系统批量写入吗。
06:08
所以。判断。如果。达到阈值。就批量写入。所以这里面我们要判断的就是buffer element,它的size如果已经达到,如果已经等于thhold的话,这个时候。就要做一些操作了。我们要做的是把它直接批量写入,这个批量写入我们就直接用一个。控制台的,来做一个模拟吧。用。打印到控制台。模拟写入外部系统。我们这里就用一个for循环,便利每一个元素。Element。从当前的buffer的elements里边把它拿出来,然后直接做一个打印。
07:08
那当然了,打印完了之后,我们可以再加一个。加一个分隔符,然后表示当前的输出已经完毕了。输出完毕。那输出完了之后,当前的这一个缓存空间这个列表就可以清掉了,我们现在没有用了啊,所以这里边直接。Bufferd elements。我们可以直接把这个列表做一个清空VR,这就是我们每来一个数据之后要做的操作。那如果说我们就不用考虑发生故障之后的恢复的话,那其实这个逻辑已经可以实现我们的需求了,批量写入啊,那之前来的所有数据缓存在这里,缓存在一个本地变量,一个list里面。但是如果发生故障的话,我们知道里边已有的数据会丢,那怎么办呢?那就要把这个bufferd elements和checkpointed state这个list state要关联起来。那的方法当然就是下。
08:13
Snapshot。这是。在进行快照,把状态要进行持久化的时候,调用的一个方法,那调用它的时候呢,我们当然就是。对。状态进行持久化。我们其实主要就是要把当前的bufferd elements这个列表里边的所有缓存数据应该要全部放到point state。然后弗Li底层就会帮我们把这个算子状态做久化啊,因为这里面的本地变量弗Li当然是不会的,这是G接自己去的啊,那我们现在就是要让flink把它做持久化,那它里边的数据当然就要复制之前的buffer。
09:03
那这里面我们当然就是。复制。缓存的列表到列表状态。那我们这个过程也非常的简单,一个for循环event,每一个element。这个过程跟前面非常的类似,从buffer elements里边把它拿出来,只不过是添加到了checkpoint state里ADD element。这样的话就把它搞定了。这里面还需要注意的一个是我们直接上来就调用了它的ADD方法,那假如说之前还有一些其他的数据的话,那是不是我们现在是一个追加啊,诶,那所以这里边会有一个问题,就是每一次我们上来做保存的时候,其实不需要追加。因为我们当前要缓存的就只有buffer的elements,之前的那些数据如果已经写入到web系统的话,那就不需要再去缓存了,哎,那我们就直接把它清空了嘛,所以接下来我也不需要在状态里边继续保存,写入到外部系统的就全部告一段落了,不再考虑了啊,那这里边我们就应该在处理之前先要做一个清空状态。
10:22
保证状态跟这里的buffer elements是完全一样的。所以我们前面做一个列。这就是进行快照时候我们想要做的事情。然后另外还有一个方法叫做initial,这个就比较重要了,因为在这里边我们相当于要初始化状态。真正的状态的定义其实是在这里。所以我们需要。定义状态。当然,我们定义的是一个算子状态。Operator state。所以为了方便的看到,我们首先先把它的script先定义出来吧。这里面我们定义的跟。
11:04
跟keep state里边其实一样,也是一个list state。它的类型其实完全一样的。啊,那这里边我们需要把这个。它的类型是,然后它的name,我们可以给一个这个就叫。Buffer的element。然后它的类型当然就是点。给一个名字,当前我们就叫script吧。然后接下来就是在上下文当中去获取状态句柄,那现在的这个上下文context。跟之前略有不同。我们看到它可以去get,可以get kid state store,也可以获取get operator state store获取出来的就分别是。按键分区状态和算子状态,我们现在要的是算子状态啊,那所以这里边我们再去拿到所有的算子状态,现在我们看到就可以去获取list state,还有这个list state,另外还有对应的broadcast,我们现在要的是。
12:17
就是最基本的list state,把它获取出来,里边传的当然就是script,把这个描述器进去。那得到的这个东西。当然是要交给checkpoint state,就是我们想要拿到的算子状态list state。现在。当前的状态已经定义好了,但是这个状态如果说是从状态是从故障的情况下恢复出来的话,那是不是还应该要把这个状态给恢复到buffer the elements这样的一个列表里面去啊?那这样的话我们才能接着之前的状态继续往列表里面填数嘛。
13:02
我们应该能够完整的恢复到发生故障之前的情形才对啊,那所以这里边我还需要考虑另外一个场景,那就是。如果从故障恢复。那么需要将。List中的所有。元素复制到。列表中。就是我们本地的那个buffer的element那个列表里啊,所以这相当于就是一个反向的过程,Snapshot state这个方法里边,我们是把列表本地变量列表里边的数据复制到了。List state里边,然后去做持久化,让link帮我们去做持久化,而在initialize state里边呢?如果它是从故障中恢复的话。我们应该把。对于对于flink而言,它的底层会帮我们从checkpoint里边解析出数据,然后放到这里的。
14:09
Pointed state我们的这个列表状态里,但是这个列表状态里边的数据还没有放到我们本地变量,我们后边的操作都是只用到了本地变量嘛,所以这里面还应该再把它复制到本地变量列表里面去。这边我们要做一个判断,If,这里边上下文有一个方法叫做is restore,就是表示当前到底是否是从故障里面恢复的,那如果是从故障里恢复的话,这个时候我们就可以平均分割重组当前所有的数组了。啊这个时候直接去。一个for循环啊,那同样还是每一个element,只不过现在获取的呢,是从这个list state里边get出来的。
15:00
然后把它。添加到。Buffer的elements里面,这样就可以。那当前我们的这个。进行重组的策略。获取这个列表状态的时候,它的。重组策略其实就是最简单的平均分割重组。也就是类似于reb那样的一个发牌的过程,这样的话我们可以知道。就可以完整的把这个状态恢复出来了。这就是这样的一段代码,我们可以测试一下,看看它的输出。我们可以看到数据是一条一条来的啊,那只有现在我们并没有考虑,所以说应该是十条数据之后就会有一次输出,我们可以看到。十条数据,十个event做了一完整的输出,啊,那当然了,接下来又是十条input,之后又会有一个完整的输出,这就是我们想要的批量写入,而且如果发生故障的话,我们可以从checkpoint里边把它恢复出来。
16:09
当然了,对于point和容错机制,我们后面还会讲解。如果说我们希望它真正能够从故障里面恢复的话,还应该要打开当前的checkpoint配置啊,就是允许当前使用checkpoint,然后还有一些相应的配置项可以去进行处理,这是在后边我们会讲解的内容。
我来说两句