00:00
我们已经了解了k state到底是什么,它有哪些基本类型,接下来我们就可以具体在代码里边看一看它到底应该怎么用,怎么去做这样的一个状态编程了。那其实在前面我们讲过的top n和这两个例子里边,我们对于使用k state进行状态编程已经有了初步的了解,我们可以简单的回顾一下,这里边最关键的点在于哪里呢?其实跟我们一般的编程习惯最不同的点。应该就是对于运行时上下文的,这里的这个插入,前面我们介绍过,必须得使用运行时上文才能去获取到当前状态的控制距离,然后才能真正的定义出状态,进行自定义的操作,那所以这里面的关键点其实就在于运行时上下。所以我们会看到啊,对于当前一个flink而言,我怎么样能知道当前这个状态,它要根据不同的K去进行分配,去进行管理呢?哎,那你必须。
01:07
得有跟普通的本地变量不同的定义形式,普通的本地变量如果要定义的话,我们知道在这里啊,直接当前这个类的实例里边,直接去把它的类型,然后名称定义出来就可以了,该怎么赋值怎么赋值操作就可以了。但现在我们的状态不一样,状态必须要基于当前的运行时上下文去进行定义,而这里边的关键呢?就是要传入一个状态的描述器,这个状态的描述器我们会发现它里边给的其实就是状态的名称以及类型,这两个东西跟定义一个普通的本地变量时候,我们想要告诉代码的内容其实是一样的啊,那只不过这里的区别就在于把它包装成一个state script这样的一个描述器,然后呢,可以调用。
02:01
运行是上下文的不同的方法,那比如说我们想要创建一个value state的时候,那就直接get state就可以了,那如果想要创建一个list state的时候,我们调用的是get list state,区别主要就在于这里。所以我们会发现啊,当前最大的区别其实就在于我们首先要通过创建一个描述器来指定当前这个状态它到底叫什么名字,然后类型是什么,相当于要定义这个状态变量,然后呢,具体要获取到这个状态变量的时候,获取到它的控制句柄的时候,必须要依赖于运行时上下文。那运行上下文它的调用是有限制的。我们并不是在任何一个算子里边都可以直接去调用的,那它最常见的调用的地方,之前我们都是在process function里面啊,那我们知道其实它的本质并不是一定要在process方式里面才能调用,我们也可以直接在一个rich方式里面就可以了,对吧?啊,这是所谓的复函数类的一个特点啊,它可以获取到当前的运行值上下。
03:11
那所以接下来我们可以在另外一个新的例子里边去做一个具体的尝试,然后看一看当前的k set跟。一般意义上的本地变量,它的区别到底在哪里啊?这里我们可以去new一个package。这是新的一张CHAPTER09。然后接下来我们去一个,这个我们就直接叫做state。关于状态的一个统一的测试。那首先我们还是先把基本的东西先定义出来吧。Three execution environment。Get,先把它获取出来啊,然后不是一般性,我们还是把这个并行度全局设成一,方便我们后边进行测试,接下来呢,呃,为了方便测试,我们还是把对应的这个数据源定义出来啊,那这里面env我们可以直接source,这里边的话我们直接用前面已经定义好的click就可以了。
04:16
然后我们还是应该要定义出来当前的时间戳和获取水位线的方法,那strategy我们这里边啊,直接还是应用最常见的。处理乱序流的这种方法吧,啊,那当然了,当前我们本身是按照顺序去直接生成的,所以这里的时间的延迟直接给一个zero就可以了啊,那这里需要还需要单独定义的是一个with啊,那这里边我们需要把这个Z。Signer定义出来,这里面我们传入的要的这个类型应该是当然了,前面我们这里边应该把这个加上泛型。'放在这儿。
05:01
接下来我们实现一下里边的extract time,方法非常简单,Element直接拿出来就可以了。这是我们能够想到的定义的这个过程,那前面我们把这个就叫直接叫做stream好了。先把当前的数据源定义好,然后接下来我们简单的做一个测试,那就是直接既然是k stream,那显然是应该要去做一个K啊,那这里面K的话也非常简单,我们直接就拿user。作为K去分组就可以了啊,那接下来我们需要去定义一个操作,定义一个算子,然后在这个算子里边去自定义状态啊,那我们说了,对于这个算子而言啊,所有的算子都可以使用它对应的那个函数类去单独的定义k state啊,所以这里边无所谓了,我们用map flat map这些无状态的算子,本身无状态的算子其实也是可以啊,那所以这里面我们可以直接给一个自定义的。
06:00
MY。Flat map。最后直接可以做一个print打印。勘探结果怎么样?好,最后不要忘记,因为execute,我们把前面的异常抛出,就是整个测试的框架,那当然了,这里的最重要的是实现自定义的rich flatman方式啊。实现自定义的flat map function。用于。He的。State测试。啊,那首先这里边我们需要去。把当前的这个类定义出来,Public static。MY。Light map。Exet需要去。
07:00
继承的是一个抽象类rich map function啊,那当然了,这个rich Fla function有input和in和OUT2个泛型,那我们知道输入的当然是event了。输出为了方便测试的话,那最简单应该是给一个string,这个最方便,所以我们干脆直接把它定义好,然后接下来,哎,那就是要定义状态。我们知道,在定义状态的时候,最简单的方式当然是定义一个了。Value需要有泛型,这里边我们的泛型就直接用这个好了,然后接下来。我们定一个my value state。名字叫做my value,我们知道当前如果要直接定义的话,那应该是在后边直接去。Get runtime con,然后去调get state方法,然后里边去new一个对应的script啊,那就是对应的value state script里边要传入的当然是。
08:09
状态的名称,比方说我这个就叫做MY。State。还有当前状态的类型啊,那当前状态的类型也非常的简单,那就是。就是了。我们直接用event.class把它传进来就可以,这个看起来没有什么问题,但是我们知道。在当前的这个类里边啊,如果说我们定义属性,这是相当于类里边的属性吧,定义属性的时候直接以负出值的状态去调用这个get contact的话,其实是无效的,那在这个时候我们当前其实是静态化的代码。本身当前在执行这段代码的时候,当前的这个类的实例其实是没有创建出来的,所以在这个时候直接调用get time contact,当前的这个运行是上下文,其实是不对的,其实是没有办法直接获取的。
09:05
啊,那怎么样去处理这个问题呢?那其实就是我们前面已经提到过的处理方式,直接可以把它放在open生命周期里边,因为我们知道open生命周期在调用的时候,当然当前的运行是环境是已经就位了,当前,呃,所有的这个状态是当前的任务的实力都已经创建好了,当前的环境都已经就位,我们就可以直接调用get装time contact去处理当前的状态距离。所以这里边是有一个这样的小技巧的,呃,那另外还有一个要求就是说,那我是不是直接在这里边统一去做定义就可以了呢。就是我们想到。按照这样的想法的话,是不是可以直接我们直接把这个open证明方法放在这里,那直接把上面的这一行。复制到这就可以了呢,哎,不是这样的,因为接下来如果说我们还想要用这个state的话,那很明显。
10:08
在其他的一些方法里边,比如说flat map这里面,我们也得用这个my value state这个变量,但是这个变量你如果直接定义在open方法里边的话,显然它的作用力就达不到外面了,诶,那所以我们的处理方式是。在上边。我们在最上边这一行。先做一个声明,先把当前的这个状态先声明出来,而在下边open方法里边再去做一个具体的复制。做一个初值的写入,这就是在使用这个Kate的过程当中的一个基本的方式,也是处理的一个基本的技巧。那接下来我们已经有了这样一个状态,当然接下来就可以用它了,我们可以在必须要实现的这个flat map方法里边去。
11:04
访问和。更新状态。啊,那当然了,就是一开始的时候,如果说我们只是把它定义出来什么都没有,那这个时候状态里边。有没有东西呢?哎,我们也可以看一眼。可以做一个打印。看一看当前里边的东西到底是什么,那另外就是我们可以对它进行一个更新,做一个update操作,叫update方法,里边传入的当然就是我们想要的对应的,呃。对应的一个值了,我们可以把当前一个value来了之后,直接就写入到啊当前的这一个。状态里面去,也就是说每来一个数据,我就更新一下当前的这个状态,那这个状态好像没什么用,只是保存了最新的那条数据而已,但是没关系,我们可以。直接再把它做一个打印输出。
12:02
为了看得更加的清楚,我们可以在前面再加上一个。My value。好,我们现在可以测试一下。我们可以看到初始的时候是,那然后接下来啊,每来一个数据就会更新一下我们当前的这个my value,我们可以把它先关掉啊,那当前这个更新的过程我们会看到,诶,为什么后面又又更新成档了呢?那是因为当前的user不一样,我们看到如果要是user一样的话,诶,那同样是Mary的数据来第二条的时候,那就还是就不会再出现down了,直接value就输出了,那如果要是别的爱丽丝的数据再来呢?诶,那又是先输出一个nu,然后接下来是爱丽的数据去进行一个更新。所以我们可以看到,当前的状态其实是按照去进行了一个划分隔离的,不同的K都有自己的对应的状态。
13:08
所以这也是证明了我们前面。提出的k state,它的一个特点,Flink底层帮我们把这全部搞定了,我们在定义处理操作的过程当中,并没有做其他的更多的判断和呃,更多的定义,我们直接就定义了统一的一个my value state,我们这里边并不判断K到底是什么,只是直接在这个算子后边啊,我们在这个当前的reach map方式里边定义出状态,然后直接去调用它的update的方法就可以。那这里边update的时候,它到底更新的是谁呢?自动flink就帮我们去找当前的value,它对应的K是什么,它就去更新对应的那个状态就可以了。这就是的一个基本的特点。
我来说两句