00:00
我们现在已经在代码中了解了state的基本用法,整体来看的话,首先就是要先去创建一个。当前状态的描述器,然后我们可以调用在里边的get contact获取当前的运行上下文,然后调取它的get state,呃,可以根据state状态的类型不同调用不同的get state方法,然后获取到当前状态的控制距柄啊,那我们把它在代码当中的具体实现呢?这个方法的调用是在open生命周期里边去调用的啊,因为只有在当前类的实例创建出来之后,我们才能获取到当前的运行上下文。而为了在其他的方法当中去使用定义出来的状态,我们还应该直接把状态在外边定义成一个当前类的属性。所以我们是先在外边做一个声明,然后在open生命周期里调用运行上下文的get state方法。
01:10
传入一个描述器,把当前的状态定义出来,获取到啊,那后面获取到之后就可以自己去调用它对应的方法去做读写操作,那这里还需要说明一点的是,对于所有的状态其实都可以去调用一个。比如我们这里可以将my state后面可以去调用一个clear方法,这就是所谓的清除清空当前的状态,整个状态就相当于回退到了初始定义的时候,没有任何更新之后的痕迹了,回退到初始的状态。这是关于所有状态的使用,那在这个过程当中,我们会发现这种方式定义出来的状态,其实这都是针对kid state而言的,这种方式定义的状态都是state。
02:03
因为只有在kid state这种场景下,我们才需要从运行是上下文里面去获取,然后去根据不同的key把不同的状态定义出对应的实例来,啊,那数据来了之后,那就是会根据K去访问它自己对应的状态,而不会搞混。我们知道如果说是算子状态operator state的话,那其实不需要这么麻烦,我们不需要运行上下文再去进行这么复杂的处理,只要直接把它定义出来,因为相当于它就是一个本地变量啊,跟本地变量没有任何的区别,当前任务的并行子任务,当前算子的并行子任务。所有传递来的数据全部都能够访问到当前的分区状态啊,那这样的话其实就不需要根据K再去做区分,那也就不需要从运行上下文再去做获取了,这个我们在后边介绍到算子状态的时候还会详细做讲解啊,那当然子状态跟这里的本地变量还是有所区别的,那它的区别主要是在于我们还需要考虑怎么样去把它保存到持久化,保存到其他的外部空间,然后如果发生故障的时候还能够把它恢复出来啊,就是容错方面的考虑跟本地变量还是不一样。
03:28
那关于。当前状态的持久化保存以及。发生故障的时候恢复的这一套系统,它其实是由一个可配置的组件来管理的,那这个组件呢,叫做状态后端state back end,关于这一部分内容,我们也会在后边的章节去进一步的讲解。那接下来我们就就针对不同类型的状态给出一些具体的应用实例。首先我们看一看,最简单的当然就是状态value state value state前面我们已经介绍过一个非常简单的用法,就是在实时对账的这个过程当中,我们就用两个直状态value state保存了之前已经到来的某条流里边的事件啊,那这个其实就是非常简单的一个状态的保存。
04:21
那有了前面的基础,我们现在呢,再来考虑一个非常实际的应用场景,那就是关于PV的统,在之前我们进行PVV计的时候,其实都是开了对应的窗口来对它进行统计的啊,那这个窗口相当于一定是要等到一定的时间的时候,然后把当前这段时间内的所有数据统计出来的那个值做一个输出。那有时候我们可能就会想到,呃,在实际应用场景里边,我可能并不希望把所有的数据分成一个一个的窗口去进行统计,我需要的就是整个所有数据的统计结果。
05:07
但是我们知道,如果要是对所有数据进行统计的话,那就相当于不开窗,直接KY之后,后边直接去做聚合,去reduce,去进行进一步的规约聚合,或者是做其他的聚合。那在这个过程当中,其实。就是来一个统计,一次输出一个结果,这个输出的频次就有点儿太高了,每一个输入的数据都会引起我们一次聚合计算,然后在后边做出一次统计输出,如果对应我们之前的work的话,那就是来一个数据,我们后边就会有一个对应的统计结果的变化,那呃,WORD1 word2 word3,一次一次的变化。在有些场景下,我们会发现这是不必要的,因为我们可能只是需要隔一段时间看一看当前到底统计了多少数就可以了,不需要每来一个都汇报一下当前到底进展到了多少啊,那所以这就涉及到一个问题,如果说我们是每隔一段时间想要得到这样一个结果的话,自然能够想到,那就是开一个滚动窗口啊,但是滚动窗口它只能统计自己这段时间内所有的数据,他不会从头开始统计。
06:23
怎么解决这个问题呢?结合我们之前讲过的process function应该就能想到了,其实也非常的简单,我们还是不开直接基于所有的数据做一个统计,只不过在后边的输出,我们不要非踌速的输出,不要每次每个数据来了之后,做完统聚合之后就直接输出,而是设定一个定时器。我们设定一个闹钟。这个定时器可能是在我们想要的那个时间段才进行一次输出,也就是说统计聚合统计是每来一个数据都要做的,做做完了之后呢,并不把当前得到结果发送到下游去,而是等到我们设定的时间到达的时候,才去把对应的结果进行一次输出。
07:19
这样的话,相当于。我们上游的数据可能来得非常的快,非常的多,而下游呢,我们统计的结果就可以每隔一段时间输出一次,下游需要去处理的数据就会少很多了,这会让下游的任务工作量,它的饱和度负担都会小很多啊,这是在实际应用场景当中非常常见的一个需求啊。那我们可以利用这样的一个聚合和定时器触发,隔一段时间输出一次结果这种方式把它做一个实现,那我们自然想到了,既然要用定时器,肯定必须使用process方式,而在这个过程当中,我们应该想到也不需要在前面单独做聚合了。
08:06
在process function当中,我们可以直接定义一个状态。把前边已经到来的所有次数只要保存在这里就可以了。然后。定时器触发的时候,把这个数,把这个状态拿出来做一个输出,这就是我们整体的思路,所以接下来我们可以用一个状态编程来实现周期性计算PV进行输出的这样一个应用。那接下来我们可以在当前的包下边去创建一个新的类。这类我们就叫做周期性的periodic计算PV。的一个事例。那整个的处理流程其实跟前面还是非常类似的,首先我们应该还是需要去创建执行环境,然后设置并行度,然后读取数据源,这个过程跟前面非常的类似,所以我们直接从前面的state test这里把前面的。
09:07
都copy过来。呃,当然前面我们还是应该把。异常抛出。最后有env execute执行起来这个整个的架构都是没有问题的,那所不同的当然就是接下来进行的处理就会有所不同。所以我们现在要统计,当然还是应该有一个标准啊,统计PV的话,那可以可以进行一个分组统计,就是统计每个用户他点击了有多少次,点击访问数据啊,那所以这相当于是统计每个用户的PV。相当于是每个用户的浏览量、访问量。所以在这个过程当中就需要。根据当前用户的ID。或者说username。
10:00
点user。去做一个分组,所以是K当前的U。然后接下来我们需要统计当前用户的他的访问次数,做一个累计,做一个聚合,另外呢,还要隔一段时间进行一次输出,输出这个要用到定时器,所以我们当然就是一个process的应用了。这里。里边。有一个自定义的的process方式啊,这里边我们直接定义一个名称叫。A tvor。那做完之后,我们直接把它做一个打印输出啊,所以如果要这样的话,我们得到的结果处理完成之后的结果就直接转换成一个string进行输出就可以了。为了看得更加清楚一点,我们可以在前面加一个。的打印,这样我们可以看到每来一条数据之后。会有什么样的变化?
11:05
接下来我们的关键就在于实现。自定义的。Key的process方式。我们把这个定义出来,Public static class period。Heavy result。那当前我们需要让他去继承key的方式。这样的一个抽象类,好,那然后接下来这里边的。泛型有三个KIO,我们这里面当前的K的类型当然是string user的类型,那I输入的数据类型是,Event输出的数据类型当然是。这就是我们当前的类型的一个基本的定义,然后接下来里边具体的实现呢,这里边我们发现如果要统计之前所有的个数的话啊,那当前应该是应该要有一个状态保存当前的这个值,所以。
12:11
我们定义状态。保存。当前PV统计值。这个非常简单,只用一个value state就可以了,只有一个值,我们还是把它定义成长整形。叫做count,当然可以加一个state,因为这是当前的一个value类型的变量。那除了这个ATE之外呢,我们会发现后边我们还需要去定义一个定时器,而这个定时器呢,应该是假如说我们这个周期性就是十秒啊,那我们这里面定义就应该是隔十秒钟第一个数据来了之后,十秒钟之后应该要输出一次当前的PV统计值。而第二次输出又是什么时候呢?诶,我们会发现就是第一次输出了之后,那接下来又应该再去接下来来的数据之后,又应该再去定义一个定时器了,所以在这个过程当中,我们应该不停的去定义定时器,那可以简单的定一个规则,就是我可以判断当前如果有已经设置好的定时器的话。
13:21
每一个数据来了之后,我们去调用,调用对应的这个,呃,Process function里边会有一个。Process element的方法,那调用这个方法的时候,我们就不需要去再去注册定时器了,那如果说当前没有定时器注册好的话,那来了一个数据,我就注册一个新的十秒钟之后的定时器,这个就实现起来比较简单一些,所以在这个过程当中,我们还应该有一个变量去判断当前有没有定时器。所以这里边我们保存。以及有没有定时器。
14:01
所以这里面我们还需要除了保存当前的countt值之外,还应该保存一下有没有定时器,那当然了,有没有的话用一个布尔类型就可以直接表示,那另外呢,在处理定时器的过程当中,我们知道定时器本身在process function里边,它的ID就是以时间戳来表示的,所以呃,有时候常用的啊,也会直接保存当前定时器的时间戳。只要这个时间戳存在,就说明当前有定时器,这也是非常容易做判断的啊,那这样的话,我们就直接还是把这个时间戳做一个保存就可以了。把它叫做timers。就是这样的两个状态啊,那接下来我们需要在open生命周期里边对它进行一个状态句柄的获取。Con。这里我们需要调用当前的gettime contact方法在运行时上下里去获取它的状态距离,那这里要一个描述器value statescript里边。
15:12
当然就是对应的名称和类型了,当前的名称啊,一个是当前的count值。类型的话就是长整型。那我们知道后边timer基本上也是一样。只需要把里边的一些关键点做一个修改就可以了,同样它也是一个value state script,同样类型也是law啊,这里关键点在于不能相同,我们这里是timer。He。就是我们关于状态的声明,然后接下来那就是必须要实现的process element方法。当前这个抽象类里边的抽象方法,在这个方法里边每一个数据到来的时候,进行处理的时候,都会调用到它啊,那这里边就是我们的核心逻辑了。
16:03
每来一条数据。那么我们就更新对应的count值。那这里其实不用去判断当前的这条数据。这个event啊,到底是哪个user的访问数据,因为我们现在已经按照user做了分组,直接去访问到的对应的这一个状态,就应该是当前user对应的一个count啊,那所以这里面直接更新,直接叠加就可以了,那呃,我们可以直接把。这个count值先获取到。点调用它的点value方法获取到本身状态里面的值,然后接下来啊,这个就非常简单,就是count。对,他要做一个update,做一个更新啊,那这里面需要做一个判断,如果前面获取到的count本身是none的话,本身是里边什么东西都没有的话,诶,那当前我们其实应该是要把它。
17:07
Update成一跟跟前面的过程是一样的,我们做一个这个三元运算符的一个判断,那如果不是那样的话,那就应该直接是拿count加一来做一个更新,这个流程其实跟之前我们做state test里边的测试是一样的。然后接下来还有一步要注册。定时器。当然注册定时器是有前提的。那就是如果。没有。注册过的话。才去注册定时器,那这个判断的标准当然就是timer state timer ts state里边到底有没有值啊,那所以这里就是加一个if判断。如果har ts state.value获取一下等于none的话,我们才去做接下来的注册。
18:01
用当前的ctx上下文调用它的timer service。然后去做一个当前的注册,注册一个timer事件,基于事件时间定时器,那当然了。我们可以直接就基于当前value的,在它十秒钟之后。注册一个电视机。然后接下来不要忘记把对应的状态要做一个更新,就以我们定义的这一个时间戳。把它更新上去,只要更新之后,接下来它有值了,那当我们这个。前一个定时器还没有被触发,当前这个状态还没有被清空的时候,那么我们接下来到来的数据就不会再走这个逻辑去注册新的定时器了啊。那接下来的十秒钟之内,我们就是等待的过程,那什么时候出发呢?还需要去处理一个。
19:00
On timer方法。等到十秒钟的时候,接下来这里边我们真正触发需要去输出结果了。定时器触发。输出一次。统计结果。啊,那这里我们就是用out点去做一个统计,那这里我们直接就是一个string类型,所以就做一个拼接就好了,把我们想要的信息,那我们先判断一下,就是当前是哪个用户呢?这个用户user名称其实已经在当前的当前的K里边包含了当前的key,就是用户嘛,所以可以在上下文里面去做一个获取。然后加上。我们做一个哦,他的PV。是多少呢?那就是state。状态里边的值拿出来拼接到一起就可以了。这里需要注意的是一次触发之后,那接下来我们就应该把状态清空,进行重新的注册定时器重新去统计了,进行下一轮的统计了啊,那所以这个过程当中,我们其实。
20:12
需要去清空状态。清空状态。所以直接把timer ts state调用一个clear方法清空,那我们可能会想到,那是不是count state也需要去做一个清空呢?这里需要注意,如果把countate也清空的话,那接下来我们的操作其实就跟之前的十秒钟滚动窗口有点类似了,当然又不完全一样,因为我们的时间不是严丝合缝的,我们是基于每来一条数据之后才去定义当前的注册当前的定时器啊,但是整个的逻辑就跟那个类似了,因为只有。当前窗口内的数据我们才会统计到这个count值里面啊,那之前的所有的就都已经清空了,都不算了,我们现在要的并不是这样,我们要的是在之前的基础上去去进行进行一个叠加,所以在这个过程当中,当然count就不应该清空,而只把timer ts state进行一个清空,接下来我们继续注册定时器去做统计可以。
21:16
这就是我们整个的一个处理逻辑。我们本身这个代码的实现呢,它其实应该不能说是真正意义上的周期性统计PV的这样一个过程,因为我们还是依赖数据去注册定时器的,因为有可能比方说一开始我们来了一个的数据。那。接下来是以当当前第一条数据为基准,然后注册一个十秒钟之后的定时器,那接下来呢,我们就直接把它清空了,触发之后就直接清空了,清空之后并没有立即注册第二个定时器,而是要等到再下一条carry的数据到来的时候,才会去注册下一个定时器,所以这个看起来有点像绘画窗口的那种感觉了啊,总是要有这个数据来了之后,才会在十秒钟之后输出一次统计结果。
22:11
那如果说我们想要的并不是这样。我们想要的是类似于。前面之之前说过的。滚动窗口的那种统计形式的话,头连尾,尾联投,第一个周期统计结束之后,马上就开始计时,然后隔十秒钟之后输出下一次的统计结果,如果是这样的需求的话,那其实也很简单,只要把我们这里的逻辑。首先第一次数据来了的时候啊,如果一开始没有注册定时器的话,我们要注册一个,那另外接下来什么时候去注册呢?不要再去判断当前这个数据,按照数据去出发了,我们就不需要去看在数据这里再去判断当前这个定时器有没有了。直接在每一次定时器清空之后,就马上再注册一个新的,这样的话相当于我们这里的if逻辑就只判断一次,只是最初启动的时候来一次就可以,后边每一次都会在清空之后。
23:13
立即进行新的注册,那当然了,这个新的注册就没有value,就给我们提供时间戳了,我们直接以当前的时间戳作为下一个起点就可以了啊,那所以这样的话也会简单很多。只不过需要去增加两行代码,放在这个onver里边去注册下一次的定时器,我们可以再来运行一下,看看现在跟刚才会有什么变化。同样还是一开始,我们需要等十秒钟之后才会看到第一个统计的输出结果,我们可以看到现在应该是Mary啊,所以是四次,前前十秒钟有四次。Mary的访问,这是20,呃,零九这个时间时间点,那后面我们就可以看到,到一九的时候,Mary就一定会输出他的第二次统计,现在是五啊,那么我们已经可以预测到到二九的时候。
24:14
这里就会有Mary的第三次统计输出。类似于滚动窗口,每次都统计,每隔十秒就统计一次当前的个数。这就是关于。周期性统计PV的处理方法具体的实现。
我来说两句