00:00
我们了解了flink当中状态的分类,那其实我们就会发现,在实际应用当中,绝大多数情况,我们其实都应该是要把数据按照K去进行分区操作啊,那前面我们也提到,不管是聚合还是开窗窗口,那其实都得先批办,先进行按键分区,然后再去做相关的一些操作。所以在实际应用当中最为常见的状态类型其实就是keepit。那前面我们提到聚合开窗,所有的这些算子,它都是有状态的算子,它们本身所持有的状态本身就都是K。当然了,除了在已经flink底层给我们定义好的这些接口里边啊,直接使用对应的状态之外,我们还可以通过负函数类function对于任意的转换算子进行扩展,然后呢,我们在这个转换算子里边可以去调用get runtime contact方法获取当前的运行上下文,然后呢,调用它的get state方法去获取到相关的状态的访问句离通过这样的方法我们同样可以实现对于k state的操作啊,那这里边我们在函数里边自定义的状态,如果说是基于KBY之后的k string做的操作的话,那这样定义出来的也是k sit。
01:31
那接下来我们就来针对k set来进行一个具体的讲解,那前面我们都已经介绍过了啊,案件分居状态k set它的最大的特点其实就是。不同的状态要以K为作用范围来进行隔离啊,就是所有的数据来了之后,我们进行处理操作。它访问和。进行更新的那个状态都是只针对当前K有效,那因为我们之前进行KY之后啊,所有相同的。
02:08
K的数据都会分配到同一个并行的子任务里边啊,那如果说。当前任务已经定义了状态的话,我们就会为每一个K的值维护一个状态的实例,这样的话,接下来所有当前K的数据都是会访问当前K对应的那个状态而已,它不会访问访问的混乱啊,访问到其他K的状态,这样的话我们统计就会非常的简单了,比如说之前我们所说的word count啊,那如果说我们以当前的每一个word作为key进行划分的话,那。当前word是A,它有一个状态,来一个A,这个状态计数器就加一,那同样B,它也有一个状态,来一个B就加一。每一个数据来了之后都是分门别类,对应K的计数器才会增加,这样的话完全不会混乱,我们可以按照需求把每一个单词出现的频次全部统计出来。
03:11
那所以它的核心关键就是要在这个每一个状态,我们当前说这个状态是放在每一个并行子任务的实例里边的嘛,那所以当前每一个并行子任务里边,它所保存的状态到底是什么呢?应该是针对不同K。设置出来的这样的一个类似于key value的结构啊,所以对于flink的底层啊,它其实是对于k state进行了一些特殊的优化,它的底层保存相当于是一个分布式的map数据结构啊,就是我们说的映射key value嘛,保存成了这样的建池,对这样每一个数据来的时候,我们不是有它的那个哈希code嘛,按照哈希code。去进行分区吗?哎,那接下来它的哈code当然就可以非踌速的直接找到对应的存储位置,在这个map里边的存储位置,然后我们就直接访问它对应的那个值,也就是。
04:13
当前K所保存的状态就可以了,这样的话,不同K之间的状态彼此隔离,不会互相访问,不会去共享。这种将状态绑定到K上的方式,那就相当于我们的状态和流里边的逻辑分区啊,就是T败之后的这个逻辑分区,那就相当于一一对应了,接下来所有数据都是根据K来进行对应的操作,也根据K去访问相应的状态,完全不会有错乱和。共享其他K状态的这种情况出现,那这样的话就可以保证我们的逻辑直接用一套就可以了,不用去单独定义逻辑,弗link底层那就是按照K啊,来了是谁的就去访问谁就完事了,哎。
05:01
这样的话,我们对于状态的操作都是本地进行的,而且代码逻辑完全一致。另外还有一个好处就是。在应用的并行度改变的时候,我们知道状态是需要进行重组的,这个时候呢,不同K对应的这个k state,它可以进一步组成所谓的建组啊,就是所谓的这个k group。每一个组呢,它其实都可以对应着一个并行的子任,那接下来这个键组是怎么样去做重新调整,重新分配呢?啊,当并行度发生变化的时候,那键组就是去重新分配kit的一个单元,键组的数量就等于定义的最大并行度。而当算子的并行度发生改变的时候,K它就会按照当前的并行度重新去进行平均分配,保证运行的时候各个子任务的负载是相同的啊。
06:00
也就是说,呃,那大家可能会思考一个问题,就是说那何必这么麻烦呢?我直接根据不同的K,当前每一个并行子任务里边,它不是已经包含了这个不同K的对应的那个状态实力吗?我直接根据这个K再重新做一个打乱分配,平均分配不就完了吗?啊,当然这里边这样做也是可以的,但是不够高效,因为我们知道在实际运行的过程当中可能是非常非常多的。可能成千上万,甚至是呃,非常非常庞大的一个数字,那么对于这样非常大的海量的K而言,你直接统把它做一个统一的分配,显然是不够高效不够划算的啊。所以对于这一个所谓k group的一个分配,那就相当于是把无数个K,非常海量的K先做了一个分组的划分啊,那每一个并行子任务上,我就可以先有这样的几个。建组,然后接下来发生调整的时候重新分配一下就可以这样。
07:05
就可以保证运行的时候各个子任务的负载是相同的。当然了,在使用的过程当中,在代码里边我们需要注意的一点就是使用k set必须是近基于K,也就是说必须先K,没有做KBY的data stream直接,假如想直接定义k state的话,代码上不会报错,但是运行同样是会报错的,因为它不能通过运行上下文直接去访问到当前的按键分区状态,因为没有进行按键分区的操作。这是我们需要注意的一点。那另外就是对于flink而言,它的k state其实是支持各种各样的类型的,前面我们也已经提到了,主要有四种不同的类型,接下来我们详细的做一个介绍。首先是值状态value state,那顾名思义了,Value state就是在这个状态里边只保存一个值,一个value。
08:04
在源码定义里边的value state本身是一个interface,是一个接口啊,我们看到它有一个泛型T,那这个T当然就是啊,就是当前这个状态保存的这个值里边的数据类型啊,这可以是flink支持的任何的具体数据类型。那所以这里面我们可以看到,呃,如果说我们想要保存一个长整型的值作为状态的话,那我们就直接定义一个value state law就可以了。那这个接口里边有两个具体的方法,一个叫做value,返回一个T类型的值,那另外一个是up没有返回值。很显然value就是获取当前状态值的方法,而update呢,就是更改当前状态值的方法。所以就是一个读一个写,非常简单。这是只有一个值的情况,前面我们在代码里边在Bill check进行实时对账的这个例子里边定义的状态就都是value state,当时我们的里边的这个泛型使用的是自己定义的这个三元组的或者四元组的这个数据类型啊,当然你如果用基本的数据类型啊,当然必须得是包装类的类型啊,基本的包装类的类型也是可以的,或者使用我们自定义的。
09:20
Po类类型也是可以的,这个只要是flink支持的都行。当然了,在具体的使用过程当中,为了让运行上下文清楚到底现在我们要用的啊是哪个状态,我们还需要创建一个状态描述器,就是所谓的state script,对于直状态而言,这个类就叫做。Value state script啊,那它主要是要提供当前值状态的基本信息,在源码里边我们会发现啊,Value state的状态描述,它其实主要就是一个一个name,当前状态的名称,以及当前状态的一个类型,而且我们会发现这跟定义一个本地变量的时候做的事情一样啊,哎,我们随便定义一个。
10:07
比方说我们在这里边随便定义一个变量的时候,诶,那我们同样不是你得指定它的类型是什么,然后指定它的名称是什么吗?诶,那所以现在我们定义一个value state的时候,做的操作一样,传入名称和类型就可以了。有了这个描述器,运行时状态就可以获取到当前这个状运行时环境运行上下文就可以获取到当前状态的状控制句柄了啊,那关于这个代码里边到底怎么用,前面我们都已经。有了一个初步的了解,我们会发现,诶,定义的时候,那是要调用运行上下文的get state方法,然后把这个描述器传进去,就可以相当于获取到当前的状态,那接下来调用状态的读写方法就可以对它进行操作了,具体的使用我们可以在下节内容里边再详细讲解。
11:01
那除了最简单的value state之外,其他几种类型呢?其实也非常的类似。呃,另外一个就是,首先是list列表状态啊,那我们知道需要保存的数据有时候不止一个,那假如都是类似的相同类型的这样的数据的话,我们可以把它保存成一个列表啊,那所以这就是所谓的列表状态list state。Li state呢,本身也是一个接口啊啊,那它同样有一个泛型TT表示当前这个列表里边保存的每一个状态元素的基本的数据类型啊,那它同样也提供了一系列的读写方法,跟一般的列表操作是非常的相似的,这里边最重要的就是哎,Get,我们看get的话,当然就是获取了五,它的返回是一个able类型的。有一个泛型T对吧,那所以我们得到的其实是一个可迭代的集合类型。
12:01
那另外有毒就有写,同样可以直接update update没有返回值,它就是直接传入一个list列表啊,然后把当前的列表状态直接更新,另外呢,因为它是列表呢,除了全部去全局做更新之外,还可以一点一点去更新,也就是可以在列表后面追加啊,那我们可以调他的爱的方法。追加的是一个T类型的元素。最后的效果就是在当前的列表状态里边追加增加一个。值为Y6的元素。啊,那同样我也可以一个一个去添加,也可以一次添加多个,那这个就叫at all,它传入的是一个list。所以整体来讲还是非常简单的啊,那同样类似的使用过程当中,我们也需要先去定义它的一个描述器,它就叫做list state script,用法跟上面那个状态描述是非常相相似的啊,可以说完全一致,也是只要传入。
13:06
它的名称以及类型就可以,这个类型我们知道,它就不需要去再指定这个list的类型了,只要指定里边的这个T,我们想要的这个元素的类型就可以。这个例子的状态,其实在前面我们讲到top n的时候也有具体的使用啊,那后边我们还会在代码里边详细的对它的使用做一个具体说明和讲解。那除了前面两种简单的Kate类型之外,还有两种之前可能我没有接触过的那一种呢,就是映射状态,其实也非常的类似,看起来复杂一点,那其实也只是我们之前接接触过的map类型的一个扩展而已,它跟我们之前讲到的这个map类型啊,Java里面的map类型非常的相似。简单来讲,就是把一些键值对作为状态整体保存。
14:00
可以我们认为当前保存的状态就是一个k value,它的一个列表,所以对应的这个类型叫做map state,那么它有两个泛型,一个叫UK,一个叫UV,那我们知道这当然就是K和value它们的类型啊,他们的数据类型可以不同。呃,那对应的它提供的操作方法呢,跟map的操作也是非常相似的,比如说可以get get当然是获取元素了,获取的时候呢,传入的是K,然后得到的是value啊,那就相当于一个map里边获取某一个K对应的值嘛,也可以也可以更新啊,直接调put方法,Put方法put一个K,对,那如果说当前没有这个K的话,当然就是就是直接插入,如果说已经有了对应的K的话,那就是更芯。另外还可以葡,那就是把一组KW对全部添加进去,另外还可以remove删除掉某个K对应的建值对还可以判断包含是否包含某一个,那那另外呢,还可以获取当前所有的间值,对啊,Entrance还可以获取当前所有的key和当前所有的value,这跟一般的map的操作非常的相似。
15:18
那另外还有就是判断当前是否为空啊empty,这跟这个map的集合类型的操作基本上是完全一样。另外呢,还有一个叫做规约状态的比较特殊的状态类型啊,那这个规约状态reducing state啊,它有点类似于直状态value value state啊,不过它需要。它的这个操作啊,是需要对添加进来的所有数据进行一个规约,也就是说它有点像是要做一个reduce,直接你添加一个值进来的话,相当于就是跟之前的值做reduce,那所以它保存的其实是我们规约聚合之后得到的那个结果。
16:03
啊,那整体来看的话,Reducing state这个接口它的调用方法呢,是类似于list state,呃,大家要需要注意一下啊,就是它保存的结果类似于值状态,类似于value,因为它就是只有一个聚合结果嘛,只有一个值,但是它的调用接口的方式呢,是类似于list,为什么呢?因为它有at方法对吧?那因为我们知道你要做这个规约聚合的话,很显然前面是有很多值进来的嘛,很多值进来它要做一个对应的这个操作,不停的叠加,不停的规约,那其实类似于我们要保存一个列表了,当然我们现在保存的不是列表,而是一个聚合状态,一个规约状态,那所以每次调用A方法的时候,不是在列表里边添加元素,而是。直接把新来的数据和之前已经约得到的结果进行再次做一个规约,哎,得到的结果那就更新状态。
17:05
那么规约的逻辑呢?这个reduce到底怎么样去reduce呢?这里边是要在reducing state script这个状态描述器里边去做定义的,这个描述器里边除了string name string类型的一个名称,还有。呃,当前我们想要描述这个类型啊,具体。保存的这个状态类型的这个Type Class之外,还要传入一个reduce方式。这个reduce function规约函数跟我们之前讲到的reduce聚合算子传入的那个reduce function是完全一样的。啊,那所以这就跟我们相当于把这个。State啊,跟之前介绍过的聚合操作统一在一起了。其实我们能够想到之前介绍的reduce操作,它本质上也应该用到的就是这样的一个reducing state。
18:05
那除了这个reduce之外,当然我们就想到了还有更加一般化的聚合,所以还会有一个更加一般化的聚合状态,叫做。Aating state a state跟前面的reducing state非常类似,它的结果啊,保存的这个这个东西也是一个值,也类似于一个value state。而且跟reducing state类似的是,它也是之前所有值,所有数据聚合之后的结果保存在这里啊,那所不同的是它的聚合逻辑不是由reduce function定义的,而是传入一个aggregate function来定义啊,那这里的这个a function其实就是我们之前在讲到窗口聚合的时候所讲到的那个a function跟那个是完全一样,那我们知道它的特点就是它使用一个累加器accumulator来表示中间聚合状态,它的状态类型可以跟数据类型完全不同,而且它输入输出类型也可以完全不同,所以它是非常的灵活的啊。那同样aggregating set接口方法,它跟。
19:15
调用的时候跟reducing也是完全一样。直接调一个ADD方法,添加元素的时候会使用我们指定的那个aggregate function去进行一个聚合计算,然后更新状态,得到对应的结果。这就是关于Kate state所有类型不同类型状态的一个基本介绍。
我来说两句