00:00
现在已经知道flink里边状态主要是分为算子状态和键控状态两种方式,那我们前面也已经说了啊,这个算子状态一般情况用的会比较少,它在使用的过程当中呢,也是比较特殊一点,就是它使用的时候看起来就像是一个本地状态,直接用就可以了,但是呢,你要想把它定义成算子状态的话,就必须我们我们说这个外层不是每一个操作都有那个函数类吗?对吧,你这个时候就不能写那个匿名函数,必须要写一个函数类,而且呢,还要让这个函数类去实现一个对应的一个接口,对吧?就就比方说你要去实现一个,比方说可以给大家举个例子啊,有一个接口叫做呃,就是list checkpointed这样一个接口,把这个接口如果实现的话,里边你所使用的所有的本地的状态,呃,它弗link就都默认认为你这是一个算算子状态啊,就当成算子状态来把它保存起。
01:00
来了啊,那这个其实呃,使用起来场景会比较少,我们一般情况,因为都是要分组去做聚合,去做这个开窗对吧,去做一些判断、处理、计算,所以说我们主要给大家介绍这个监控状态的使用啊,它还是比较典型的,然后接下来我们就给大家看一看这个监控状态到底怎么样来用啊,呃,那在使用它的时候呢,必须先要做一个声明啊,所以大家看一下这个声明的语法格式,这稍微有点绕,有点长对吧?呃,整体来看的话,它主要是什么呢?就是我去定义一个这个就是变量名了,对吧,随便定义一个这个变量名,然后后边是它的类型,它的类型是什么呢?大家注意啊,这是一个value state,比方说我定义一个值状态对吧?前面我们讲到有几种不同类型的这个监控状态,那比方说就是一个值状态value state,然后里边大家看还有泛型,因为你刚才只是说它是个值,它是个什么呢?
02:00
什么类型的值呢?弗link也得知道,然后才能给我们给我们做这个状态管理,做序列化反序列化做保存,对吧?啊,所以这里边我给的是一个double类型的value state,比方说我们这里边要保存的就是一个温度值对吧?Double类型value state,然后后边大家看怎么样去定义上运行时上下文里边去定义,因为在当前我们这个分布式执行环境里边,你必须是要在运行时上下文flink才能够去找到当前任务里边啊,就是当前执行的这个任务里边到底有哪个状态对吧?呃,这并不是我们当前在啊,就是照va虚拟机里边指定的一块内存地址一指定拿出来就完了,它必须是在这个运行是上下文里边,弗link才能够真正的找到,这跟我们执行的这个运行时有关系,所以说接下来要怎么样呢?呃,在运行上下文里边,大家看到有一个点get state方法,然后指定它的。
03:00
Double里边要传一个一个参数,这个参数比较特别,又是一长串,它是一个state的描述器script,当前因为是一个value state,所以我们传的就是一个value state script,然后要指定它的泛型,然后里边呢,这个script也是有两个参数构造,构造方法里面有两个参数,一个是哎,大家看字符串,这就是一个当前这个名称吧,类似于变量名一样,对吧,当前状态的名称,另外还有一个是当前状态的类型啊,所以这就是一个完整的一个定义,对吧,就是就是要指明当前这个状态到底它的名字叫什么,然后类型是什么,这个类型还是层层包裹,Flink那边底层我们监控状态是有值类型,列表类型,Map类型,对吧?啊,这这些我们都要分别去定义,当前是一个value state,然后呢,它里边的类型又是一个double,所以这些都要一层一层定义好。然后具体获取这个状态的控制句柄的时候,怎么样获取呢?是要从运行时上下文去做获取,好,那这个运行上下文在这儿大家就想到了,我们在哪里见到过这个东西啊,啊,在哪里见到过这个运行上下文呢?啊就是之前我们其实讲到那个函数函数类的时候,给大家就提到过,当时我们讲到有一个比较复杂的,复杂一点的函数类,能实现更多功能的函数类,叫做负函数,对吧,负函数类,那么这个reach方的这样一套接口,它里边就可以获取到当前的一些生命周期方法,另外还能够获取到运行时上下文。
04:46
所以这里边我们大家其实就明确了,要做状态编程,声明监控状态怎么样去做呢?那就必须要在一个reach方式里边,负函数里边才能够去,这样能获取到运行上下文,才能够去获取定声声明这个状态,对吧?获取状态距离,所以说啊,这就有一个前提的限制,我们必须这么去用啊,所以复函数啊,它的一个主要用途其实就是在状态编程里边使用啊,那这里边如果定义之后呢?啊,它就比较简单了,其实整体来讲这个状态嘛,我们就是一个是读,一个是写,那怎么样去读呢?当前的这个value state,它有一个方法叫做点value,你看我们定义出来这个是value state,对吧。
05:31
我直接点value,就是把这个状态里面的值读出来,我可以再付给另外一个变量,对吧?啊,这个是完全没有问题的,然后这里面如果要是改写这个值怎么怎么怎么改写这个状态的值呢?哎,就是点update对吧,做一个赋值,把一个新的值写进去就完事了,所以这个操作整体来讲还是比较简单的啊啊那这里面我们还是在代码里边给大家实际来做一做操作,大家看一下到底是怎么回事,好接下来我们同样还是在这个API test下边新建一个。
06:04
Object啊,当前这个我们主要是做状态编程,所以是state test。这个状态编程啊,前面的这个过程我们就不细说了,都差不多对吧,我这里边就直接按照前面定义的这个过程啊,该该直接做的事情啊,直接全抄过来完事啊,然后在当前我们做这个处理的过程当中,其实这个跟事件时间就没什么关系了,对吧,大家看这个状态嘛,它它跟这个算子有关系,至于你要处理这个事件时间还是这个,呃,就是我们所说的这个处理时间或者3TIME,其实都没关系,都无所谓定义什么都行,所以这里面我们干脆就把这个去掉了啊。直接把这个去掉好了啊,然后接下来我们就直接处理的是,哎,我我接下来还是就是流式输入对吧,起一个NC去读取数据,然后接下来先把它转换成一个啊,Data stream啊,然后后边大家不要忘记这个整体的架构我们先写出来啊,这个就是执行起来env execute啊,然后后边我们可能就是基于它可以去做一些转换操作,做一些这个呃,所谓的这个状态编程的定义,那我们说这个状态编程它必须要有一个rich方对吧?啊,那所以这里边我们可以给大家举一个例子,比方说啊,这这个不是在这儿定义的啊,我们就自己去实现一个rich function,比方说我随便给一个啊呃呃,这个我们叫一个my rich map吧,Extend,一个rich map function式,大家还记得之前我们讲过的这个函数类对吧?呃,负函数函数类。
07:49
然后这个map操作的话,里边应该要传两个型,一个是输入,一个是输出,对吧,比方说当前我就是3READING输入,然后呢啊随便给一个输出。
08:02
然后接下来在这个里边大家看我必须要实现这样的一个,呃,麦克方法对吧?啊,那这个我就随便写了啊,比方说我就把直接这个ID输出完事了啊,先让他不报错,然后我们关键是看,假如里边我想要定义一些状态,那怎么去定义呢?诶,大家就会看到了,我们本来说的这个map啊,Map本身是不是应该是一个无状态的算子啊,对吧,那就是这里面本来你就是来一个数据输出一个数据,来一个数据输出一个数据,根本没有涉及到其他的一些变量和一些状态的读取啊,那这里面如果说我真的想要给它设置状态,其实也是可以设置状态的,那就是你不能直接用普通的map function了,必须要实现一个rich map function,然后接下来我们在这里边定义状态的时候,怎么样去定义呢?啊,这就是我们说的,你可以在外边,比方说这里边我们定义一个这个呃,Value state对吧?啊,我就这个我这一个,然后这个就是那个度对吧,就是一个值嘛,定一个double类型的value state,然后大家看它要去获取这个上下文的时候,怎么样去获取呢?啊,就是get runtime context,因为现在。
09:21
在这个reach function里面有这个方法对吧?然后里边我们看到它有各种各样的获取这个状态的方式,我们这里边要的就是get state value state的话,直接用这个get state就可以了,里边它提示我们了,要传一个对应的啊,当前一个value state script对吧?把这里边我们就new一个value state script啊,挺长一串啊,然后直接这个double泛型给我们写在这儿了,里边呢,传两个参数,当时给大家说的这个构造方法里边要一个name和一个呃,这里边还有一个default value,大家看到了对吧?啊,但是现在这个比较不推荐大家用这个default value了啊,就是建议大家还是另外再去赋值,就是你如果想要给一个默认初始值的话,可以在这儿赋,那大家可能想到了,那如果我不负初始值,初始值的话,它的初始值是什么呢?哎,就是我们说的这个初始的那个空矢,就像CT拉里边定义的一样,对吧,如果是初始。
10:21
类型的话,给零,你当前double0.0对吧,如果要是一个这个引用类型对象类型的话,那就是空闹对吧?啊,这个其实都是一样的这个概念啊,我们这里边调用它的下边这个最简单的这个方法,直接就给一个name,然后给一个class type就完事了啊,那所以这里边我可以直接给一个比方说当前这个是value state对吧?啊,然后后边给一个class of,当前的这个double传进来啊,这就是一个对于监控状态的定义对吧?所以大家看一下啊,呃,就是state。
11:01
测试必须定义在reach方式。中因为需要运行时,上下的。哎,这个其实是非常容易想到的一个一个规则,对吧?另外还有一个就是其实之前我们给大家提到过,就是说这里边这个get runtime contact的,其实你不能乱调,就是你在这里边,就是我们在这个类外边啊,类构造的时候,直接去调这个get runtime contact其实是不生效的,就它是它是拿不到当前的这个运营上下文的,为什么呢?因为我们当前这是一个任务,对吧,它必须要在这个类真正实例化的时候,对吧,就是任务直接创建出来的时候,也就是至少要等到什么时候呢?我们说的那个生命周期,Open生命周期里边,你再去调它的运行上下文,这个才是这个才是可行的,好,所以说这里边其实是有这样一个问题的啊,啊,那在这里边我们就把这个要包装在,就是正常来讲啊,我们把它包装在open生命周期里边,然后里边呢,再做一个这样的一个实现,这相当于是用上下文运行上下文。
12:20
获取当前的这个状态句柄对吧?然后呃,定义好它的这个名称和当前的这个类型是什么,这样就可以了啊,但是这样一定义,大家又发现又有一个问题啊,你这里边定义在open生命周期了,那我后边在map里边,如果我还想用这个状态的话,就没法用了,你这里边如果要是这个要要这个value state的话,这显然没有啊对吧,它的作用欲你已经被限限定在我们当前的这个open生命周期里了啊,有的同学说你不是说这个value state是,呃,它的那个作用域是当前所有的那个数据来了之后都能访问吧,怎么这里边又有这个什么作用域限定在open生命周期了呢?啊,这个大家不一样,对吧?我们说的这个作用于这个访问的就是可可见的范围,就是对于这个state而言是什么呢?是你在这个运行时加,呃,运行时环境里边去get state的时候,那访问的是什么呢?就是当前自己你已经做了这个kba之后啊,前提是做了KPI之后那。
13:20
访问的就是当前自己对应的K,当前的呃,对应的那一个呃,当前的那个上下文,对吧,就只有是当前的数据才能访问它,就是只能做当前的数据才能做这样的一个一个判断啊,是这样的一个过程,但这里边呢,你定义了一个本地变量对吧?但你看这里边的这个value c这一个本地变量啊,它的作用域那是我们这个SC代码决定的,你不能违反这个scla语法呀,啊这里边肯定你在open里边定义,这里边就不能用了,所以假如说啊。假如说我们要想在这个下边还要能用到它的话,那是不是就必须是把它声明在上边啊,对吧,上边声明的话就不能是一个value类型了,必须是一个可变类型对吧?我把它按照类型定义出来,然后后边给一个空值,然后在后边再做一个负值操作。
14:18
就可以有这样的一个操作,对吧?啊,这这样去做的话,就稍微呃,就就能把我们这个想到的这些问题就都解决了啊,所以就是你得有一个这样的一个思路啊,怎么样去做这个操作,那另外大家看到了还有另外一种写法,这就是我们在这个PPT里边给大家看到这种写法,你可以怎么样去定义呢?啊,有一个偷懒的方法,可以用一个lazy,为什么呢?啊,就是如果要是lazy的话,大家知道就是相当于这里边我只是把它声明出来,是在用到它的时候,后边我才去做赋值,才才调用后面这个赋值语句对吧,才去做执行,所以真正要用到它的时候,那是不是肯定已经是后面代码执行的过程当中了啊,那你这个时候再去get状态,肯定就可以,这个上下文就可以获取到了嘛,啊,所以另外有一种方式是用这种lazy的方式去定义啊啊,大家看怎么样去实现方便吧,我这里面用的是这种方式啊啊,然后下边我们去做这个处理啊,去做这个呃,就是。
15:18
使用的时候I value类就可以直接拿了,对吧?比方说这里边大家看它有两个方法,一个叫点value,这是读取数据啊,那比方说我这里边就是把这个读出来对吧?啊,比方说我说这个叫叫MY啊,Value是吧,把它读出来啊,因为为什么这里边本身它还是一个这样的一个引用类型,你如果直接对它做操作的话,这个是没法做操作的,对吧,只能是按照flink给我们包装好的这个接口,你把这个值拿出来再去做转换,这就是一个double类型的数,你随便想做什么都可以了,好,那如果你想要更改这个值的话,那就是我们说的value.value对吧,呃呃呃,点点update,刚才大家看到了一个value方法,一个update方法啊,那这里边如果说我要把当前的这个当前的这个value传进来这个数据,这个value是这个数据啊,Value的当前的temperature给到他的话,也可以做这个操作啊,所以这个就下来之后,大家可以自己去做测试了啊,就是这里。
16:18
边可以是有这个状态的啊,读写啊,那除了这一个value state之外,大家可能觉得这个value state,呃,太简单了是吧?那另外我们还可以剩下的这个方法,我就用这个lazy方式定义了啊,不在那个open生命周期里边再去定义了,我还可以定义什么呢?哎,那这里边我用laz的方式,我还可以比方说定义一个list state啊,那这个list state,当然它本身flink给我们定义的这个类型就应该是一个list state,对吧?把这个引入,然后这个list state呢,也有泛型,哎,你这个list列表里边就像我们的list一样,对吧?你里边的这个数据类型到底是个什么类型呢?那你还可以去定义,比方说我定义一个int啊,里边还是啊,这里边要做操作,怎么样做操作呢?Get wrongtime contact,对吧,我在这因为是lazy嘛,就直接定义了啊呃,然后这里边里边就是直接去调一个get,大家看有一个方法,有一个叫做get list state对吧。
17:18
直接这么去把它定义出来就可以啊啊,这里边写错了,前面少了一个value对吧,要不然这里边并不知道你这lazy要修饰什么啊,然后里边的定义呢,一模一样,这里边要传的是个什么?就是一个list state script非常类似对吧?大家看这个就几乎一模一样啊啊,所以我们这里边给一个list state script啊,然后里边同样还是给名称对吧?这个叫list state,这里大家要注意啊,这个名称和其他地方我们当前这段代码里面定义的这个状态的名称绝对不能一样,为什么呢?因为这个是flink给我们做状态管理的时候,相当于它是在这个上下文里边的那个变量名。
18:05
它就是靠这个变量名去找我们在啊slot里边具体存储的位置的,所以你不能给一个同样的名对吧?就好比你在scale代码里边,你给一个同样的变量名,这肯定就冲突了嘛,啊,所以这里边的名称也是一样的,这个绝对不能一样啊啊那这里边同样plus of,大家看这里边的类型传什么呢?类型传的还是就是泛型是什么,这里就传什么,因为你外边是list state,我已经定义好了嘛,对吧,我这里边传的就是一个list state的描述器啊,这就没有任何的问题,然后下边如果说我们想要去读写这个状态的话,怎么去读呢?哎,这里边就是list state啊,它的方法直接给大家看了三个方法,一个叫做ADD啊,ADD的话非常简单,就是要把一个数要加进去,对吧,比方说我ADD一个一追加到这个例子里面,那或者还有什么呢,还有就是这个ADD2对吧,这个ADD2就是要传什么呢?传一个列表进来,然后直接就。
19:05
相当于是把这一组列表里面的数全进来了,对吧?啊,那这里面大家看到他要的是一个的这样啊。所以这里边我们得去你有一个Java的list对吧,你得有这样的一个具体的实现啊,那比方说这里边我们要一个这个int类型对吧?呃,然后里边比方说这个二三,呃你呃,当然这里边就是我们去拗拗它的时候就就没有办法直接去这么去给了,对吧,你肯定是前面先把这个类型先定义出来啊,给一个这个类型,然后这是一个空空的这个数组,所以说一般情况我们不会这么去干啊,那我们这里边比方说定义一个list,然后先把它定义出来,我们一般你这个可变类型吧,肯定都是一个一个去往后追加,对吧,然后list在ADD3,然后我们可以直接把这个list传进来啊,这样去操作也是完全可以的啊,那当然了,里后边大家看到还有一个这个,呃,State有一个操作叫做update,对吧,Update的话就是更新,更新,它里边存的是一个list,意思就是说整个这个列表我都不要了。
20:17
就以新进来的这个list不是追加在后边,而是直接把它替换掉了,对吧?那这个就是大家下来之后看一看,测一测,就知道它到底是怎么样去执行了啊,那有些同学可能还会想到,那这里边如果说啊,我这个这个list state就想去做一个获取当前的这个值,那应该怎么办呢?哎,大家看有一个get方法对不对,Get方法就是获取我当前这个整个list的这个值,得到的这个什么呢?一个1TOABLE类型对吧?可迭代的类型,所以接下来你就可以用for循环去遍利当前的这一个列表了啊,所以这都是非常简单的一些基本的操作啊,大家就是下来之后自己试一试就知道了,那另外再给大家稍微的说一下,还有这个对应的啊map state对吧,这个定义就完全类似,我快速的给大家敲一下,这里边类型大家看到了啊,有两个对不对,呃,因为你当前是k value嘛,所以就是一个K一个value啊,比方说我定义的是string ID对吧。
21:17
和这个double温度值的一个这样的一个键值,对啊,那后边我再get runtime contact,然后这这个就要get一个,这个就要get一个map state了,对吧?与之对应的同样的是啊,然后里边你有一个map statescript,然后里边这里边又有类型,那这个类型传几个呢?当然还是两个对吧,UKUV嘛啊,所以这里边同样还是把这个抄过来,String,然后double对吧?哎,那后边这这里大家可能会觉得有点奇怪,就是我这里边给了一个名map,然后后面这个类型怎么给呢?难道还是逗号分割吗?诶,不是的啊,这里面大家简单看一眼就会发现你现在传三个参数就完了对吧?传一个这个当前UK的类型,再传一个这个UV的类型啊,所以说其实就是参数后边又多了一个而已啊。
22:17
那就是class of string对吧?这是KUK的类型,Class of,呃,这个double。哎,这样就把它定义好了啊,所以这个整体来讲也没什么难度对吧?就就是同样的这个调用,然后后边我们做这个操作,它的API怎么样去用呢?诶,这里面的这呃,Map啊,它的这个操作也非常简单,有这么几种不同的,这个就就跟我们那个map使用差不多,对吧?大家熟悉这个sky skyla里边的那个map集合操作的话,你会发现contains,呃,包含不包含当前的这个K对吧?呃,就是当前里边有没有这个K啊,你去判断一下,那这里边还有就是我们最关心的其实是它的那个get set操作,对吧?当然有get了,呃,如果有的话,我们把这个三一拿出来对吧?这里边拿到的它对应的那个那个value,而不是键值,对,对吧,拿到的直接就是value,然后另外还有这个map,如果想要去更新的话,那当然是put了啊,这里边put也是非常经典的一个操作是吧,Lab里边put的这个操作,那这里面你就是比方说我们这个。
23:27
呃,331,然后后边你要给一个当前的这个value值对吧?呃,随便给一个给一个值,Double类型的值给进去,那另外我们看一下这个操作还有什么呢?啊,当然了还可以获取它的key和entriess对吧?啊还有这个values,当然这些东西都是可以直接去拿到的,另外还有remove可以删掉里面的某一个纸啊这些就是大家一看这个就很明显都能看到对吧?另外你可以调它那个迭代器啊,就是然后就可以做迭代,那en大家知道得到的就是一个一个PY6对的一个一个迭代器对吧?每一个en就是一个PY6对啊,所以这个其实就跟我们便利那个set是一模一样的啊,我就不给大家一一去写了,大家下来之后挨个去测吧,好,那另外还有一个,我们前面讲到一个比较特殊的这个类型,这里给大家举一个例子啊,比方说我举一个这个reduce state,那这个reduce state稍微有一点有一点奇怪,对吧,大家可能对它不太理解啊。
24:27
Reducing state这个叫它在定义的时候,你会看到它只有一个类型啊,为什么呢?因为啊大家看它继承自这个mor state是吧?啊因为它是什么呢?里边我们说reduce做操作的时候不就是类型必须一致吗?所以这里边也是一样的,对吧?啊那所以这里边我就是定义一个Sen reading,比方说啊,就要求这样的一个类型,后边完全一样,呃,大家看接下来我就是get reducing state对吧?你有一个reducing state script都放在这儿了啊,然后里边给一个reduc state名称,诶后边这个问题就来了,大家可能会想到里边你如果要传的话,那是不是至少得传一个tap对吧?啊,这个是肯定要传的。另外大家注意还有一个我们可以看一下啊,它这里边重写的方法都是这样的,都是三个参数。
25:20
啊,最后就是不管怎么样,就是一个type的定义啊,或者是用那个type information啊,然后中间必须要给一个对吧?啊,那这个reduce function,这不就是我们所说的这个你自定义的那个reduce function吗?对吧,大家看这个这里边底层去调的话,这个reduce function是啥?这不就还是我们之前给大家看过的。就是我们当时在做那个窗口聚合,或者说这个KPI之后分组聚合的时候,调的flink a API common下面的这个reduce function嘛,对吧,就是这个聚合函数好,所以这里边其实是一样的啊,它底层做的操作其实是一模一样的啊,所以这里边我可以用一个我之前是不是做过那个reducer啊,对吧,我随便给一个reducer放在这儿对吧,后边然后class of,呃,当前这个sensor reading这样的话就没问题了,好,所以它的这个操作其实就是每来了一个元素之后,直接在后边去聚合,调用我们这里边定义的这个reduce function,直接去做聚合就完事了。
26:21
啊,那这里面还涉及到一个这个red function的reducing state的一个一个获取和一个转换的,呃,做一个这个更更新的一个过程,对吧?Get set这个操作,那同样它也有get对吧?这个get出来就是前的这个3READING,这是哪个值呢?就是当前聚合完成的那一个值,对吧?因为我们每次聚合得到的还是一个三次reading嘛,啊其实就是这样的一个值,那这里边它还有一个方法叫什么呢?大家看到还有一个方法叫做ADD啊呃,有同学可能觉得,诶,你这个ADD的方法这不跟这个例子的一样吗?呃,是看起来一样,呃,但是呢,他最后的操作不是追加到这个列表后边,而是把当前新输入的一个sensor reading直接就跟之前的状态聚合起来了,做了一个调用内容use方式,做了一次聚合,这就是这个我们对于呃状态啊,在代码里边的一个使用,我们可以下来之后好好的把它。
27:21
都练一练,试一试。
我来说两句