00:00
前面我们讲到算子状态,大家会发现它主要是针对当前分区任务的状态去做一个维护和访问,所以它有一个特点就是K对它没有关系,对吧?呃,前面我们测试的时候是没有做KBY啊,那其实如果你做了KBY的话,你最后会发现是一样的啊,跟这个是没有关系的,所以当前你如果要是说我们是针对不同的K,比方说像我们传感器啊,针对不同的三次ID,要去做一个聚合,要去做一个统计,要去得到它的一个状态,这个时候用算子状态是不是就不太合适啊?所以更常见的在实际项目当中用到的更多的是监控状态k sit,那大家看一下这个k sit对应来讲是什么什么一个形式呢?啊,那首先k state是不是之前一定要做过KBY啊,你想要定义k state,那肯定你得有K啊,啊,那连K都没有那就谈不上了啊,所以前面至少是有一个基于k has code做重分区的这个过程,然后接下来大家就想到了我这里面TASK1,这里面其实写示TASK1TASK2啊,但是他俩其实就是还是同一个任务对吧?并行的子任务对不对?并行子任务大家看到当前我每一个任务里边是不是有可能有多个P啊,比方说大家看现在的蓝色和这个浅粉色他们的这个数据啊,这是颜色就是K啊呃,那他们是不是都到了这个分区一里边来,然后这个绿色和呃这个紫色啊,这个数据都到了分区二里边来,所以接下来。
01:33
我们当前的状态是怎么去保存呢?是一个分区只保存一份状态吗?大家注意不是看这里边是不是每一个K对应都会保存一份状态实例啊。啊,所以监控状态,它其实就是根据数据流里面定义的K来维护和访问的每一个K1份状态。那么这里边访问的规则也是当前这个数据,只要是呃,大家看这个蓝色或者这个浅粉色,都会到第一个分区这个任务来,对吧,它是都能够调用这里面这个类的实例的,但是如果说我们要访问它的k state的话,注意它只能访问自己对应的K的那一份。
02:18
这是弗link底层帮我们搞定的。啊,就是如果说你这里边来了一个这个蓝色的数据,他想访问这个浅粉色的状态,不好意思访问不到,你只能访问自己这一块啊,你这里边如果在在count再做其他的操作,那是没问题的啊,所以这个大家想这是不是就更加的符合我们的预期啊,我们预期感觉就应该这么去做,对吧?好,所以处理一条数据的时候,它的状态访问范围就会自动限定为当前数据的K。对应的那个state。那接下来我们看一下这个监控状态有什么具体的数据结构呢?大家一看这个这个监控状态是不是数据结构就非常的灵活了,比之前的那个算子状态就看起来灵活多了啊,首先它可以有一个值状态,Value state value state就是把一个状态表示成一个单个的值,大家想为什么现在你不需要把它放成一个列表,你不考虑它以后这个呃,并行度增大减小,对吧?要要去合并或者要去要去扩容的那个分散吗?
03:23
哎,其实大家想到当前的某每个状态是一个值,但是我当前只有一个K吗。在想有多个K,那最后要分,你是不是按照K给呃做做一个划分就可以了呀,那所以他天生其实就就可以处理这种场景啊,做这种调整划分的这种场景,所以当然它就可以保存成一个单独的值了,Value it啊,这是最基本的,当然了,如果说你要保存的就是一组数据的话,我也可以定义一个列表状态lista state,对吧?啊,这也是可以的,另外它还支持更加丰富的做法,我可以保存成一个映射状态啊,就是所谓的map state,这保存的就变成了一个key value,对对吧,一个key一个value去做一个保存啊,那除了这些之外呢,还有更加复杂的一种状态,叫做聚合状态,这聚合状态分成两种,就是一种叫reducing set,另外一种叫aggregating set,大家一看这个名称,是不是就联想起了之前在这个窗口里边增量聚合函数的reduce function和aggregate function啊?
04:27
哎,所以这里边其实是差不多的啊,它其实就是reducing state里边你就要定义一个reduce function,然后表示当前这个状态来了之后呢,就是状态如果要做叠加操作的时候,它就是直接调用这个reduce function做一个聚合啊,这就是这样的一个处理,那如果要是这个呃,Aggregating set的话,当然里面就是一个aggregate function了,对吧?啊,所以这个整体来讲还是非常的简单的啊,呃,所以接下来我们就看一看这个在代码里边监控状态怎么样来使用,诶大家一看这个的话,会发现这个代码里边好像还不是那么的简单。
05:06
稍微的有点复杂哈,呃,首先大家看一下,就是这里面我们这一个,呃,就是监控状态声明的时候是怎么去声明的啊,它要用到当前的运行时上下文要用到环境对吧?大家想一下为什么刚才我们定义那个算子状态的时候就是本地变量对吧?直接拿出来就行了,为什么现在又要用到运行上下文呢?因为大家想到现在是不是我要针对不同的K要把它区分开啊,每一个状态是不是每一个K都要维持一个不同的状态实例,但是大家想在代码里边,假如说啊,我们是在一个算子里边,就就像我们刚才这个,假如就在一个这个map方式里边,你要定义一个key state的话。哎,大家想我这里边当前这个类的实例,对于不同的K会有多个类的实例吗?是不是只有一个呀,因为大家想这个类的实例这是一个一个任务一个对吧?你当前这个任务它还是同一个呀,只不过是在这个任务里边是不是可以接收多个不同的key数据啊,所以大家会想到当前的所有的这个,呃,不同K的数据来了之后,在当前这个任务里边访问到的这个当前我们这个map function的实力还是以同一份,所以大家想如果你定义本地状态,或者定义呃,这个就是本地变量啊,或者定义算子状态,是不是一定是所有不同的K来了之后都会影响到他呀,都会访问到他,都会改变它,对吧?那最后就相当于还是一个就是算子状态这样的一个,呃,一个一个模样,所以接下来我必须要根据K对它做一个区分,那哪里能对K把它区分开呢?那就是得在运行上下文里边。
06:52
看了对吧,运行是上下文里边来了一个数据,我就知道你当前的K到底是什么,现在哎,我应该去访问对应的我我维护的那哪一个K的状态啊,所以在这儿它是在这儿去用的啊,那大家回忆一下这个get装contacts是在哪里边可以调。
07:10
哎,对,这个大家就想到了,这是不是只有在reach方身里面,之前我们说过可以获取到当前的运行上下文啊,所以如果想要用算子状态的话,这个无所谓,在哪都能用,如果要想用键控状态怎么办?哎,必须得在reach方式啊,复函数类里面才能用,那后面大家看这个写法是直接点get set,然后里边呢,去new了一个value state script,一个描述器,那么这个状态描述器主要要描述什么呢?也很简单,描述的就是当前的状态的名称和状态的类型,对,你就把这个定义出来,然后后面就可以用了,后面的用用法当然就是读和写了,读取状态的时候,你看调这个状态的这是一个value state,对吧?我这里边调的是它的点value,就是读取出它的这个整形的值。
08:03
然后赋值的时候update,直接点update传值就完事了。啊,这就是这个对于监控状态的一个基本的使用,所以接下来我们还是在代码里边做一个测试,来看一看这个监控状态怎么来做。新建一个class啊,State test2,然后这个是k state。啊,那前面的这个流程基本上都一样对吧?呃,我就直接copy了啊,我甚至可以直接把当前的这个main方法直接全copy过来。呃,然后这个copy过来之后,大家会想到我这里边做的这个map操作可能就会有所不同了,对吧,之前我这里边是直接,呃,上来之后就直接map了啊,然后直接。传了一个这个map function,而且实现了类似checkpoint的接口,定义一个算子状态,那现在我们要做操作的话,不能那么简单粗暴了,那是不是要要根据啊,我们是统计当前sensor对吧,Sensor的数据个数,那是不是首先应该要做一个KBY啊k state嘛,要先做k by ID对吧,然后接下来。
09:22
接下来是不是同样KPI之后也可以做map呀,大家看这个是没有任何区别的是吧,来了之后你直接map没没毛病啊,所以接下来我们是MY,呃,比方说这个叫key count map。把这个定义出来。所以接下来我们就要实现这个map function了啊啊,所以这是自定义map function,诶,呃,问题来了,现在我们直接用慢function能搞定吗?搞不定,对,必须得是reach慢function对吧?啊,所以接下来我们是自定义实现一个reach map方式,好,Public static class啊,然后当前我这个叫my key count map,好,接下来啊,不是implement,因为大家还记得那个rich function,对,它是一个抽象类,所以我们是继承extend一个,呃,当前是reach map方式,同样它也有输入输出,当前的输入是sensor reading,输出是。
10:36
跟我们前面那个定义是一样的啊,好,先把它放在这儿。然后接下来这就这就要首先我们看一下这个就是get runtime context,这个我们要调用声明这个状态的时候啊,大家要注意一下啊,你看我在这儿定义的时候,前面好像连那个呃,这个my value state,它这个变量的类型都没给。
11:00
这Java这个语法不对啊,那说明我这里其实是干了件什么事呢。已经声明完了,这儿只是做了一个赋值,对不对?诶大家想一下,为什么我这里边声明和赋值会分开呢?我不能直接声明的时候直接赋值吗?哦,我们先试一下啊,大家会想到在这儿我是不是直接可以比方说private private,我定义一个当前是不是要定义一个value state呀,Value state一个音体。Integer啊,当前这个我叫呃,My key count对吧,这个key count state。呃,大家会看到,如果说我在这儿直接去get wrongtime contacts,然后去大家看直接是不是get state啊,获取当前的这个,我们管这个叫控制状态的句柄啊,状态句柄handler,然后你有一个value statescript描述器,告诉这个当前我们的这个环境啊,Flink,我要取的那个状态到底是叫啥,然后它的类型是什么,然后接下来给一个名字,这个叫k count,然后后边是不是还得有一个整形integer的class,对吧,把它这个传到这儿,呃,大家看直接我把这个是写到写在这儿了,然后接下来如果我要去定义当前的这个这个流程的话,呃,那是不是。
12:31
必须要写一个实现一个map方,呃,Map方法呀,在这个map方法里边其实比较简单,我只要做什么就可以了。Return什么呀啊,其实这个大家就想到我是不是得把当前的那个值得拿出来啊,对吧?啊,或者说我直接不拿,然后直接去,呃,就是呃,肯定得得先拿出来对吧,先拿出来加一,然后再赋值赋回去对不对,因为大家知道这个value state这个是不能直接加加的,这个大家知道对吧?啊,而且你这里边如果直接把这个值拿出来加加也不行,你还得update对不对啊,所以这里边你看啊,我们的这个操作就是拿出来它的点value,我们把它定义成当前的count对吧,然后接下来是不是count加加呀,然后接下来在k count state做一个update,是不是做一个这个操作啊啊,所以这就是一个基本的实现啊,然后后面是不是可以直接return一个当前的count啊,我可以直接输出当前这个count值是什么,对吧?啊这个就就是没问题了。
13:43
了啊,这就是整个处理的这个流程,那但是这里面大家其实会发现,你如果直接运行的话,你会发现啊,运行它会报错,因为。哎,就是我当前是不是就是当前我在调用这个方法的时候,这是在当前这个属性声明的时候,直接就要调用这个方法呀。
14:05
而我们现在什么时候才能拿到运行上下文呢?是不是必须得在当前的这个任务实力,呃,所有的这个环境都已经创建出来之后,才能够调用这个方法呀,那这个方法是必须要等到。哎,就是后面之前我们讲过的生命周期open之后,当前是不是所有的这个才能初始化才能做操作啊,哎,所以当前我不能直接在外边做这个操作,那我怎么做。是不是就是把这个要直接对。直接实现一个open方法,在open方法里边做这样的一个操作,对吧?啊,那当然了,我是不是应该在外边先把这个声明出来啊,因为如果说你不把它声明出来的话,这个是是不是就只能在open方法里边用,不能在map里边用了,所以大家看我的这个写法是在外边先做一个声明,然后呢,Open生命周期里边赋赋值,然后指定它的这个,呃,运行上下文里边的状态句柄,然后接下来后边做这样的一个操作。
15:12
当然你直接去做这个操作的话,还有另外一个问题,就是说你看我这里边这个状态没付初值对吧,你要没付初值的话呢,一开始是啥呢。大家注意一开始就是nu对吧?啊,那当然我可以去判断,如果说这里边它是nu的话,我count就就叫做零对不对,然后再加加然然,然后做这个操作,那当然也有另外一个偷懒的方法,我可以直接在这儿。定义这个script的时候,大家看可以给第三个参数,第三个参数就是一个default value对吧,那就是一个默认值,当然大家看这个方法已经要被弃用了,他推荐的方法是什么呢?大家看这个推荐的是你直接就传两个参数,然后后面呢,手动去判断是否为闹对吧,然后再去给数值啊,所以呃,这个就是大家如果偷懒的话,可以暂时这么用啊,但是更好的还是你在后面去用那个判断是否为囊去做一个操作,这就是整个这个处理的过程,对吧?好,那接下来我们再运行一下,给大家看一下这个效果怎么样。
16:19
好,我们还是把这个放在这里,接下来起一个NC来做一个测试。好,还是按之前的这个流程啊,341给一个数。大家看一对吧,346给一个数。给一个数,哎,大家看是不是还是一啊。啊,所以大家会想到之前我们做这个sum,做这个max命的时候,是不是其实都是只针对当前的K有效啊,所以大家自然能想到是不是之前我们调用的那些聚合,它里边的状态应该都是一个,是不是都是一个k state啊,啊所以k state是最常见的啊,大家看我继续给这个341,是不是接下来就只有341在在递增啊。
17:05
对吧,1234,那如果说我再给一个这个347的数据,大家知道是不是会输出一个二啊,是在之前一的基础上再加,对吧,也就是各自输出各自的,只针对当前的K有效。呃,好,那这个测试代码我们就已经完成了,那这里边再稍微给大家扩展一下,我们这利用这个例子是给大家讲了一下这个呃,Value this的用法,那像其他的那个state又该怎么用呢?快速的过一下啊,就是其他类型。呃,状态的声明前面我们讲过,除了value state之外还有什么啊对,比方说还有list state对吧?List state,大家看到这里面这个list state是不是也有一个泛型啊,这个泛型就是你当前这个列表里面存什么,存什么样的值对吧?啊,比方说我这个list state里边存的是一个string,我这个就叫做呃,My list state,那大家想到后边我在获取状态句柄的时候,这个东西怎么写呢?
18:16
因为你这是指声明了对吧,后面要用是不是我这里边还得去呃定义一下呀,对吧?所以这里边my list set一样的,大家看啊,Get runtime contacts,接下来是get get list state,大家看它弹出来第一个烹饪就是对吧,Get list state,然后你有一个还是value吗?List state script对不对啊,这个就全部对应的都变一下啊,然后这里边同样给一个名称,这个叫my list啊,这个名称是随便给的啊,但是大家千万要记住名称不能相同,你的一个任务里边的这个状态是不是名称不能一样啊,因为这个名称就相当于是我们运行时环境里边的那个变量比一样,对不对,大家想。
19:00
对吧,你如果要是这个名名称重了的话,那直接flink就找不到了,就错乱了啊,所以后边再来一个,这里边给一个类型,直接给in.class它只要诶大家看这里边啊,String是吧,我们要的是string啊string.class直接给一个这个就完事了,对吧?啊这就是这个呃,List state的一个用法啊,那当然了,就是这里边我也可以给大家演示一下。呃,就是,呃,其他状态API调用,那还可以调什么方法呢?呃,就是像这个list state,它也是这个点value吗?大家看一下它里边的方法是直接可以点get,它的读取就是直接get就完事了,得到的是一个。依托类型对吧,可迭代类型啊,那自然接下来你就该怎么操作,怎么操作便利就完事了,对吧?啊,所以它是这个是读取啊啊那。
20:00
当然就是说这里边我们可以把它定义成一个这个strings对吧,呃,就是这这里边本身,呃就是这个做做这个操作的时候啊,你其实是得到的这个类型是一个是一个interable啊后边你如果要做操作的话,呃,大家可以去用不同的那个Java里边的便利方法对吧?ER或者说while对吧,或者说这个呃,其他for循环啊,增强放循环都是可以的,然后另外呢,大家可能会想到这是这是读啊,另外还有写对不对,那如果要往里边写入的话,是不是可以直接update呀啊这是update就是直接。把这个一个list直接更新进去,是不是直接全改啊,哎,这是全改啊,那有时候我们往往这个list不要那么简单粗暴,是不是我可以往后追加呀,哎,大家看可以ADD对不对啊,你就可以一个值一个值去ADD对吧?我这里边给一个比方说ADD哈,对吧?啊,就可以做这样的一个操作啊,这是完全可以做到的,呃,那另外还有一个API单,刚才大家也看到了,可以挨all,对,就是加一个列表一组都加进去对吧?呃,这也是完全可以做到的啊,当然了,现在它它这个这个报错这个并不重要啊,大家可能知道,就是对于我们这个一个list state的这个呃操作而言啊,你其实如果要去把它做这个get的话。
21:23
就直接这么等,可能这个还稍微的这个麻烦一点啊,那大家会想到我是不是直接可以做一个这个for循环,增强for循环,直接去做一个便利啊。比方说我定义这样的一个。一个string对吧,呃,就是for,然后定义这样的一个for循环,后边是不是就可以直接把它这个获取my list set里边啊,Get到的所有的这个值就可以对应的做一个呃,对应的这个处理了,对吧?啊那里边比方说我直接做这个,呃,就是一个一个打印,或者做一个其他的这个方法都是可以的啊诶这里边我们看一下哦,在return后面了,对吧,主要这个问题在这啊。
22:09
我们直接放到最前面来做这个测试吧。啊,那这里边其实你可以直接去做一个这样的一个输出,对吧,直接把这个string做一个输出啊,啊,这都是完全可以的,这是这个list state啊那那另外我们也可以简单的测一下,大家能想到还有别的state对吧,比方说还有那个什么对map。那如果说我这里边直接去创建一个map state,大家看到它的那个泛型了吗?泛型是不是有两个,因为你是k value嘛,对吧,所以当然是两个,所以比方说这里边我们这key是string对吧?Value是一个double,哎,我们这个double类型的一个温度值,比如比方说啊,那我定义成一个my mapt直接定义出来,那同样后边这里边声明的时候我就快速写了啊,大家知道是不是类似的方法呀,Get wrong time,然后get map,你有一个map script2个类型对吧?里边是my map,哎,那问题来了,后面这个class怎么写呢?哎,其实大家直接看一眼的话,你会发现它是不是就是两个类型啊。
23:23
对吧,所以当然就是说呃,这个string.club然后double.class是不是直接这么写就完事了,那同样下边如果我要去做这个呃读写操作的话啊map做这个读写操作的话。My map那家看是不是我可以去,首先可以get呀,对应的那个K可以去做一个get对吧?哎,这个就没准了啊,我不一定里边存了什么东西,然后同样是不是也可以去做put呀,对吧,另外还可以put all对不对?哎,把所有的这个key value都直接存进去啊,比方说这里边这个一个double。
24:06
哎,直接做这样的一个put,这都是可以的啊啊那当然了,这个呃,Map里边还有其他的一些方法,大家看到entries可以拿到什么?哎,对键值对的那个那个呃,序列对吧?呃,那那个那个集合啊,Able类型,另外还可以判断这个contains包含不包含哪个K对吧,另外还可以remove remove,那就是是不是就是把某一个K直接删掉啊啊对吧?所以这里面就是常规的那些map部操作都可以做啊,完全没问题。当然还有另外一个比较特殊的东西,是所谓的那个聚合状态啊,呃,这个我举一个例子,就是reducing state,大家看这里边有一个有一个类型对吧,比方说这个reducing state,我就直接用那个sensor reading吧,呃,这个我叫my reducing state。那这个定义的时候稍微的奇怪一点点啊,大家看一下这个reducing,它定义的时候一样,过程还是类似的啊,大家看get reducing state对吧,然后你有一个reducing state script,看一下这里面传的参数是啥。
25:16
三个参数前面的这个string啊,这是名称对吧,最后的class那是类型,这个好说,中间是不是要传一个reduce function啊啊,所以这里边其实就是你要定义出一个聚合的这个呃过程来,对吧,然后告诉我到底应该怎么聚合,哎,大家可能会想它这个reduce function到底到哪里去调呢?其实是到这里。大家看,就是reduce state也会有一个ADD方法。但大家想呢,这个list这里边可以去ADD,它是直接在后面追加,那reducing it这个A又是干嘛呢?直接去合,对,直接就是调用我们刚才说到的那个reduce function对吧?啊,那所以他传一个这个呃,Sensor reading进来就可以做这个聚合了,好所以这个我们就不详细写了啊,大家大概知道就行了,对吧,我把这个就注掉了啊。
26:13
呃。所以这就是具体的一些过程,而另外还有一个,还有一个特殊的用法是什么呢?所有的状态都有一个方法,叫做clear。就我直接可以把它清空对吧?啊,我可以可以把这个东西直接清掉,就是让我们内存里边释放一些资源啊,不要一直占用着这个资源,这就是所有的这个调用的过程啊,大家可以下来之后测一测state。
我来说两句