00:00
可以看到前面我们用的这种方法是直接基于当前所有的数据去开窗,然后把所有的数据放在一个窗口里边,用一个哈希map来对它进行分组统计啊,所以这就相当于本来我们应该是在外边K就可以把这个数据分组分开了,而我们呢,诶没有把它分开,我们是手动用了一个哈希map去分别保存,其实大家知道底层的原理肯定都一样,所以这里边其实我们就是把它做了一个手动实现。用了一个哈希map做中间状态,保存所有数据的统计值,等到到了这个结束时间的时候,我们把它拿出来转换成一个a release,然后去排序,当然了这一步操作我们是直接在get result这一步做了也可以,我们get result就把这个哈map返回,那返回来之后呢,在全窗口函数这里再去把它做这个转换也是一样的,因为这两步操作它就是前后脚嘛啊,我们就是说这里调了这个get result,拿到结果之后,就把它作为输入传递给后边的全窗口函数作为后边的输入,所以你在那边做也可以,在这里前面做这一步也可以啊,这个就大家去选择这个时机啊,其实没什么区别啊,那这里我们前面的增量聚合呢,主要就是。
01:18
往哈希map里边叠加写数计算的那个过程省略掉了逻辑好理解是实现起来之后,它的这个效率其实你实际在项目当中运行会很低,为什么呢?因为你这样写的话,所有数据在同一个窗口里面window or大家想这个数据量大的时候,WINDOW2在同一个窗口里边处处理不能分区,哎不能分区的话,那它的并行度就是一。那我们这个并行架构,这还有什么用呢?我们本来最重要的就是要开窗去处理你所有的处理操作啊,最重要的处理操作前面还可以并行,后面全是并行度是一啊,那我们这个就整体处理的效率就太低了,那怎么解决这个问题呢?
02:01
哎,自然我们就想到了解决这个问题的方法,那就是你还是把它拆开,不要用哈希map了,哈希map你这是自己把所有数据收集起来,自己再去分分K嘛,那我们直接就有KBY,然后还能分区,还能并行,何乐而不为呢?诶,所以我们就想到了最好的方式还是按照URL,我我们要统计的是URL的话,那就按照URL先做一个分区,先做一个KBY。然后呢,先开窗,按照每个URL的这个数据先把它统计出来,所以这个过程就跟我们之前讲到的URL view这个example写的实现的是完全一样啊,所以我们先把这个放在这儿。那接下来的问题就来了,第二步,如果说我们想要对于窗口统计出来的再去进行收集和排序,又该怎么做呢?我们在处理的过程当中,一定要树立一个观点,就是我们整个处理的都是数据流。
03:01
数据流在任务之间流动,数据在任务间流动,铁打的营盘,流水的兵,所以你看之前我们这个数据测试的时候,其实就是诶,来了一个数据,来了一个数据,这是流逝的数据,对吧,不停的来,不停的来,来到这儿,这是一个窗口计算。窗口计算的话,哎,那它是会在这儿先截住拦住就是,哎,有一些数据来了这儿之后,我还不到窗口触发时间,那我在里边可以增量聚合,也可以不做聚合,就直接保存着,等到窗口触发时间到的时候,我再把里边的那个计算结果输出一个。而对于K败之后再做窗口操作的话,那不同K的结果会怎么样呢?那其实大家也发现了,不同K的结果是不是会在窗口结束的那一瞬间同时输出啊,啊,我们感觉是同时输出,其实对于弗林克而言,你还是留啊,我们现在并行度是一,即使是那个并行度不是一,是有多个并行子任务,你也得有一个分发的过程啊,你也是轮旋分发,或者是其他的一些转换,所以它还是依次处理的,流的特点就是这样,它是依次处理的,所以对于我们当前的输出结果而言,它是。
04:13
到了一个窗口,出发时间。按照K输出好几个统计结果,然后呢,过一段时间之后,诶,可能又到了另外一个窗口的出发时间,连续的输出好几个统计结果,所以是这样的。那从下游的任务看来,就假如说我们现在要基于这个URL counter stream啊,基于这个东西再去做一个处理的话,那我们在这里看到的数据就变成什么了呢?就都是一个一个的URL view count,当前其实就是一个一个数据轮流来,这个数据都叫做URL view count,然后它这个数据呢,它的来的特点是有点儿像这个一会儿来一堆,前面一个窗口结束了,那这里边我们就会收集到一堆当前窗口的统计信息。然后呢,隔一段时间可能没数据来,然后再到一个窗口结束的时候又来一堆,所以这里你要处理的也是一个流,只不过这个流里边的数据有点特点而已。
05:10
那我们现在的问题就是你怎么把这个流里边的数据。按照不同的窗口进行收集和排序呢?那我就按照之前的节奏,不是滑动窗口五秒钟出一次结果吗?那我后边就开这个五秒钟窗口,每五秒收集一次数据,然后统计输出就完了啊,那自然我们就想到了这个就涉及到一个你总是五秒钟收集一次的话,那是不是有可能啊,这个收集齐了数据之后,你得等五秒才能输出结果啊,啊,这个不是我们想要的东西,我们其实知道它的间隔大概是五秒,但不是绝对意义上的五秒,我只能是知道就是当前输出的这个数据啊,它是确实是到了那个结束时间,两个结束时间的那个触发点是五秒。但是当前的这个,真正你拿到当前数据的时候的那个water mark到底是不是叉五,这个就不好说了,所以我们的做法是。
06:08
我不需要开五秒的窗口,一直等他。我只需要等一会儿就可以了,哎,所以这是我们真正关键的这个思路啊,就是你的数据不是来了吗?我等一会儿把这个当前的一个窗口输出的这个数据收集齐就完事了。之前我们不还说了吗,你现在是五秒钟,所以中间还隔了一段时间,这个隔的比较明显,那假如说。我们是两秒钟,一秒钟就输出一次呢。那如果说前后两次窗口之间的这个数据甚至有可能会受到乱序数据的影响呢?这个时候又该怎么办呢?啊,那我们自然就想到了URL count里边不是有那个窗口对应的信息吗?那我是可以判断你当前这个数据到底属于哪个窗口的呀,所以我后边干脆就在。按照窗口去做一个分组。
07:02
按照窗口当前的那个不是有Windows start和window and吗?我随便找一个对吧,按照window and做一个分组,分组之后就是当前同一个窗口收集到的所有统计数据,然后呢,哎,我基于这个收集到的所有统计数据。定一个定时器,稍微的等一会儿,他就全到齐了,全到齐了之后,那是不是我直接就把所有的数据做一个排序输出前两名就完了啊,这就是我们基本的一个想法啊,那这里就又涉及到另外一个问题,就是说。既然是要稍微等一会儿,那就需要设一个定时器,其实原理跟窗口是一样的,窗口是按照窗口结束时间设的定时器嘛,那我们现在呢,当前它的window and不是本身我们之前窗口结束触发它输出的那个时间点吗?我们就在这个window and的基础上。再多等一点点,多等一毫秒就够了啊,为什么呢?因为我们都是water mark出发嘛,这个跟乱序数据没关,只要是water mark超过了window and加一毫秒的数据的话,诶,那我们现在就可以保证所有的数据都到齐了,这是第一个考量点,第二个考量点是那当前在这个后面做处理的时候。
08:18
我的这个数据又怎么收集起来呢?你又不用窗口了,窗口的话,我们说全窗口函数就直接把它收集在里边了嘛,有一个table类型,那现在你既然没有了,那怎么办呢。我们应该对后边的这个算子,这里的这个算子单独定义一个一个列表,把所有的数据保存进来,那你定义这个列表,大家知道后边我们这个做计算的时候,它本身这个任务应该是运行在本地的,我们刚才不是针对那个window做了一个分组吗?你如果要是就是在本地创建一个a list的话,Java是不会看你当前这一个key是什么的,只要是一个并行任务,我执行在本地,那所有的数据是不是都会看到这个list呀?
09:05
那他就不分K了。哎,那这怎么办呢?啊,这就要给大家提到一个我们后边接触到的概念,我们要创建。状态。而且这个状态呢。可以只跟当前的key有关系,所以这个状态叫key state。也就是跟键有关的按键分组的状态。我们自定义了这样一个状态,当然是一个列表状态了啊,就是有一个列表来一个数据就把它塞进去,来一个数据就把它塞进去,而且呢,创建一个按照窗口结束时间加一毫秒之后触发的定时器,等到触发定时器的时候,确保所有数据都到齐了,我们就从这个状态列表里边把它提取出来,然后做一个排序输出,这就是我们完整的流程。啊,所以大家会看到就是这个过程还是稍微有一点麻烦的啊,又用到了后边我们还没有讲到的,没有详细讲到的状态的用法啊,所以这个就当是先做一个预热吧,就用一个比较复杂的例子,把所有东西都串起来。
10:13
那既然有了基本的思路,那接下来我们看一看到底应该怎么处理啊,那既然是既要用到定时器,又要有状态自定义状态,那什么样的操作符合这个需求呢?当然就是大招,当然就是process function式啊,就是对于这个自定义状态,之前我们说一般的这个reach function也是可以搞定的,但是对于这个定时器而言,那就只有process function,而且是必须得是key的process function式啊,所以接下来我们就要做一个分组,然后自定义一个key的process function。Ul countt stream,然后一个KBY,当前我们K的是。是当前的window信息啊,我们基于这个window and啊,因为这个比较重要,后面我们还要基于它去定义定时器吗?啊就把这个就列举出来就可以了啊,然后接下来就是直接一个process了。
11:09
一边要去拗一个我们自己啊,Key的process function啊,这个我就直接叫做。Top nes result,但是这里边我们既然说top n,那大家会发现其实可以有一个参数,就是这个N应该是可选的,那比方说我这里边传一个二,那就表示N取这个TOP2,我们把它这个print啊放在这里,那接下来我们关键就是实现这个东西了。直线自定义的。Process。啊,这个实现的过程呢,其实也并不复杂,At class。我们先把这个copy一下啊。现在既然是key的process function嘛,那我们需要继承key process function。
12:02
它的泛型KIO,当前的K,当前的K是window and,你得根据当前的来啊,当前是window and,所以这是长着,然后后边是这个输入,输入当然是UR count了。好的,这个po类,然后最后输出还是string,我们还是刚才那样啊,实话的把它这个在控制台打印输出,就像一个监控屏一样,对不对啊,监控大屏不停的看这个,呃,每五秒钟,每五秒钟输出一次啊啊,然后接下来既然你这里边有一个这个二嘛,那我们应该得有一个属性啊,所以这里边我们定义一个属性。N把它定义成private对吧,直接把它定义成N。然后接下来,那这个就需要有对应的构造方法了。我们把这个这个N给一个构造方法,这样的话上面就不报错了。接下来我们知道。必须实现一个process element方法,然后这个process element方法里边呢,其实是每来一个数据,我是要先把它扔到一个list列表里面去,哎,那所以这里面是涉及到了一个所谓的状态,这样一个list列表我们是作为一个状态来保存起来的,这里涉及到状态的应用,这个状态怎么用呢?啊,我们先在外面做一个定义。
13:23
定义。列表状态。这个所谓的列表状态还是private定义出来啊,定义一个list state。这个东西就叫做list state,你既然它是个list嘛,那我总得知道这个列表里边具体的元素是什么类型啊,啊,所以有泛型,我们当前就是把每一个元素保存进来啊,那也就不用客气了,直接URL count就可以了,好,所以接下来我们自己定义一个啊,叫做UR count list state,那是不是定义了之后这里就能直接用了呢?诶,大家看到这里边我们没有直接把它拗出来是吧?诶那这个如果要是创建的话,是不是应该就是直接new一个Lisa state就完了呢。
14:05
注意不是啊,你有一个,那我们new的是一个本地的Java对象啊,所以这里边你定义出来之后也是一个Java对象,现在你得用flink把它管理起来,因为我们不光是本地的一个操作啊,本地内存里边的一个东西,你还要让它根据不同的K能够保存不同的列表啊,保存不同的状态啊,这是弗link可以直接帮我们搞定的。但是你如果要是想让flink帮我们把它管理起来的话,就必须哎,环境中就是运行时环境中。获取状态的控制距离。啊,那所以这里边创建状态时得怎么创建呢?大家记得process function,它本身也是负函数嘛,那么在这个负函数里边。有一个方法叫at runtime context,可以获取当前的这个运行上下文,然后就可以获取里边的状态了,诶所以根据那个方法之前大家看到的啊,运营上下文里边获取状态的方法,就可以让flink管理起来,然后我们从环境里边获取这个状态就可以了,但是你直接在这儿让它等于然后去去get啊,看你这么去get,我们现在get一个list state可以这么去写。
15:20
这个不对,为什么呢?现在的这一个类啊,Static这个类大家知道你在这儿声明的时候,定义的时候。当前还没有runtime context对不对,那你这样去去解析这个代码的时候,它解析不出来啊,哪有这个运行时呢?没有,那怎么办呢?你必须得等当前任务跑起来之后,整个这个任务启动才能有运行时。那任务怎么办呢?难道我得到process element里面去取吗?不需要,我们有生命周期方法,我们只要取一次就对了,不需要每次都去取吗?所以里边我们在open生命周期里边获取状态啊,所以是这样去做的这个操作啊。
16:00
Ul you can'list sit,这我们可以去at run time contact,然后get list sit这边大家看要传的啊,是一个。List state script,那这个稍微有点奇怪,只要大概知道它是什么东西,它是个描述器,所以这是固定的用法啊,它的语法就是这样的,然后里边呢,要传一个当前的名字name啊,那比方说我就叫URL count list。然后后边还需要给一个类型,因为弗link得知道啊,你得告诉flink这类型是什么,名字是什么,才能找到对应的状态嘛,那这里边的话,当然类型就用types定义了,我们现在这个types是个URL will count,所以这是个types,欧洲里边把URL view count class在这,我们就把这个状态声明出来了。哎,那反而后边的这个做法就非常的简单了,后边我们要做的就是来了数据之后。将铸具保存到状态中。
17:05
那这里面我们要做的就是URL,大家看这个状态,它有一个at方法啊,那当然就是可以就像我们那个list的操作一样,把它at进来完事了吗?哎,另外还有一步操作,诶,注册一个定时器,注册window and结束时间。加一毫秒的定时地啊,那这个注册也非常简单,调用时间服务,定时服务register time,然后这里边我们要的是结束时间,结束时间从哪去取呢?UR count里边不是有吗?啊,这个也可以啊,或者我们知道上下文里边也有,因为我们KBY之后KBY就用的是这个window and嘛,诶,所以这个其实从K里边直接可以拿得到啊。Ctx减二的K当前的这个值加一,这就是加上一毫米。但这个没完,我们的核心其实是等到复发值器的时候才会有用开r on开R这个方法里边才是我们真正要去做计算的这个过程啊,那这个过程的话,其实大家也知道了,肯定就是要创建一个飞的类似的这个过程,对吧,这个我就详细去写了啊,我们直接从这边汇过来就可以了,类似的这个做法,我们可以直接创建一个a list。
18:23
当然了,这里面我们不需要是元组类型啊,不需要是temp,我们还是定义这个UI view count就可以了。后面我们提取的时候也是直接用这个字段就可以嘛,所以这个比较简单啊啊,那这里边比方说我们定义这个不要叫result了,我们把这个叫做URL啊release吧,就叫。那后面这也没有他map里边操作那么麻烦,里边关键是要把这一个状态里边的东西拿出来,然后转成这个a list啊,那所以这里边啊,最简单的方式看这个状态本身这个URL welcome list state啊,本身这个list state是一个me state边有一个able,是有这样的一个迭代的类型的,只不过你要找到这个able稍微麻烦一点啊,你看它这这里边就是只是提供了这些接口方法啊,这个我们讲到状态的时候可以再说,这里只要知道怎么用就行了,我就直接做一个便利。
19:19
这里面的每一个URL count。是可以直接。案例有一个点的方法,得到的就是这个out,这个out是什么呢?前面大家其实看到过。那就是这个类型对吧,大家看me state这里边有一个out类型啊,所以它的out是一个interable类型,我们这里的这个方法可以直接把对应的这一个able出来啊,那既然你get的出来了,可以便利,那我们就把对应的这个list里边依次添加进去,URL will count。诶,那后边这里就可以做sort了,URL list做一个sort,那这里边不对,对吧,我们还是重新来new一个啊又一个competor。
20:08
里边还后减前,那应该是O2点对吧,应该是这个value减去Oe.on.inter value h去啊,那后边这个构建前两名,这个过程就完全类似,直接凹这个之前的。这里的这个处理过程大家看,这不是已经有对应的那个结果了吗?我们接下来就直接stream builder,好,这接下来我们就是。包装信息啊,应输出后面这里是排序,前面是从那个状态里边提取数据。提取到这个release的主要就是为了排序嘛啊,你直接那个状态,他没有给我们提供这个排序的接口啊,所以这个是没办法做的,这边只要稍微的更改一下就可以,比方说这里边我们的这个。
21:00
这个叫ctx。也就不是window了啊,Ctx里边本来就没有window,我们当前的这个时间戳窗口结束时间应该是Ctx.current啊,这样的话就可以直接拿到,然后接下来。取前两个的话,我们拿到的是这个啊,URL这个list,那这里面类型就不对了,我们要的是。肉类URL count,后边这里也是直接去提取URL以及count就完事了。我们把这个就要搞定了,然后运行一下看看效果。当前五秒钟一个窗口,前面我们还把这个数据打印出来了,数据比较少,所以它都是一嘛,然后现在数据多了,大家看3242啊,跟之前的那个状态是一样的,五秒钟一个窗口,五秒钟一个窗口,把当前的访问量最大的啊,这是URL了啊URL,然后打印输出。这就是我们呃用这种方法的一个测试,它的好处在于我们前面都是按照K去做了分组,做的是并行计算。
22:08
所以这就是靠N优化之后的效果。
我来说两句