00:00
我们已经使用process or function实现了top喷的需求,在这个实现过程中呢,我们会发现思路其实是非常简单粗暴的,诶,那就是不就是要统计每个时间窗口内所有URL活跃度的套喷吗?诶,那我干脆就直接开窗把所有数据都放进来,然后统计每个URL的count次数,接下来做一个排序,提取前几名就可以了啊,那整个这个过程就是所有数据先收集齐,然后再进行处理,直接用到了一个全窗口函数process or window方式。当然了,这个处理过程我们就会发现稍微的有点繁琐啊,就是本来这个数据都已经收集齐了,然后呢,我们再一个一个拿出来啊,统计一下到底有几个,接下来再做排序,再做提取,最后包装信息,打印输出。而且我们仔细一想就会发现,不光是代码实现的看起来很繁琐,实际我们整个程序在执行的过程当中呢,肯定效率也比较低,因为这就相当于是一个批处理嘛,我们说全窗口函数,它是让数据到来的时候先攒起来,攒够了之后等到窗口结束时间触发计算的时候才来调用这里的process方法,那很显然之前我们数据收集的过程都是一个等待的过程,这个效率就比较低了。
01:23
那怎么样能利用我们之前说的流处理的思路,来一个就处理一个,来一个就处理一个,提高处理效率呢?那当然我们就想到了,还是应该像之前URL view count这样啊,先做一个增量聚合,每来一个数据,我们就使用一个aggate function啊,增量句和函数,先把它的个数统计出来不就可以了吗?等到我们已经统计出了每个URL它的访问次数,再把每个时间窗口里边的数据都收集到一起,再去排个123名,然后输出我们最后的top就可以了。
02:01
所以呢,我们实现套其实还有另外一种思路,那就是在之前URL view count统计每一个URL在当前时间窗口内访问频次这样一个需求的基础上,再追加一步,同样可以实现这样的功能,而且这样有一个好处,就是说像前面我们做这个。全窗口函数收集所有数据去统计的过程呢,我们是做了一个WINDOW2啊,我们说WINDOW2,它本质上是把所有数据都分到了同一个分区,这相当于强制并行度变成一了,那所有数据没有办法并行计算,效率肯定也很低。而如果说像前面URL view胖子呢,我们自然就可以按照URL先做分组,先把它分到不同的分区里边去做并行计算,然后得到的结果我们再把它收集到一起,诶,这样的话很显然运行效率就会更高。所以接下来我们要实现套喷的思路,那就是可以把之前URL view count这里做的操作作为我们的第一步啊,就相当于我们第一步统计每个URL访问次数,这个就已经做完了,那接下来怎么办呢?哎,接下来其实我们就是要把得到的访问结果,哎,注意这里得到的访问结果我们已经包装成了URL view count。
03:21
那么这样一个样例类类型里边的数据是URL,还有count值,另外还有窗口的信息。那接下来呢,这些数据就要按照每个窗口里边的所有的UI count要统计在一起,按照count值进行排名,然后提取套盆啊,那所以接下来我们会知道,那应该是按照窗口再去做一个划分,分组之后呢,收集起所有的数据,还是同样的逻辑得到一个列表,我们把它做一个排序,然后提取就可以啊,最终可能我们还是包装信息做一个打印输出。
04:01
啊,所以接下来我们要做的这个事情,显然那就是在之前这一步的基础上,后边再做一个点KBY操作,那只不过现在K的就是window的信息了,我们可以k by Windows start或者window end,然后再接下来呢?啊,那当然后边我们就是在做一个收集当前窗口内所有数据,然后再做对应的排序处理的过程。这个处理的过程,我们到底应该怎么做呢?那我们可能会想到,诶,这个很简单啊,我们现在不是所有的数据都已经作为一个一个的URL view count。这样的一个数据结构啊,样例类的对象,然后一个一个输出了吗?诶,那我们就按照当前的这个窗口,把它所有的数据收起来不就行了吗?诶这里有一个问题就在于,比如说啊,我们这里边十秒钟一个窗口嘛,零到十秒钟的窗口。我们当然可以按照窗口信息进行一个分组,那比方说现在我们就来了一个零到十秒窗口内的数据,比如说这个URL对应的页面就是点杠后count,它被访问了五次,后面跟着的当然就是零和十,我这里直接用秒数来代表这个时间戳,那这样的话,这个数据来了之后,哎,那我们就直接把它排成零到十秒当中的第一名吗?
05:25
注意后边还有可能继续会来零到十秒钟这个窗口里边的统计数据啊,诶,那所以我们自然就还得等,那我们第二个如果来了之后来了一个点cut。它是三次,同样还是零到十秒这个窗口之内的数据,那接下来他就直接排第二名吗?我们现在可以把这个前两名都输出了吗?当然不能,因为接下来后面还有可能跟着零到十秒钟这个窗口内对应的数据啊,诶之前我们这个统计聚合,这是所有的URL都要统计出来,并没有个数的限制。
06:00
所以我们当前并没有一个预期说,诶,到底是什么时候就可以把当前所有的数据都收集齐。啊,那当然了,我们可能也就能想到啊,哎,那之前我们不是说了这个就是按照窗口时间去设置的吗?哎,那现在既然已经是零到十秒钟窗口的数据输出了,那现在时间就应该已经进展到十秒了。那既然是到十秒,这些数据不就都应该到齐了吗?诶,这个不好说,因为我们现在收集到的这个数据里边。接下来做的操作已经没有窗口的信息了,我们相当于已经不再开窗统计了,我们拿到的就是一个一个这样的URL的数据。那这些数据呢?我们只能通过它的Windows start window and去判断到底是属于哪个窗口。那我们来一个又来一个,诶,那其实根本没法判断它什么时候就可以全部都到齐,这就有点尴尬了,那我们到底等到什么时候为止呢?
07:01
哎,这里有一个小技巧,那就是我们可以设一个定时器。哎,我们之前说过有这样的timer定时器这样的操作,我们只要按照事件时间设置一个十秒钟之后的定时器,等到到了那个时间,那自然之前所有数据就都已经到齐了。诶,所以通过这样一个小的技巧,就实现了收集之前我们一个窗口输出的所有数据这样的一个目的,哎,那所以我们当年设置定时器,设置到什么时候呢?那其实就是十秒后边稍微过一点点就可以,比如说我们可以十秒后边加一毫秒。设置一个等待一毫秒的定时器,那这样的话,假如当前的事件时间已经进展到了十秒后边一毫秒,那么这个时候,当然之前我们零到十秒的所有的结果就都已经到齐了啊,所以通过这种方式我们就可以解决收集数据的问题。
08:04
所以接下来呢,我们可以利用这样一个示意图来想一想接下来我们的代码里面要做一个什么样的操作啊,那既然我们用到了定时器,那自然想到了当前只有kid process function里边能用定时器嘛,所以我们现在要用的当然就是一个key process function了。这里还有另外一个问题,就是我们使用kid process function里边要注册定时器,要等待当前所有的数据都收集齐了,然后再去排序输出前几名。那么在这个等待的过程当中,之前已经到来的所有数据我们应该放在哪里呢?这肯定不能丢掉嘛,我们现在的流处理是来一个就要处理一个,哎,那假如说我们是一直等到最后才处理的话,那之前数据肯定是要先保存起来的。哎,那所以我们自然想到了,最好是把它保存成一个列表,那这样一个列表的话。保存好了之后,再触发定时器的时候,直接把它做一个排序,提取前几名就可以了,这个列表呢,如果我们直接把它定义成本地变量,它会有一个问题,就是因为我们知道当前的flink任务其实都是可以并行执行的嘛,我们有多个并行的子任务。
09:17
所以我们在代码当中定义的一个算子,它实际的执行可能是在多个分区上去并行执行的,那他可能在多个分区上都要创建自己的一个对象。那么这里边如果我们定义一个本地变量的话,它其实是针对当前的分区有效的,也就是一个分区上有对应的这样的一个变量的实例。所以如果说我们这里边定义一个本地的list来保存我们当前的数据的话,那应该就是一个分区,一个list。而我们知道啊,现在做了K败,并不代表一个分区就是一个K,它跟K的对应关系又不是唯一的,那这样的话就有可能造成我们后边处理逻辑的混乱,而且你直接使用一个本地变量的话,Flink对它是没有容错机制的保证的,也就是说如果说呃,这个时候啊,出现故障,我们当前分区任务给挂掉了,恢复重启之后,那之前所有的数据其实是丢掉了啊,那这样的话当然是不合适。
10:21
那怎么办呢?为了解决这个问题,Link给我们提供了。状态编程的机制啊,也就是说我们可以把它用flink内置的状态类型来进行保存,那如果说把它保存成了flink的状态,那接下来flink就会帮我们把它管理起来,而且就会按照当前的key来进行访问和操作,哎,那另外呢,就是如果出现故障的话,都可以自动的保存,自动的恢复,哎,那所以现在我们如果是想保存成一个列表的话,那就使用一个自定义的列表状态叫做list state来进行存储啊,那当然了,这一部分涉及到了状态编程的东西啊,关于状态编程我们会在后边的第九章详细进行讲解,那现在呢,就是先来大概的有所认识就可以了。
11:11
诶,那一般怎么用这个状态呢?其实就是这样的一个流程,我们需要去定义这样一个list state,声明它的时候呢,放在open生命周期里面,我们现在不是一个k process function吗?它本身继承自复函数类,有open生命周期,在这里边我们就去创建一个list s,用来存储数据。然后k process function里边每来一条数据,就会调用process element方法,诶,那么这个方法关键要做的事情就是直接注册一个定时器,把它的时间设置为window and加一,就是我说的啊,零到十秒的窗口,我们就把它设成十秒加一毫秒的那个时间戳。到了那个时间,我们的所有数据就收集齐了啊,所以我们在process element里面还要做一件事,就是把当前到来的这个数据添加到列表状态里面。
12:06
那注册了定时器之后,接下来等到这个定时器触发的时候,当然就从列表状态里拿出所有的数据,直接排序得到top n就可以了。这就是我们整个的处理流程。
我来说两句