00:00
现在已经知道flink里边状态到底是什么东西了啊,那大家会发现在flink里边状态既然是某个任务他自己维护的啊,用来进行计算的,另外还有一整套状态管理机制,保证它的那个容错故障恢复啊呃,序列化反序列化保证这些,那大家就发现了,既然它跟任务是绑定在一起的。可以认为是当前任务的一个本地变量,对吧?哎,那大家想到是不是我们在定义,在写这个弗利代码的时候,有一个特定的算子就应该对应着一个任务,就可以有它对应的那个状态出现啊,哎,所以这里边其实就是状态是跟特定的算子要关联在一起,比方说哎,我们定义一个reduce,那大家想reduce里面有状态对吧?我定义一个window的操作,Window聚合,那window里面是不是也有它的状态啊,啊,甚至后边。给大家讲的时候,大家会发现啊,我在map flat map filter里边也能定义状态,哎,所以就是说它其实它是跟特定的这个算子关联在一起的,呃,就是大家可能会想到,那假如说我这个前面的一个reduce啊,啊,它本身有一里边有一个聚合的这个规约的状态,那后边呢,又做了一个KPI之后又开窗,窗口又有自己的状态,那你说我在窗口操作里面能访问到之前那个reduce的状态吗?
01:22
大家想想能不能访问到?就是后边的操作,后边的任务能访问到前边任务的状态吗?啊,有同学说可以,但是大家想,如果要是访问这个状态的话,我当前任务是不是有可能都前后发生的任务是不是已经分配到不同的slot上了,甚至有可能跨task manager对吧?那你要访问状态的话,你可以怎么样,那相当于是不是还得做网络传输啊,那那状态我们本来是一个内存里边对吧?就像本地变量一样的东西,你还要做这样网络传输,这不可能做到对吧?诶所以大家注意啊,不可能跨任务去访问这个状态,所以状态就是跟特定的算子关联,然后跟特定的任务绑定在一起的。
02:09
所以呃,大家看到就是在flink运行的过程当中啊,为了让运行时的flink了解到算子的状态呢,一般情况都要先注册一下,把这个状态要注册出来,对吧?告诉告诉这个flink到我这里边有一个需要你管理起来的状态啊,接下来他就可以去做这个操作了。那总的来讲,Flink里边就是前面我们讲到的managed state啊,被他的状态管理机制管理起来的状态有两种。一种就叫做operator state,所谓的算子状态,它的特点是所有的状态作用范围就限定于当前的算子任务,也就是前面我们说的,它跟当前算子绑定在一起,跟当前任务绑定在一起,对吧?就只有当前任务里边的数据能够访问到他。
03:02
别的地方你别的任务做操作访问不到了。然后另外还有一个就是监控状态k state啊,我这里边翻译叫k state,或者我们叫分分组状态,顾名思义就是做KBY之后,当前我的这个流里边K的stream,那是不是相当相对应的就有了一个按照K的哈code的重分区之后的这个这样一个效果了,每一个数据它其实都是分到了对应K的那一组里面去的,对吧?那现在我们定义一个状态,如果是基于K定义的状态的话,那它的访问的原则就是。只有当前的当前K对应的数据才能访问到当前K对应的那个状态,对吧?那所以就是状态,它是根据这个K定义的K来维护和访问的。那接下来我们还是具体给大家说一说,呃,首先我们说这个算子状态啊,算子状态operator state,大家通过这张图可以看一眼,算子状态长什么样呢?
04:05
简单来讲就是数据输入进来之后,大家看这里边有有一个TASK1,它有两个并行的子任务,这两个同时在做操作,对吧?然后如果当前大家知道对应在我们代码里面这个TASK1是不是就是一步操作啊,有一个算子对不对啊,有可能是这个reduce啊,有可能是window啊,有可能是其他的一个操作,放在我们这个执行图里面的话,就是这样的一个task,那么。本身他如果有状态,那当前这个状态就只针对。当前任务有效。而且大家注意。当前这个并行的子任务可以跨分区去访问吗?注意不可以对吧,因为它其实是呃,就是已经运行在不同的slot上了嘛,我们说slo本身是隔离的,对吧,呃,你是不能去去跨这个slo去访问的,甚至有可能他在不同的task manager上,那你怎么去访问嘛啊,所以这里面大家注意。算子状态的作用范围就是当前的算子任务。
05:03
那么对于同一个子任务而言,它是共享的,那如果对于其他的任务而而言,即使是相同任务的子任务也都是不能访问的,对吧?啊,都是隔离开的,然后另外它还有一个特点,就是对于当前这个任务而言,所有输入进来的数据大家看。是不是都可以访问到这个状态啊。啊,所以这里面就有一个问题,就是假如说我前面还做过那个KY呢,做过分组呢。那大家会想到接下来我分组之后。到一个分区里面的数据只有一个K吗?不一定对吧,那我因为哈希code的那个还要对这个对应的分区数要做一个取模吗?那是不是我只能保证所有相同的K1定在同一个分区,那是一个分区里边是不是有可能有多个K啊,那这多个K他们访问的是同一份状态还是不同的状态呢?大家注意,这里边算子状态跟K没关系,所以只要是在同一个分区访问的都是同一个状态,对吧?你不管你的K是什么,只要在同一个分区状态是一样啊,这就是算子状态的一个定义啊呃,然后在算子状态的底层,它的数据结构里边,这里可以给大家简单的说一下,它分成这么几种不同的数据结构,首先是一个列表状态,就是我可以把这个算子状态啊定义成一组数据的列表。
06:34
而那大家看,为什么算子状态里面,它直接上来是定义成列表呢?为什么不定义成一个值呢?哎,大家想我我直接定义成一个数不行吗?为什么要要定义成一组呢?哎,因为大家想,如果说你定义成一个的话,后边我们是不是有可能故障恢复之后要有并行度的调整啊,哎,那现在问题就来了,你如果这个状态只是一个的话。
07:02
你如果要是想要,呃,就是如果说想要去合并,这个还简单,那如果想要去拆分,你怎么拆呢。想要重新调整,你怎么调呢?它只能有一个状态对吧,只能给到一个对不对,你假如说这一个状态里边有数据量很大,那最后那就没法拆了,对吧?啊,所以大家看他为了后续做方便,这样的调整,它默认定义就全都是列表状态。然后呢,另外还有一个,呃,这个状态叫做联合列表状态UN set,整体来讲它跟那个列表状态差不多,对吧?哎,那什么叫联合列表状态呢。它的区别就在于就是跟这个列表状态啊,常规的列表状态区别就在于发生故障的时候啊,就是从这个拆point恢复的时候,或者是从保存点重新启动任务的时候,这个时候恢复它到底是怎么样,就假如说我们说的那个并行度发生调整啊,它到底该怎么恢复。你像之前我们这个列表状态诶,默认恢复的时候,比方说我这里举个例子啊,这不是列表状态吗?我当前原先是两个并行的这个子任务,然后调整之后呢,变三个了,扩容了。
08:12
哎,那假如说之前我的这个列表状态,我定义了这个这这个状态啊。然后里边有三个元素,那大家想是不是每一个这个子任务里面都是有三个元素的这个状态啊,对吧,三个元素的一个列表,那接下来分成分成三个之后怎么分呢?哎,这个很简单,就是对吧,我先大概的整个计算一下,这这总共六个嘛,对吧?那所以我把前面的这两个是不是直接分配给第一个。第一个调整之后的这个子任务啊,它的状态就变成了这两个对吧。然后哎,我把。第二个任务的两个给到第二个,它是不是也是两个啊,然后我把剩下的这一个和这一个合起来给到第三个启动的这个任务,那大家看这个过程是不是就是任呃状态的一个重新分配的一个过程啊呃,这个就很简单对吧?啊,这个是列表状态啊,它在做这个重新启动的时候这样做,那联合列表状态跟它区别在哪呢?
09:15
呃,也简单,就是联合嘛,联合的意思就是说我后边这个假如说重新调整了,我不是直接就分配好,然后单独传这个,我是把所有的,这不是相当于拆开是六个状态嘛,我把这六个状态联合起来,全部给这里边的下游的三个子任务都给他们发一份,然后他们自己去从里边去去摘去选。啊,其实就是这样的一个过程,对吧,从效率上来讲应该还是list state就就是最好的啊,特殊场景可能用这个联合列表状态。另外还有一个特殊的状态叫做广播状态,广播状态这个更好理解。广播嘛,那就是说当前所有这个分区状态,大家注意是不是都是一样的呀?啊,也就是说这里边我们的这个状态保存的相当于都是同一份,也就是相当于广播出去了啊,这个是在有一些场景下,大家可能想到就是比方说一个算子,这里边有多项并行的子任务,然后呢,我要求它的这个状态又都相同,什么情况是出现这种情况呢?就假如说我加载的是这个状态,是加载进来的一个配置项。
10:22
那大家想这个配置项是不是应该每个分区状态都一样啊,啊,这个我就可以做一个状态广播出去,对吧?这是这个特殊的用法,好,那接下来我们就在代码里边给大家做一个简单的实现,大家看一看这个算子状态是一个什么样的应用场景。啊,那我们接下来还是在Java下边新建一个Java class啊,那接下来我们这个是com点。At硅谷点API test,当前是state相关的,呃,我们先首先写一个这个是state test test1啊,当前我们这个要测的是operator state,好,我们先把这个放在这里啊,那整体前面的处理流程还是一样啊,Main方法先写出来,接下来这个throws,一个exception啊,然后下接下来是那个创建执行环境,然后读取数据源啊,然后做对应的那个转换输出,对吧?这里面我们就简单的先做一个呃,这样的一个。
11:29
一个实现啊,这里面我直接用前面这个time window吧。数据源先读进来啊,这个是环境啊。先放在这儿,然后后边是读取数据源。并且转换成censor reading类型,这过程都是一样的,所以我就直接copy过来了啊呃,然后最后的话,那是有一个env execute执行起来。然后在在中间这里边我们就可以做一个做一个这样的一个尝试了啊呃,大家可以看一下,就是这里边怎么样去定义这样的一个,呃,一个算子状态啊,那前面我不是说到其实像map filter flat map这样的算子也可以有这样的状态吗?哎,那首先我们就看一看啊,我们就直接定义一个算啊这个啊,我们定义一个有状态的map操作,我们用来干什么呢?
12:34
我类似于实现一个count的功能,比方说我就是统计当前,哎这个有多少个温度值对吧?哎有有多少个这个数据啊,诶那所以啊,大家可以想到就是呃,统计啊当前如果我是这个算子状态,是不是统计的就是当前分区的所有的温度的个数啊,对吧,就跟K是没关系的啊,统计当前分区。
13:02
呃,这个数据个数好啊,所以接下来我们这里面就直接写一个这个操作啊,Data stream,直接去做一个map啊,那这里面我要去你有一个自己的定义的啊,对吧,My count map把这说明出来,这个叫做一个result stream,下边我可以把这个result stream做一个打印输出,接下来就是要把这个做个实现了啊,这是。自定义map function实现这个啊,Public static class,这里边是my count map,大家知道我必须要实现的是一个对map function。My function它有两个泛型,一个是输入的数据类型,一个是输出的数据类型,啊那这个我们知道当前输入是sensor reading输出,假如你只是统计个数的话,那我是不是直接来一个inte就完了,对吧?直接给一个个数就完了啊,我也不包装了,然后接下来诶,大家会想到在这里边我是不是直接实现这样一个对实现一个map方法是不是就完事了呀?啊那这里面大家会想到,既然你要做这样的一个抗的统计的话,那怎么办呢?
14:25
大家知道是不是每来一个数据就会调这里的这个map方法呀,其实我们想到了,大家想我是不是直接在这儿定义一个是定义一个本地变量,然后用这个,呃,把它叫做count,初始值是零,然后每来一个在这就加一,当成我当前这个类的一个属性是不是就可以了,哎,这个非常简单对吧?哎,所以大家看这个定义一个本地变量啊,然后。作为算子状态,这个非常简单,我直接private对吧,这是一个属性啊,私有属性inte定义一个count值,初始值等于零,然后这里边是不是每来一个就要count加一啊啊那这里边为了大家可能知道这个Java里边这个加加前加和后加经常还搞不清楚是吧?啊,那你就单独写出来对吧?你万一要return的时候,你你忘记它是在前还是在后啊,这个就容易搞错,所以我先把它加一,然后是不是返回这个com值就可以了,这就是每来一个加一嘛,非常简单,诶大家说当前这个,我这就实现了一个算子状态吗?这么简单吗?
15:35
这跟这个本地变量不是没什么区别吗?啊,确实还真是啊,算子状态从使用上来讲,真的看起来就像一个本地变量一样,但是大家想你如果不做,不做特殊说明的话,Flink是不是不知道我这个东西要做特特殊的那个状态的处理,管理容错的时候,我要把它做那个恢复和那个调整啊,呃,他是不知道这些东西的。
16:01
所以接下来我们这里边要除了这个本地变量之外,这只是叫一个本地变量,对吧,它可以实现这个功能,比方说我们这里边直接去统计啊,你看要我我我这里边直接运行一下。哦,当当然这里边大家看到,呃,这个map这里边是哦,这个类型推断不符合这个对吧,我们这里面得到的是那个inte对吧,得到的是一个整形的结果啊,大家可以先运行一下,看看这个得到的东西到底是个啥。大家觉得这个能正常输出吗?能输出我们那个count的统计结果吗?哎,大家想这里边我得到的这个不就是当前这个一个count值,然后不停的加,不停的加,然后然后得到这个结果嘛,对吧,其实这个是没有任何问题的啊啊哦,这里边一直没输出,这因为我们是socket流是吧?啊,我把那个还是NC啊,这里边输几条数过来。大家看是不是一啊,再来一条是不是二啊啊,当然我这里面可以给一个那个三四十六。
17:04
大家知道现在是不是跟K没关系啊。直接来了之后,哎,直接输出12341个一个来一个count一次来一个count一次完全没有问题对吧?哎,所以这个是看起来是没毛病的,但是大家知道如果发生故障的时候,我是不是并不知道该怎么样把这个状态恢复出来啊。大家知道这这个就在内存里边啊,这个本地变量是不是真的在内存里啊,那那你真的是直接掉电之后,你这里边根本就没存啊,那到时候难道是从零开始count吗?那我们这个代价就太高了,对吧?啊所以呃,这里面我们必须要有存盘就是。把它做这个想要做容错的话,是不是必须要有一个存盘,然后故障恢复的机制啊,那这里面存盘当然就是我们前面提到的checkpoint啊,一个检查点这样的机制,那他在跟这个checkpoint如果要想绑定在一起的话,那这里面怎么去实现呢?就当前的这个map function啊,啊,我还必须当前的这个类,除了这个实现map function接口之外,还必须实现一个,哎。
18:11
这个比方说list checkpoint的这样一个接口啊,那这里面本身也是它需要有一个泛型,这个泛型就是指定你当前到底要保存的那个状态对吧?呃,拆point的啊,在检查点里面保存那个状态是什么类型,那大家想我这里面保存的是不是就是一个integer啊,对吧?就是一个当前的这个呃,整形数据啊,那为什么叫list checkpointed呢?是不是就是要保存成一个list呀?是不是我们说的那个列表状态对吧?呃,算子状态里边的那个列表状态啊,那大家看一下,这还报错的,那你既然是接口嘛,必须得实现方法,大家看一下这个方法是一个叫做snapshot state。另外一个叫restore it啥意思?这是不是就是对状态做快照返回,是不是一个integer类型的list呀,返回一个list,这是不是就是要把它保存到checkpoint里面去,然后另外还有一个restore state,这是干啥?这是不是就是发生故障的时候,我就调这个方法,从已经保存的checkpoint里边,是不是把它解析出来,再恢复到我们当前的这个变量里边来啊,哎,所以大家看这其实就是这两个方法来保证我们这个故障恢复的。
19:28
所以这里边的这个snapshot state那怎么办呢?啊,但是这里边我就简单做了啊,大家想到就一个count值的话,这个其实好像我也不用拆分对吧,因为就一个数嘛,你再怎么样,它也就是一个音体四个字节对吧?啊,这个其实不会涉及到什么啊,所以我就直接来一个。大家想我要返回一个这个list的话,那是不是我得对,比方说我。S single list对吧?啊,或者你去你有list然后再ADD也行啊,我这里边非常简单,就一个元素嘛,Single list是不是countt直接保存进来就完事了,直接保存对吧,然后另外如果要是恢复的时候怎么恢复呢。
20:07
大家注意这里边恢复拿的是不是这个list里边的state啊,哎,那大家就注意了,这个我是不是不能保证里边到底是一个还是几个啊,那有可能有多个状态对不对,那多个状态我当前有可能不同分区的那个抗都都都发给我了,对吧?我现在要合并病情都小了,那怎么办呢。是不是遍历这个list,然后把它拿出来啊,对,所以这里边就是一个for循环嘛,非常简单for循环啊,Inte当前的这个整形啊,我定义一个number,然后当前state里边拿出来,是不是每来一个就count加等于number啊,就这么简单。啊,这就是一个最基本的一个算子状态的一个实现。啊啊,所以大家可以看到就是这个具体的使用啊,当然在实际使用的过程当中呢,呃,就是这个算子状态在有一些场景下还是有用的,但是更多的场景还是要跟K相关啊,所以说算子状态相对来讲用的会比较少一点。
我来说两句