00:00
我们已经了解了k state的基本用法,那接下来呢,我们就按照不同的状态类型,再结合实际的应用案例来详细的了解一下状态编程到底应该怎么做。那首先呢,我们还是先来回顾一下,在代码当中,如果想要去注册和使用一个k state到底应该怎么做啊?那在前面我们介绍到的代码的使用过程当中,最核心的一点其实就是要创建一个当前状态的描述器。啊,那这个描述器呢?构造方法中主要需要传入两个参数,一个是当前状态的名称,另外一个是类型。啊,那当然了,如果说是reducing state或者是aating state的话,还有可能需要传入一个对应的red方式或者aggregate方式,传入一个函数类指定聚合的方式。只要我们指定了对应的描述器,那就可以以它作为参数传给get runtime contacts的一个get state方法,哎,那对应的这个方法呢,可能是get state,也有可能是get list state,也有可能是get map state,这就跟我们当前所使用的状态类型有关了。
01:10
这里的关键点是在于我们必须要先获取到运行时上下文,有了运行时上下文才可以去对应的注册状态啊,那所以在这个过程当中,我们并不能直接放在类的静态代码块里边去调用这样的方法,所以只能把它放在open生命周期啊,啊,当然如果说我们直接在open生命周期里边去定义对应的状态变量的话,又是不行,因为后边我们可能在Fla map啊,或者map其他的一些方法里边需要去使用当前的状态,哎,那所以我们这里给出的解决方案就是在外边首先先把状态定义出来,把类型指定,先给一个空值默认值,然后接下来呢,在open生命周期里边再去调用get runtime contact对应的get的方法,然后获取到状态的控制距离啊,那这种方式可能会稍微来讲有点麻烦啊,代码实现可能会比较啰嗦,所以有时候呢,我们在。
02:11
SC代码里面可以用一种比较简化的方式啊,这种方式呢,就是使用一个lazy关键字去做一个懒加载的声明。主要就是因为lazy关键字声明的变量,我们可以在它使用的时候才去对它进行求值赋值的这个计算,哎,那所以只有在后边state使用到的时候,这个时候我们才会执行后边的get run contact.get state方法,哎,那这个时候当然我们运行上下文就已经有了啊,所以通过这种方式就可以让代码更加的简洁,不需要在open生命周期里边再去单独声明了。另外我们还需要注意的一点就是关于状态呢,诶,我们之前所介绍的状态可以认为就像一个本地变量一样,放在当前分区的内存里面啊,但事实上啊,状态不一定都存在内存里面,也可以放在磁盘或者其他的地方,那具体放在哪儿呢?这个是可以去配置的啊,那可以配置的这个组件就叫做状态后端stay back。
03:14
那关于状态后端呢,我们会放在后边的章节里面进行详细的介绍。那接下来呢,我们就先来对值状态给出一个具体的应用案例啊,其实前面我们都已经使用过值状态了啊,Value state已经用过很多次了,那在前面我们这个简单的介绍过程当中,我们会发现如果使用了aggregating state,或者哎,我们在这里使用一个map state的话,都可以对当前每一个用户他点击的URL的次数做一个统计。其实我们如果仔细去思考的话,就会发现啊,根本没有必要这么麻烦啊,像之前我们在这个map state里边是以当前用户的用户名作为key进行保存,然后呢,后面值跟着就是当前他的这个点击URL的次数count值,那我们会发现本身k state已经是按照K进行了一个分组,每一份状态在我们这里啊,状态实力都是只针对当前K有效的,所以我们根本没必要再把它指定成一个K一个value这种形式了啊。那另外就是如果说我们用。
04:21
A state的话,在这里我们还需要去指定里边的一个accumulator,然后每来一个数据元素,然后我们就让它加一,最后呢,还有一个get result方法,我们要输出一个统计结果,这个过程很显然我们没有必要这么麻烦,其实这里边我们最关键的就是一个到目前为止的count技术。其实我们就把这个count值作为一个状态保存起来就完了,哎,那所以这个需求完全可以直接用一个value state就完整的实现。当然了,我们如果说使用了状态编程的话,还有另外一个好处就是如果我们不是简单的使用了rich function,而是使用了process function的话。
05:05
P process function里边还可以注册定时器,这样的话我们就可以控制当前统计输出的频次,比如说像像之前我们的这个聚合过程,那就是每来一条数据都会执行一次啊,那我们这里不管是map state也好,还是aggreg state也好,都会输出当前的访问频次聚合出来的抗值。有时候如果我们数据多的话,很显然没必要每条数据都输出嘛,诶那可能我们就隔一段时间去输出一次就可以了,那这个隔一段时间的时间频率怎么样去控制呢?用一个定时器就可以很容易的实现,所以接下来呢,我们就在代码里边去实现一个类似于周期性的统计用户活跃度啊,用户访问URL的频次这样一个需求啊,所以接下来我们可以在CHAPTER09下边去创建一个SC的object。这个需求周期性的统计,用户访问次数,我们直接可以把它叫做pic,我们就直接把它叫做PV吧,这其实是类似于PVP的一个统计啊呃,PV example。
06:11
Main方法先写出来啊,那前面的过程其实都是非常类似的,我们直接照抄k state这一段代码,创建执行环境,设置并行度,然后接下来读取数据源,后面提取时间戳指定watermark的生成策略,后边呢进行按照user进行T,接下来进行处理啊,我们前面不要忘记把这个下划线引入。接下来呢,呃,那其实我们这里就不需要去做这个flat map操作了,因为后边我们周期性的去输出结果的时候,可能用到定时器,那这个时候我们就可以直接调用一个process方法。啊,那里边我们直接扭一下,把这个类定义出来吧,就叫做periodic TV。那上面的话,我们就不需要再引入my flat map这个类了,接下来我们关键就在于实现一个。
07:02
自定义的。Key的process function。Plus。我们直接写出来periodic PV extend key的process方式,这里需要指定的泛型有三个KIO啊,那当前的K的话,User那是string类型,输入的数据类型当然是event事件。最后的输出,哎,我们还是简单一点,直接把它包装成string,打印到控制台输出显示。这里必须要实现的是一个process element的方法,那在这个方法里边呢,我们要做的事情其实就是每来一个数据,把当前的count值加一,那这个count怎么办?我们就直接去定义一个状态。定义值状态,哎,只要一个值,一个state就可以保存当前的抗值。保存。当前用户的PV数据其实就是一个count了,所以这里我们可以用lazy懒加载的这种方式去做一个定义,直接可以把它叫做countate。
08:09
那它的类型的话,当然就是value state。后边我们需要给一个对应的泛型啊,我们保存的其实就是一个长整形的值嘛,哎,所以直接给一个long。后边我们可以get runtime contact,然后get state,里边你一个value state。Script啊,那里边我们需要给一个状态的名称,我们就叫做count吧。再加上类型class of长整型long。好,这样就把它定义出来了,我们可以看到这种写法就相对会简洁一点啊,就省去了在open生命周期里边去调用get context.get state的方法,那直接在外边定义出来就可以了,然后接下来在process element里边,那就是每来一个数据,每来一个数据。
09:01
就将。状态中的count加一。所以我们的逻辑其实非常的简单,首先我们先获取到当前的count值。这个本地变量就叫做count,它是从constant里边点value获取出来的,然后接下来呢,A呢,就直接把constant去update一下,改成count加一更新状态就可以了。啊,那大家可能会觉得奇怪,诶,之前我们在那个map里边不是还要判断一下当前这个到底有没有吗?如果第一条数据的话没有值,诶,那这个时候是不是我们就不能直接加一呢?哎,其实没必要,对于value state而言,假如一开始并没有数据来,没有做过update操作的话,初始的默认值对于长整型的value state而言。初始默认值就是零,就是0L,那所以如果第一条数据来的时候啊,这里的count其实就是零啊,那所以这个逻辑我们就不用改了,直接count加一就可以了啊,那关于count的统计呢,我们这个逻辑就已经搞定了,接下来还得考虑一个事情,就是我们需要周期性的输出结果。
10:11
所以这个时候呢,我们不能直接在这里就alt.collect直接去输出,那这样的话就是每来一条数据都要输出了,那怎么办呢?所以这里我们需要去注册定时器。注册定时器。比如说我们每隔十秒。输出一次统计结果。诶,这样的话,我们输出的频率就不会那么高,整个处理的负载就不会那么大啊,那所以接下来呢,我们这里边就可以考虑去注册定时器了,但这里面又有一个问题,哎,那如果说我们每个数据来了之后,都以它的时间戳去注册一个十秒钟之后的定时器的话。那我们输出的频率不还是每个数据都会输出一次吗?哎,那就没有意义了,所以这里边我们应该是要判断一下,就假如说第一个数据来了,那么我们应该注册一个十秒钟之后的定时器,而他去注册一个,一旦我们已经注册了这个定时器的话,那么接下来的数据来了之后,就不要碰这个定时器了,就等着定时器去触发就可以了。
11:18
后面来的数据都只是把我们的加一加一,不停的加一,等到这个定时器触发的时候,我们再把count值拿出来,输出一个对应的结果。啊,那如果说我们已经输出了一个结果之后,那相当于定时器就已经没有了,就可以清空了,接下来再来的第一个数据,我们就又去注册一个定时器啊,所以在这个过程当中,我们会发现啊,需要再去保存一个信息,就是当前到底有没有注册过定时器。现在啊,初始上来的时候当然是没有注册过,那我们用一个什么样的状态来保存呢?我们自然想到只要用一个布尔类型的变量去保存就可以了啊,初始的时候是false啊,那来了一个数据注册了之后,我们就把它定义成处啊,那另外对于定时器这种状态的保存,在实际应用的时候呢,有这样一个小窍门,一个小技巧,那就是我们可以不定义布尔类型的变量,而是直接保存。
12:16
我们定义值状态。保存注册的定时器时间戳。哎,那其实我们就想到了定时器时间戳,它本身也是一个长整型的value state嘛,那如果要是一开始什么都没有的话,默认它也是零,哎,那所以如果是零时间的一个定时器的话,那就没有意义了,那一上来这个时间就已经过了,那就那就不会去触发去进行计算了。所以这个状态它的默认值也是零啊,那我们就可以判断,如果是零的话,那就相当于当前还没有注册定时器,那如果说它非零的话,大于零的话,那就说明我们已经注册好定时器了啊,所以根据这个也能起到判断的作用,而且这样的话还有一个好处就是后边我们在定时器触发的时候,假如说前面我们有很多个不同时间点的定时器的话,后边我们在on timer里边定时器触发的时候,就可以根据当前的时间是否等于我们所保存状态里边的时间戳来判断要做什么操作。
13:21
诶,那所以这种方式在实际应用当中还是比较常见的啊呃,那这个的话,既然它类型都一样,我们直接可以copy一下上边。com这个定义,只不过给它改一个名字,我们保存的是定时器的时间戳,所以把它叫做timer ts time step啊,那后边注意啊,这里的名字也必须要改,类型可以一样,但是名称不能一样,这就是我们说的啊,这个名称相当于就是变量名,必须改过来啊,我们就叫开TS。所以接下来我们注册定时器的过程呢,就首先先得做一个一判断了,我们得判断一下timer ts state,它里边的值到底有没有,哎,那如果说没有,它是零的话,这个时候。
14:05
我们才去注册,那如果要有的话,那就直接跳过,什么都不做,所以我们判断它是否等于零,那如果等于零的话,现在要注册了,当然就是ctx,调它的timer service,然后register time time注册一个事件时间的定时器,我们这里所用的定时器的时间戳就是当前数据的TIME3,再加上十秒钟。注意,这里我们必须是一个毫秒为单位的时间戳,所以如果是十秒的话,其实是一万十乘以1000啊。然后接下来还要注意,还得去更新状态。更新状态,因为上面这里只是注册了定时器,那我们这里timer ts state这个状态还必须要写入对应的时间戳,下一次来了之后才能判断已经有定时器了,所以接下来我们就是timer ts state做一个update操作啊,里边我们当然还是把value.time3加上1万这个时间戳传入就可以了。
15:06
啊,这就是我们处理的流程,每来一个数据之后要做的操作主要就是这个C加一,然后判断有没有定时器,没有定时器的话就注册一个十秒后。接下来还有一个关键点,那就是什么时候输出结果呢?哎,那是需要在定时器触发的时候去做这件事。定时器。触发。输出当前的统计结果。所以这里我们要重写on timer方法,这里呢,我们现在其实不需要对于时间戳去进行进一步的判断了,因为我们只有一个地方注册定时器只有这一个,哎,那所以当前只要触发,那就一定是十秒钟之前注册的那个定时器,所以接下来我们干脆就直接输出out.collect。字符串类型,所以我们直接做一个字符串拼接,这里边我们输出用户信息啊,用户到底是哪个呢?哎,那我们得用当前的K了,K可以在上下文里边捕获到,哎,那所以当前我们可以写一个Dollar。
16:11
CTx.current k,哦,然后接下来我们说它的活跃度,或者说是PV值,PV值为。后边我们跟上在countate这个状态里边保存的值,直接把这个count值输出就行,哎,所以这个逻辑其实还是非常简单的。啊,那后边呢,我们不要忘记还得需要做一个操作,那就是要做一个状态的清理,清理状态。因为按照我们当前的逻辑,那是来了一个数据之后,第一个数据来了会注册一个定时器,十秒之后触发,那触发之后呢,如果说我们什么都不做,那接下来这个timer ts state里边还保存着之前我们注册的那个时间戳,所以接下来数据到来之后,会判断发现它里边还有值,那就会跳过我们这里注册定时器的逻辑,永远不注册定时器,那接下来就再也不会触发这里的输出了啊。所以为了避免这种情况,我们需要对。
17:12
Timer ts state做一个清理,哎,那清空状态的话,就是前面我们说的调用一个clear方法就可以了。啊,所以接下来我们就是每隔十秒,我们这里就会触发一个定时器的操作,输出当前的统计结果,接下来呢,啊,那就清空状态,发现现在的timer ts state又变成了零,那下一条数据就又会注册一个定时器,十秒之后再去出发,再去输出,这样的话就循环往复,不停的间隔性的输出了。另外我们发现啊,这里我们只清空了timer ts state,那还有另外一个状态com state要不要清空呢?诶,按照我们当前的逻辑呢,其实是不应该清空的,因为相当于我们还是连续不断的统计,统计所有的历史数据,只是输出的频率间隔开了而已。
18:02
哎,那如果说我们这里把count直接清空,那其实这就类似于一个窗口操作了嘛,就相当于是只统计当前这段时间内十秒钟内的所有的数据,接下来呢,又从头开始重新统计,那相当于下一个窗口嘛,所以我们现在不是定义了一个窗口,或者说我们可以理解成是一个全局窗口,然后定时器触发啊,所以接下来我们可以运行一下,看看执行的结果是什么样子。好,现在我们看一眼这个运行的状态。当然了,因为我们现在定义的是十秒钟之后的定时器啊,这个间隔的时间可能会稍微有点长,我们需要等待一下输出啊,我们看到爱丽丝第一个输出的是只有两个值。啊,那每一个用户啊,接下来先先后后都输出了一个统计结果,我们会发现他们并不是同时的,而且呢,间隔也不是十秒,这是为什么呢?这四条数据,这就是因为我们当前定义的是k state,那所以这里边的count state啊和time ts state它们都是独立的。
19:07
互不影响啊,那所以如果爱丽丝的第一条数据来了之后,他就会注册一个第一条数据时间戳十秒钟之后的定时器啊,那可能他就先会出发,那对应的carry如果是第二个来的用户的点击事件的话,那么他就会在后边注册十秒钟的定时器,第二个出发啊,那所以再往后的话,我们就会看到每隔十秒就会有所有用户PV值的一个统计,这个统计值是在不停累加的,在之前的基础上会不停的累加,不停的增大啊,这就是我们关于值状态具体使用的一个应用案例。
我来说两句