00:00
接下来我们再来介绍一下map set,也就是映射状态在具体项当中的使用。映射状态的用法呢,跟value set和list state非常相近,它的定义同样也是我们先在一个类里边去做一个声明,然后在open生命周期里边,利用get contacts,在运行上下文里边调用它的get map state方法传入一个script描述器,然后获取到当前。状态的控制居啊,那么它的具体用法呢?拿到了之后,那其实就跟Java里边的哈希ma非常的相似了,那在实际的项目当中,这种数据结构,这种状态非常非常的常见,也非常的灵活,因为我们知道它里边的存储。方式就是k value,一个K一个value,那呃直接去获取里边值的时候呢,也非常的方便啊,根据key可以非常快速的获取到它对应的值,那么在这里我们可以解。
01:02
介绍一个非常简单但是非常有趣的应用,就是可以使用map set来模拟一下flink里边的窗口啊,那当然了,最简单的窗口就是滚动窗口,所以我们可以使用映射状态map来模拟一个完整的滚动窗口的功能,当然这就包括窗口分配器啊,来了数据之后,我们怎么样把它分配到对应的窗口里边,然后诶,到时间的时候我们怎么样再去把窗口去做一个计算,做一个输出,那另外还需要去模拟窗口的销毁关闭,那整个这个过程我们应该怎么样去实现呢?看起来这个功能有点类似于我们之前讲过的周期性计算PV的这个功功能,那一定是要用到一个定时器的,这是首先我们能够想到的。如果没有定时器的话,我们就没有办法对于时间进行一个精准的控制,所以肯定要定义一个定时器,到点之后我们就做这个窗口计算,那窗口里边的所有数据我们该怎么存储呢?
02:08
一个简单的想法是,诶,我可以用一个列表去把所有的数据存起来呀,当前窗口里边只要到来的所有数据,我就把它存在一起嘛,然后等到这个当前定时器设置的定时器到点了,我就把当前列表里边所有数据拿出来,计算一个最终的结果,然后直接输出,然后清空状态,然后再设置下一个定时器不就完了吗?这看起来好像跟我们前面的这个实现过程差不多,只是把这个周期性的计算PV的这个过程,把数据不要只保存一个count,而是保存到一个list state里边,看起来就可以搞定。但是我们仔细一想的话,没那么简单。因为如果我们想要模拟一个滚动窗口,而且是事件时间的滚动窗口的话,那是有可能会有所谓的迟到数据的。
03:00
也就是说,当前某一个窗口有可能是十秒要关,那我们应该看water mark到达十秒的时候,我们才应该关闭当前的窗口。但是整个这个数据有延迟,有乱序啊,有可能时间戳11秒的数据现在就已经来了,那这个时候这个数据我们也把它放在当前的这个列表状态里边,直接把它统计在当前窗口里吗?显然不应该这样做。所以我们就想到了针对窗口,每一个窗口都应该有这样的一个list来保存当前的所有数据啊,那。那不同的窗口里边的数据呢,还应该按照窗口的。起始或者结束时间戳去做一个分别的保存,哎,这里边的话就涉及到了不同的定义了啊,那我们就会发现啊,按照这样的一个描述,我们应该把所有的数据保存到一个类似于哈希map的数据结构里面。
04:07
当前的K就应该是。每一个窗口的ID啊,我们可以用窗口的起始时间或者是结束时间来做一个表述,那对应的值呢,Value呢,就应该保存当前窗口内的所有数据。当然了,我们可以进一步把它做一个简化,如果说我们想要统计的还是一个类似P这样的功能的话,或者我们统计之前URL,我们统计的是每个窗口所有。所有URL被点击的次数啊,那是不是以user去做键去做划分了,而是以URL作为K去做一个分组划分啊,这个都没关系,我们只要知道这样划分之后呢,是要统计它的一个count的值,哎,那自然就想到了,那这个value里边我也不用去保存所有数据了,只要保存一个count值就够了。
05:03
所以相当于我们现在应该要保存的是。这样的一张哈MA1张表,一个window对应一个值,一个window对应一个值,当来了一个新的数据的时候,就把它对应的那个window里边的count值啊,当然了,我们对应还得找到不同的URL,按照URL划分。这个我们知道在k process function里边。只要我们先做了K,后边的状态都不用我们考虑,默认就是按照K划分开的,那所以接下来我们其实。已经按照URL或者说按照user这个K都已经划分开了,另外我们还要按照不同的窗口,不同的window再做一个划分,然后把它们每一个窗口的count值统计下,所以我们现在用的就不是一个简单的value state或者是state了,我们用的是一个。Mapst,这是一个映射关系,所以接下来我们可以在代码里边去做一个具体的实现,还是新建一个Java类,当前我们这个就可以叫做。
06:11
Fake window example。因为我们要做的其实是一个一个假的窗口啊,对于窗口的一个模拟。那同样还是上边先把throws exception写出来。前面的这个整体过程都是一样的啊,所以我们这里可以快速的写一下stream execution,先把创建出来,这叫做env,不是一般性,我们把它全局的并行度是什么一最后应该有一个execute。这是我们整个的框架结构,先放出来,然后中间的处理过程,我们就类比之前的URL这样一个例子吧,啊,我们其实就是要把所有的每个,比方说我们定义这个滚动窗口十秒,就是十秒之内的所有数据按照URL做一个划分,统计出每一个URL,每一个页面被点击的次数,啊,那这个过程我们先把前面的。
07:11
内容都copy下,前面这个数据当然是一样的嘛。Click,然后我们把这个按照乱流里边生成的机制给它定义出来,时间戳提取出来,这里边我们还打印一下当前输入的数据,接下来呢?呃,接下来我们要做的事情其实就是直接K。这个过程还是一样的。URL。然后接下来用一个process去实现,很明显现在我们要用的是一个process方式,因为里边我们用到了map,用到状态,用到了定时器,只有process方式才能实现。那这个我们就叫做。自己把它叫做定,定义成fake windowor。
08:00
这个我们也是直接包装成一个string类型,然后打印输出就可以了。这就是我们想要的代码框架。然后接下来。那就是实现自定义的。Key的process方式。同样,我们还是public static class。Fake window result。Extend。的process方式。KIO当前的泛型啊,那当前的K的话,URL是string类型,然后输入event类型输出我们现在要的也是。这样的话,我们就可以定义想要的状态,然后定义具体的操作流程了,那那首先我们应该想到,在这个定义的过程当中,如果要完全模拟之前的这个窗口的话,我们会发现滚动窗口应该还得传入一个参数,就当前窗口大小是多少,应该是可以设置的啊,那所以。
09:03
当前在这里边,我们这个类本身应该也可以传入一个参数,比方说我们就传入一个十秒钟的话,我默认这里面要求的都是毫秒数,所以传入一个1万。1万毫秒,那就是十秒钟了。把这个当前的类里边的一个属性叫做Windows,定义一个自由属性。这是窗口的大小。那当然了,呃,我们既然是专门定义了这样一个属性,自然是应该有对应的构造函数。我们把对应的构造方法先声明出来,传入一个Windows啊。那么接下来我们就可以直接。当成当前的这个属性,然后直接去用了,然后接下来呢,呃,就还需要去定义我们想要的一个map state,这是用来保存当前每个窗口里边统计出来的个数的一个状态。
10:06
所以定义一个last。用来保存每个窗口中。统计的。抗值。定义的过程跟其他的其他类型的状态是一样的啊,那不同的是mapt state有两个泛型,那一个泛型K的话,我们这里应该是窗口,窗口的ID,那窗口ID我们就用窗口的起始点就可以了。那另外还应该有一个就是抗值了,抗值应该也是长征性,所以就是涝涝。然后这里我们定义成window。URL。Count map state。尽管比较长,但是我们一眼就能看出来它的类型是,而且它表示的是每一个窗口的URL count。
11:04
然后接下来我们就需要在open生命周期里去对于这一个map定做一个获取,获取到它的控制距。这过程完全一样,还是基于运行的上下文runtime contact,调用它的get map state方法,然后去入一个map的描述器,啊,那里边这里边我们要传入的就多了一个参数,除了它的名称之外,呃,我们这里边的名称可以直接给一个。Window hunt。那后边我们还应该传入它的key和value对应的类型,所以可以直接点plus。浪点。这样的话就定义好了。然后接下来我们就需要去实现最关键的核心逻辑,每来一条数据,到底应该怎么做呢?诶,那首先。
12:03
每来一条数据。应该判断。根据时间戳数据的时间戳判断属于哪个窗口,那这其实就是。窗口分配器要做的事情啊,那接下来我们就是先。来获取到当前的。对应的Windows start。当前一个数据,它的时间戳非常简单,能够拿到就是value。那么它到底属于哪个窗口呢?哎,其实非常的简单,因为我们知道如果是滚动窗口的话,十秒钟一个窗口,那我们定义的窗口就一定是整十秒。其实。所以很明显就是除以当前的Windows,然后再乘以整数,除法之后再乘以当前的大小。这当然就是。
13:02
就是当前的这个整十秒钟的这样一个起始点。啊,那。在前面我们讲到窗口那一部分的时候,也给大家看过底层源码里边对应的这个实现,我们的逻辑其实都是完全一样的。哎,那同样我们可以计算出当前这个窗口的结束点时间点。Window,那就应该是window start,再加上window,这个非常的简单。接下来我们就要去定义窗口的相关属性,窗口里边其实除了start和end之外。最关键的其实就应该有一个触发定时器,它什么时候触发呢?按照之前我们窗口那一张里边flink底层的实现,它其实定义的定时器是window能够包含的最大时间戳,以它作为当前窗口定时器的时间。
14:03
其实是窗口里边能够包含的最大的时间戳,那那个最大时间戳其实就是window and减一。所以接下来我们是要。注册。And减一的定时器。就是我们所定义的定时器,当然需要用到cx.time service,然后register。我们可以注册一个事件时间定时器,这里模拟的是事件时间的滚动窗口。当然。给的是一。注册完成之后,那接下来就是要。更新状态了。更新状态。进行。增量聚合,所以我们这里做的其实是一个增量聚合的过程,每来一个数据,直接就把聚合过程已经完成了,最后要输出的时候是等到定时器触发,然后我们直接把所有的计算出来的结果啊,状态里面的结果输出就可以了。
15:06
那所以这里面又涉及到一个,诶,那我要去做这个增量聚合的时候,到底之前有没有我的那个map里边,Mapate里边有没有当前对应的这一个URL,它的对应窗口啊,我们当前UI,当前map里边的K是窗口,那到底有没有这个窗口里边的数据呢?没有的话我们获取不出来,所以还需要去做一个基本的判断,那就是。如果当前的maps里边contains。Windows start。当前的。这个起始点的话,那么我们定义一下当前的PV到底是多少。啊,或者说我们叫这个,我们定义一下当前的count是多少。那就是直接去get。Windows start。
16:00
把这个值先拿出来,然后接下来当然就是直接加一了。做一个put操作,就像哈希map一样,把当前对应上的值用。Count加一去做一个替换。啊,那这个是如果已经有的情况,那如果没有的情况怎么办呢?没有就更加简单了,直接put put1不就完了吗。所以这个逻辑其实跟我们前面的处理过程还是完全一样的啊,完全类似的一个过程。这里面还需要注意一个。我们现在是把process element已经搞定了,这里面注册了定时器,那真正窗口的触发和输出结果计算的话,这里面我们是增量,增量聚合,已经每来一个数据就都已经计算完了,但是最后的输出结果还是要等到定时器触发的时候才会输出,所以还需要考虑。
17:01
定时器触发时。输出计算结果。这是我们窗口真正要考虑的问题啊,那接下来这个输出结果那也非常的简单了,关键就是如果按照我们之前想要得到的内容的话,那就是当前的URL到底是什么,然后呢,当前的访问量,URL的访问的那个count值到底是什么,另外当前的窗口又是又是哪一个,哎,那所以这三个值其实我们都可以拿到,首先我可以知道当前窗口的结束时间。Window。这个window,那我们知道当前触发的时间戳timemp就是window减一,那当然当前再加一就是window了。那对应的Windows当然也就可以得到了啊,所以我们也可以把这两个都出来。Windows。
18:00
那就是再加上。比较大,那应该是减去Windows。这是窗口相关的信息,然后URL的话非常简单,那就是当前的cx get current就可以了。而另外还有一个就是。我们想要拿的count值。Count值的话是在当前的状态。Window URL map去get当前的window啊,这个window还是必须要计算出来的,因为至少我们获取这个值当时是以它作为K去保存的嘛,所以必须得计算出来。好,那接下来我们就可以直接输出了。要的是一个4G,那我们直接做一个包装吧。窗口窗窗口是哪个窗口呢?我们这里可以直接一个。Time。把Windows先传进去,然后诶,我们可以让它加一个这个号。
19:08
另外,把。Window and。也写进去。这样的话,我们就知道窗口到底是哪一个,然后对应的还应该有。它的URL是哪一个。URL,呃,那应该是CX加上get k,我们可以在这里换一行。然后接下来还可以再去加上当前的count值。那count值的话,就是上面我们得到的count,就是我们想要的完整的输出数据。这里面还需要注意一个,当前的窗口信息都已经输出了,那我们还应该把窗口要关闭啊,窗口应该销毁啊,那现在的窗口信息到底应该还涉及到什么呢?我们知道窗口里边其实就是start and,然后注册了一个定时器,然后啊,还有这个状态里面的保存,所以现在定时器如果已经触发,那定时器就相当于已经销毁了。
20:11
关键就在于还要把状态清空就可以了,但状态我们不能直接clear,因为这是一整张map state里边是所有窗口对应的统计值都在那边,那所以我们现在只是把对应的这一个窗口信息清空就可以。接下来诶,那就是。模拟。窗口。关闭。清除map中对应的。Cheap value。所以这里边我们要做的事情也就只是调用的remove方法,然后把对应的Windows这个删掉就可。所以这就是我们整个模拟窗口的实现过程。那接下来我们可以运行一下,看一看它的输出结果是什么样的。
21:05
我们看到现在是每隔十秒钟就会输出一次,但是并不是第一个数据来了之后十秒才会输出,因为我们现在是整十秒,只要等到整十秒的时候就会输出,这跟flink底层的滚动窗口的逻辑是完全一致的啊,那这里我们可以看到第一个窗口这里输出的,诶,这是30秒到40秒的这个窗口啊,那这里就只有car这样的一个URL count是一,那后面我们看到后面这个为什么没有统计呢?因为它只是触发了我们40秒的这个定时器。它本身并不属于30~40秒这个窗口,所以这个逻辑跟我们之前看到的是完全一样,下一个窗口当然就是40~50秒了,Urlp不同啊,那对应的状态都是不同的,定时器也是不同的,但是它都是同时出发,所以这里边我们可以看到这个时候会输出很多个URL的统计值啊,这里我们可以看到啊,不同的UR有不同的统计。
22:07
就是我们对于滚动窗口的一个使用mapstate的模拟。
我来说两句