00:01
那我们接下来看一下演示啊,那比如说呢,咱们一个。原先的案例统计最近一个小时的UV1秒更新一次啊,用滑动窗口来实现,那这个我也写好了,给大家看一下滑动窗口案例。那前面这一块都一样处理数据对吧,这一块咱们不关注,重点关注哪呢?从这开始。那首先。既然前面是去过重的对吧,已经去完重的数据,那我直接把每条数据转换成最简单的格式一啊,然后我对这个一累加就行了,对吧,那好根据咱们的需求。咱们是不是这么难写?也不用分组了,没必要。那是不是用一个sliding,然后呢。窗口长度一个小时。滑动步长一分一秒,呃,然后呢,就做一个聚合。
01:05
聚合完之后这边呃,我用了两个函数,第二个是全窗口函数,我将窗口的信息拼接上去了,呃,做一个展示而已啊,这个类看一下吧。把统计值拿出来,取出开始跟结束时间,然后封装到一个三元组没了。仅此而已。好,这个案例我们先提交。这没用啊,不看了。刷新好,趁着他提交。啊,不行,还等吧,先看一下。细粒度窗口带来的一个问题来打开。看到没有,在这里,这个是咱们的滑动窗口啊。一会看看会不会出现一个性能问题。
02:06
那么其实先等一会儿刚启动。都红了对吧。很繁忙。好,现在状态已经更新了,大家看一下,基本上呢,通过这个颜色大家也一眼就看到最红的是哪个滑动窗口。那么这边如果你看一下这里。应该是可以看到比较繁忙。这是不是就是大平顶对不对,基本上算是大平顶了,一个尖尖的只有一个啊。目前是处于这种状态。红色的啊好,我取消。接下来我们看一下我怎么解决的,怎么做一个时间分片,怎么用滚动窗口。
03:05
呃,并且通过自己维护实现滑动窗口,那就看上面这一串代码。计算代码呢,我开的是一个什么呢?滚动的窗口。那么这个滚动窗口布长,呃,它的窗口长度给多大呢?我们说了以滑动步长为准,滑动步长既然是一秒,那我就把它切割成一个一个的一秒钟的滚动窗口,每一秒聚合一次,那你这个长度多大呢?我就对应的去把它取出来就行了呗,对吧?好,那你看这边reduce是不是正常的做一个聚合。对,这一秒的滚动窗口聚合,聚合完之后我也拼接上了窗口开始跟结束的时间,方便咱们处理对吧?好,现在有一个三元组了,再接下来。为什么做key by呢?呃,方便直接调用这个process对吧?啊,我用的是key的process function,这没有什么意义啊,没有什么特殊意义啊,反正前面这些并行度本来就是一。
04:05
这种窗口的并行度只能是一。后面这个并行度我也强行设成一,为啥呢?你想想咱们是不是需要对一个一个的滚动窗口的结果。比如说我需要把三个汇总,那我就汇总这三个就好,而且是不是要按顺序对吧,那这个时候你就不要考虑多并行度了,首先它数据量小,一秒钟输出一条结果对吧,完全扛得住,第二个也是要实现本身要求,并行都必须是一。那前面这个本身就是一对吧,WINDOW2的话并行度是一,大家知道后面这个我又指定为一啊。好,接下来核心在哪,在这里。我这边没有去写跟外部系统的交互那个相信你这个会写了,那个你也会写。来看一下这里我怎么做的。进来。这是我写好的,这个先放边边嘛,写好的一个类啊。
05:01
首先在初始,呃,Open。生命周期管理方法的,这里啊,我是初始化了一个叫。状态。那我用的类型呢,是一个value state,那state啊,看这上面吧。这个是date。它是value的,但是里面的每一个元素是一个list。我只是为了方便操作,用这个你也可以用list state啊,就存一个list集合好,那么接下来呢。有一个spli list。是用来接收的啊。回头取出状态吧,然后给他。还有一个切分数,你看我这里初始化了一个切分数,切分数呢,其实就是时间分配,我要切几份嘛,按照咱们的需求,一小时的窗口,一秒的滑动,那我这原先一个一小时的窗口得切分成几个滚动窗口啊,你一除就得到了。
06:07
啊,60分钟除以一秒,这就是咱们的分片数,那是不是应该等于60乘60对吧。另外聚合结果呢?呃,我放在一个tale里面好了,是一些初始化的工作,接下来看一下核心逻辑,其实就这么几行。首先来的这条数据是什么?是不是一秒窗口的结果对吧,一秒钟滚动窗口的结果。好,接下来你看我们先清空一下统计值,大家注意我这边呢。用的是这个什么,所谓的window result这个是什么东西,是一个TOP3,只是我一个中间临时变量过,呃,用来存数据用的啊,没什么意义。呃,先。初始化零,好,接下来。
07:02
是不是取出状态里面的值啊。里面是不是存了一个list value啊,Value取出来是个list list里面是不是每个元素都是一秒钟的聚合结果,对吧?每个元素都是一秒钟的聚合结果,那么接下来我把当前来的这条结果也放进去。也放进这个list里面。好,放进来之后我干嘛呢,判断大小。对吧,为什么?你想想,呃,前面几个,比如说当前只来了三条数据,我需要对三条聚合吗?你是不是得这个分片数达到的圆切滑动窗口的长度,你再来聚合,所以要做一个判断,当我这个list子的结果大于切分数的时候,我才开始处理,否则就继续攒呗,对吧,继续攒好,那首先我干了一个什么事?我一个分片。一个一个滚动窗口是一个时间分片,其实也就等于一个什么。
08:03
补偿。那现在来了一条数。添加进来的这条已经大于切分数,是不是说明这条不算的时候是刚好等于那个原先滑动窗口的长度啊是吧?那么这个时候新来的我是不是可以把原先最老的那一条给删掉了,因为它对当前来讲已经过期了,对吧?那这样吧,咱们画个图吧。画这。好,这是一个一个的时间分片。好,再来一个大的。咱们以一个动态的角度来讲,给大家梳理一下这个原理啊。
09:08
这是原先窗口滑动窗口的一个长度大小,比如说它跟布长一切的话,咱们切成四份啊,举例啊,切成四份。呃,就比如说这个是十分钟的滑动窗口,步长是两分半,对吧,那是不是切成四分了。那这个时候怎么办呢?我是不是一个时间分片会有一个聚合结果,一个聚合结果你看输出给这个process的时候,是不是第一条,我判断一下当前的个数是一。有没有大于四啊,没有。好。第二一个。进来了。第二个进来之后是不是也判断。没有大于咱们的分片数对吧,继续存好第三个进来。好,第四个进来。
10:01
对吧。那这当我达到四个之后,我是不是应该把这四个聚合一下。四个聚合起来,是不是得到了第一个滑动窗口的结果?是吧,那好,假设这时候再来一个结果要进来了。最新的咱们放这边吧,好吧。那这个老的是不是其实已经属于第二个滑动窗口了。对吧。那我们划一下。新的添加在末尾,原先的最开始的所有为零的删掉,我是不是聚合这四个就行了。对吧,这就是我的代码逻辑,自己实现滑动的效果,对吧,对分片进行一个处理,当然这边咱们。继续看啊。Remove。这边其实严格来讲还要再加个判断啊,先不改了,就这样吧,然后接下来头部元素去掉了,接下来是不是该变异。
11:04
对时间分配进行聚合,对吧,那我就是对这个list做一个便利,哎,然后呢。F2是不是第三个位置,这是我放的是一个统计值count值。我是把它累加下来。对吧。对这个统计值进行累加,那。这个滑动窗口的开始时间怎么算呢?第一个元素的开始时间是不是就是窗口的开始时间对吧?第一个元素,如果I等于等于I等于零,是不是头部元素,头部元素的F0 F0就是咱们的window start。那。等于零的时候,我把这F0拿到赋值给我,这个变量的F0也是从窗口开始时间,好,那如果我的I也就遍历到最后一个元素的时候。我是不是将它的窗口结束时间复制给我这个中间变量啊,什么意思呢,你看。
12:04
等整个窗口的范围,怎么确定是不是第一个元素的开始时间呢?等于它,那这个滑动窗口原先的结束时间是不是等于最后一个元素的结束时间呢?对吧,说白了这个逻辑就是取这两个值好拼接上之后,现在统计值也有了,咱们需求里的,呃,滑动窗口的长度开始跟结束也有了,接下来怎么样发送结果就OK了,这个就是咱们的一个实现逻辑。另外唯一要改的就是这里,当我第一次满足的时候,应该是用大于等于啊,应该不能用一个大于。然后这里再判断。
13:04
判断,呃。第一次刚好凑足再一次判断。如果它的大小等于number。加一。哎,对。啊,应该这么来算。我再删逻辑不变啊,逻辑不变是我之前疏忽了一点啊。好,那这样就没问题了。来,我们。打包执行一下。
14:06
好,接下来我们提交咱们自己实现的这种时间分片,然后自己进行聚合的这么一个案例,我是类似用一个队列的思想来做的的滑动啊。我是自己维护的一个状态啊。好了,Running呢?我们再来看看这个窗口目前有没有瓶颈。啊,就这个位置。大家可以看到现在是一个什么滚动窗口了。跟之前的对比,我是用时间分片,用布长做一个分片。好,现在是刚启动,所有的都是busy。等一会儿。
15:00
好,你看现在还有像之前这样吗?这么红繁忙程度百分百,因为它重叠的窗口太多了,那么现在改成了滚动窗口它就好了,你看它的繁忙程度才1%,对吧。那你看一下火焰图。这个已经就是纯粹的就是层层调用了,就一直在跑,一直在执行,一直在执行,这个是什么UN safe park对吧,这个其实应该是阻塞态啊,你看。C是一。那么再看看它后面的聚合会不会有问题呢?
16:02
你可以看到他也不是很繁忙嘛,因为其实他一秒接收一条数据而已,对他来讲也没什么压力。虽然并行度也是一对吧,没压力的。这个就是咱们做的一个优化,很明显很直观就看到性能做了一个优化。这个就是咱们用data式stream来实现。呃,细力度滑动窗口的一个问题,那么你想想有没有更方便的方式呢?有啊,有,但是你不能用data string API了。你得用什么?Flink circle,并且呢,是flink1.13才可以在我们flink1.13发布的时候啊,咱们的雪大佬对吧?啊就。徐蚌江吧啊,就花名是雪静啊,大脑弗林格知名大脑啊,那么他。这一块介绍了一个模块叫window t VF啊窗口的表值函数,这是新的一个功能,那么它进行了一系列的优化,可以自动呢解决咱们滑动窗口细腻度的一个问题,细腻度滑动的问题,你看这是当时他介绍的一个PPT,他对窗口做哪些优化呢?第一个内存优化就增加了一些buffer之类的。
17:23
这些不是关注的重点,关注这个它将window切片尽可能什么呢?复用与计算的结果,其实就是跟咱们把它改成小滚动窗口之后呢,再自己聚合一样的道理。后边呢,就是跳跃窗口,其实就滑动窗口,后面这个就累计窗口啊,这些它都可以复用已计算的结果。也就是说你用circle的话去写就可以,它的语法可能跟之前大家聊到的window function呃,不太一样啊,或者说circle语法不一样,大家看一下,我要提醒一下,并不是说你按照以前的写法,这个flink circle就能把我们搞定啊。
18:04
呃。嗯。好,你看这里多了一个模块,叫TVF。不一样啊,你看这里,呃,这是circle模块,这是TF,那后面的是什么窗口聚合啊,什么什么什么,你看一下滑动窗口,它咋写的,看语法啊。他变成这么写了。From table,然后呢,咱们的这个所谓的滑动窗口的语法写在这个table里面。而且他这边指定啊数据还有这个。几个参数。时间列,这是时间基准列,接下来是滑动。滑动的一个布长,这个是窗口的长度,那如果你是基于这种语语法TF语法写的话。
19:03
表值函数,那么你就不用担心咱们之前的滑动窗口细腻度的一个问题,对吧,这个是就很方便了啊,所以其实说白了,咱弗Li一直在更新,一直在更新,大部分的优化还是集中在circleq里面,它目标也是circleq更易用嘛,对吧,其实门槛比较低一点。啊,这是提醒大家,这是一三版本才能才可以实现时间分片啊,而且咱不用管。
我来说两句