00:00
我们已经介绍了value state和list state的具体应用案例,那接下来要介绍的呢,是映射状态map state。关于map state呢,其实之前我们在整体介绍k state的时候,也举过一个非常简单的例子啊,那当时呢,我们是以user作为K,然后统计每一个用户访问页面的次数啊,就是每来一个用户的点击访问事件,我们就count值加一啊,所以这个需求非常简单,不过后面我们也提到了啊,这个需求其实根本没必要用一个map来实现。因为我们在这里做Fla map的时候,在他之前就已经做过了KBY,就是按照用户user做了一个分组,那接下来我们在后边所创建的所有的状态都是kid state,它的作用范围就是当前的用户,当前的user对应的数据,哎,那所以我们直接在后面看统计,统计出来的肯定就是唯一的这个用户对应的访问频次了,那其实就没有必要保存成一个web,只要用一个value state就可以统计了啊,所以后来我们也提到啊,Periodic PV这个例子里边,我们就直接用了一个value。
01:12
Comtate直接统计这个个数就可以,那我们当然想到了,那什么情况下就非得用mapt state再把它的key value做一个分配呢?啊,那如果说我们考虑到想要实现类似于窗口功能的时候。那这个时候基于每一个K,每一个用户,诶,那接下来呢,他的count值还应该按照不同的窗口去进行分配啊,所以这样的话我们就可以用一个map来做一个保存啊,所以接下来我们实现的这个需求啊,是类似于模拟一个窗口的功能啊,就模拟一个滚动窗口,而现在我们的需求要求是统计每一个URL在每一个窗口中被浏览的次数。也就是之前我们所做的滚动窗口里边统计PV这样的一个需求啊,那之前呢,我们是可以直接开窗嘛,现在不开窗。
02:09
而是使用map state来进行实现啊,那这里既然是要用到map state,那我们会想到就是以每一个窗口它的起始时间或者结束时间啊,代表当前这个窗口我们作为一个K来进行一个保存,哎,那value的话当然就是它的抗值了,统计出来的在当前窗口内访问的次数。那还有另外一个问题,就是什么时候输出这个窗口的统计结果呢?啊,很显然,我们接下来就还需要去注册一个定时器,一个timer。按照窗口的结束时间注册定时器,到时间的时候把对应窗口的count值拿出来,直接输出结果就可以了,所以我们完全就可以利用map state结合定时器来实现一个窗口的功能,所以接下来我们就在代码里边去做一个具体的实现。
03:02
啊,那这个案例的话,我们还是新建一个SC的object,因为是模拟了一个滚动窗口,我们干脆就叫fake window example吧。是一个假的窗口啊。那同样的前面还是一样。首先获取当前的执行环境。叫做env,上面下划线要先引入全局的并行度,先设成一,方便后边我们进行测试。然后接下来env直接就可以ADD source,我们还是直接用click source测试数据源,然后接下来我们需要去提取时间戳,生成水位线。指定time STEM,接下来我们当然是K,现在是要统计每一个页面的PV值,那当然分组的字段就是URL了。然后我们既然是需要使用状态编程,用到map state,还得注册定时器去模拟窗口,那很显然只要使用定时器的话,那肯定就是process function啊,Key process function,所以接下来是process。
04:11
这里边我们需要去自定义实现一个key process方式啊,那我们就把它叫做fake window吧。后边啊,我们就直接把结果做一个打印输出,最后不要忘记执行起来,这就是我们整个代码的基本的架构,然后接下来那就是实现自定义的key的process方式。Plus。Fake window extend key的process function,那本身key process function有三个泛型参数,哎,那就是KIO,现在的K本身是URL,当然就是字符串类型了,Input是event类型。输出的话,呃,我们还是简单起见,直接包装成一个字符串打印输出,输出一行信息,到底是哪个页面,URL是什么,在哪个窗口里边统计出来它的PV值、count值到底是多少,就这样的一句话打印出来。
05:14
啊,那接下来呢,我们就是具体的处理流程,必须要实现一个process element的方法,那在这之前呢,我们需要先去定义map要初始化一个状态了。定义一个映射状态。用来保存。每个窗口。的PV值。啊,那所以我们还是用lazy的这种定义方式啊,这样稍微的简单一点,代码会少一些啊,我们就把它叫做window PV。State。接下来我们就直接去get runtime contact,然后get map state里边我们要去new一个map state。
06:03
啊,那里边需要有两个泛型UKUV啊,那我们现在K的类型当然就是现在的窗口信息,窗口信息我们用一个window的窗口的start。时间戳或者是and时间戳都是可以的,就是一个长整型的值,另外还有一个当前的count值,那也应该是长整形了。都是浪,然后里边需要给一个名称,那这个名字我们就叫做window PV吧。然后另外还需要有对应的类型信息,Class of love。Class of law。好,把它定义好。然后接下来就是process element,在这里呢,每一个数据来了之后,我们接下来其实就是要类似的模拟窗口对应的操作了,那我们其实知道首先应该得有窗口分配器啊,我们得知道当前的数据到底是属于哪个窗口里面,诶,那这个到底应该怎么算呢?那很显然是要先提取这个数据event里边的时间戳time,然后看这个time到底落在了哪个窗口里边。
07:12
所以在这里我们其实会发现啊,我们还缺了一个非常重要的参数,就是从外边应该传进来的一个参数,诶,那就是我们说的,既然要定义滚动窗口嘛,那应该得有滚动窗口的大小才对啊,诶,那所以这里边我们直接用这个这window的时候,这个类本身可以带一个参数啊,这个参数就是一个当前的size。啊,那这个size的话,我们也是以毫秒做单位的一个。长整形的值,把它定义成这样吧,所以这里边如果是比方说我们定义一个十秒的滚动窗口,那就是直接给一个疑问,这就是十秒钟的滚动窗口。我们这这里是模拟一个十秒的滚动窗口。
08:03
所以接下来我们既然有了这个size,那其实就知道了当前到来的数据事件里边,它的时间戳直接除以这个size,哎,那其实就是从时间戳零开始到现在一共有多少个窗口啊?那如果说想要取当前这个窗口的起始位置的话,那其实再乘以这个size不就可以了吗?啊,我们把这个写出来,大家可以看一下这个操作到底是怎么做的啊,那其实就是我们想要得到,首先我们需要去。计算当前数据。落入的。窗口起始。时间戳。哎,那这个计算方法呢,我们把可以把这个直接叫做start。那就是直接基于value.time step,让它除以S。那除了之后,比方说我们现在是十秒,那一除,那就是从零开始到当前事件,它里边的时间戳这个时间点为止,一共有多少个十秒啊,这里面到底有多少个十秒啊,那其实有多少个十秒,那很显然就是有多少个十秒的窗口啊。
09:19
做这个整数除法的话,除下来是一个整数值,那这个窗口它的起始点时间出到底是多少呢?这个时候我们再乘以S,再乘回来,这就是不包含这个时间戳带着的那个偏移量带着的那个零头的当前窗口的起始点了。所以这种方法也算是一个小技巧啊,要计算一个窗口的起始位置的时候,我们可以直接利用整数除法的这个特性,直接除以当前窗口的大小,然后再乘回来,这就是起始位置。好,那有了起始位置之后,那结束位置当然也就知道了,我们可以直接叫做and,那就是start。
10:01
加上size,那当然就是结束位置了,有了起始点和结束点,那其实这个窗口的主要信息就有了。另外呢,我们为了触发窗口计算,还需要注册一个定时器。用来。触发窗口计算。那所以这个定时器的话,C TX time service还是注册一个事件时间定时器,具体它的触发的时间点,这个时间戳按照窗口的定义就应该是N减一。这主要还是模拟了之前我们所看到的啊,Even time trigger里边所实现的那样,它所注册的定时器其实是max time step啊,就是结束时间减一毫秒的那个时间点啊,这样的话,当到达这个时间点的时候,之前所有的数据就都到齐了,诶这是我们注册定时器的这个过程,然后接下来呢?哎,那既然这个窗口也有了,定时器也有了,那就要按照数据到来之后,我们就应该更新对应窗口里边的count值了,所以接下来就是。
11:12
更新状态。Count加一。那这里当然还分成两种情况,那之前我们也做过类似的这个实现啊,那就是看这个map state里边到底包含不包含对应的K,也就是说,比方说我们以这个start作为K去保存窗口里边的count值的话,那就是看这个start到底在不在里边,如果在里边的话,哎,那就直接拿出来加一,如果不在里边的话,说明之前这个窗口的数据从来没来过,这是这个窗口的第一个数据,那就直接把count值负一写进去就可以了。啊,所以接下来这个也很简单,If做一个判断,那就是window PV map,从里边判断一下是否contain。Start以它作为K来进行count值的保存啊,那这里面我们可以计算一下啊,当前的PV值到底是多少,从里边先做一个get。
12:09
Start是K先拿出来,那拿出来之后接下来呢,当然就还是要做一个put,做一个更新的操作了啊,那同样还是start。然后把PV加一更新进去就可以了,那else。如果说本来就不存在对应的这个值的话,哎,那也没什么好说了,直接把对应的窗口start,点对应的那个位置去put一个一就可以了啊,那这就是我们整体的处理逻辑。那关于每个数据到来之后,我们要做的操作就是这些了,那接下来还有一个操作要做的,当然就是定时器触发的时候应该怎么做。定时器触发时。窗口输出结果,那其实输出的就是。
13:01
当前窗口对应的那个count值直接从map里边拿出来就可以了。这个时候的关键点还是首先先得。找到对应的K到底是多少,得知道这个窗口的起始时间是多少,那怎么样去反推呢?哎,那就是根据当前触发定时器的时间戳做一个反推,因为这个时间戳我们当时注册的时候是按减一嘛,哎,所以那我们自然就知道了。现在对应的start,那就应该是time stamp加上一,这就是and,然后再减去size窗口的长度啊,那这就是当前窗口的起始位置start,也就是我们所保存的键值对里边的K啊,那接下来就以它为K,把对应的count值拿出来就可以了,那所以这里边PV或者叫count值就等于从状态里边去做一个get提取。把start对应的count值拿出来,接下来我们就可以直接输出了out.collect啊,那这里我们就可以写这么一句话做一个输出啊,呃,就是哪一个页面URL是什么呢?啊,那这里我们可以直接用,因为URL是key嘛,我们可以直接用ctx get current key把当前的URL先提取出来,然后后边我们还可以输出浏览量PV值。
14:26
到底是多少?浏览量为?啊,那这个值的话,就是我们这里的PV。然后还有当前的窗口信息。窗口为。这里我们需要有一个start.end点,我们当时没有单独的保存,但是我们知道只要start再加上size,这就是当前窗口的结束时间啊,那所以当前的信息就全部都输出了,这就是我们模拟的这个窗口计算的过程,那另外我们还要注意一点,因为窗口计算完成之后,当前窗口还应该销毁啊,哎,所以这一步这是窗口。
15:06
输出结果模拟了这个过程,那另外我们还得模拟一个窗口销毁的过程,那窗口销毁在这里边其实也非常简单,那就是从状态里边把对应窗口信息删掉就可以了,所以这里边我们就是调用map的remove方法,将当前的K直接删除掉就可以了。这就是我们完整的一个处理的过程,好,那接下来我们可以运行一下,看一看得到的结果到底是什么样子。我们当前还是十秒一个滚动窗口,所以需要先等待一下。看看十秒之后,到时间之后得到的处理结果,哎,我们看十秒钟到了这个时候,因为我们是以URL作为K嘛,所以只要这里产生的每一个URL啊,都会做一个统计输出,哎,那我们看到。
16:00
只看前四条数据的话,我们看到这个ID为三的商品,它的浏览量非常的大,它是四次浏览啊,那别的都有一一或者是两次浏览啊,然后我们看到后边窗口的信息,他们都是同一个窗口啊,十秒钟一个窗口,诶,那后边这就是下一个窗口了。同样,下一个窗口也有很多条数据,都是统计当前窗口内每一个URL的浏览量。哎,这就是我们利用map state和定时器模拟实现了窗口的功能。
我来说两句