00:00
我们已经了解了窗口函数当中的增量聚合函数,那这里边就包含了规约函数以及更加一般化的聚合函数A方式。对于增量聚合函数而言,我们会发现啊,它非常符合我们流处理的思路,那就是来一个数据就处理一个,只不过呢,我们当前的窗口算子,它并不是时接输出,而是等到窗口结束时间到来的时候才会触发计算,然后输出结果。那事实上我们知道啊,当前所谓的触发计算其实对于那个aggregate function而言,就是要调用get result方法,这个时候得到结果才输出,那中间呢,中间其实也并不是不做计算,每来一个数据就会调用aggregate function里边的A方法去进行一个叠加,诶,所以我们之前所有的数据啊,在到来的时候就已经在窗口里边把该做的计算都做完了啊,那所以到最后想要输出的时候呢,只要把最终的结果拿出来输出就可以了,所以很显然这种方式效率会比较高啊,那接下来呢,我们就来介绍另外一大类。
01:11
窗口函数,那就是所谓的全窗口函数。全窗口函数顾名思义,它就是跟我们前面讲的增量聚合不一样啊,它就是所有的数据先攒起来,每来一个数据我们放在窗口里边,并不做计算,先缓存起来,等到窗口到了结束时间的时候,要去触发计算输出结果了,这个时候才真正意义上的一个一个拿出来,开始累加,开始计算,最后得到结果输出啊,所以我们看这个明显这就是一个批处理的思路了啊,那整体来讲的话,全窗口函数肯定从效率上是不如增量聚合函数的。哎,那我们就会想,为什么还要有这样一个全窗口函数呢?诶,那其实我们会想到在有一些场景下啊,我们每来一个数据就进行计算,可能是没有什么意义的,诶比如说我们可能输出的并不是一个平均数,或者说统计当前的个数,或者说求和这样一个可以简单累加的一种需求,诶那假如说我们需要的指标是统计当前所有数据当中的某个百分位,诶比方说我要统计这个50%中位数是多少?
02:24
诶,那我们自然就想到了,要这样的话,我们每来一段数据,你得到一个中位数好像也没什么意义啊,那所以说在有一些场景下,可能我们就是需要得到全量数据之后才能够去进行计算和统计啊,那这个时候呢,可能我们就需要使用全窗口函数了,另外还有就是像前面我们介绍到的增量聚合函数里边不论是。Reduce function还是aggate function?我们会发现在这个过程当中呢,我们只能定义最多啊,定义一个不同类型的中间聚合状态,然后定义最终想要的输出是什么样,在这个过程当中呢,并没有办法去获取当前运行时上下文里面的一些东西。
03:06
另外呢,如果说我们想把当前窗口有关的信息想要输出的话,诶,比如说前面我们输出的这个结果就很枯燥嘛,就根本看不出来这到底是什么时间段的一个统计结果,我想知道到底是哪十秒钟统计出来,这个平均数是一个2.5,那这就要包含窗口信息。所以这些信息如果我们想拿到的话,那就都需要用到另外的全窗口函数才能获取更多的信息。那在flink当中呢,全窗口函数同样也是分成了两种,一种就叫做window function,另外一种叫做process window function啊,所以接下来我们就分别来做一个讲解。首先是window function window function我们看字面上直接翻译它就叫窗口函数,呃,我们说做K之后啊,然后首先是开窗,点window里边传一个窗口分配器,然后接下来呢,呃,就是要实现一个窗口函数,那跟这里的窗口函数有什么区别呢?呃,注意这里所说的其实是我们在底层代码里边实现的一个接口,名字本身就叫做window function。
04:15
所以这里其实是一个狭义上的窗口函数啊,呃,主要就是因为在早期的版本,老版本里边通用的窗口函数,其实就是这个东西,最早的时候就指定义了这样1WINDOW function啊,所以我们整个这一步操作啊,就叫做指定一个window function,指定它的具体的处理流程,那后来呢,又逐渐的扩展出了各种各样不同的窗口函数啊,又增量聚合函数reduce function式方式,另外还有process度方式。那所以我们可以看一看在代码里面到底是怎么样去调用的啊,这个调用呢,其实也非常的简单,我们可以另外创建一个object来进行一个测试,我们要测试的是全窗口函数,所以for。Window。
05:00
Function test。没方法先写出来啊,那前面的流程还是一样的,所以我们可以直接copy一下之前在a function test里边所实现的这些代码啊,当然了,后面的这些操作我们就可以先不做操作啊,直接先复制前面的创建执行环境和读取数据源以及。提取时间戳生成watermark啊,那这里我们需要把下划线做一个引入,后边方便做影视转换,然后接下来呢,我们就可以。测试全窗口函数。那么这个测试的过程,首先我们还是做一个K备啊,那我们现在呢,干脆举一个例子,就是还是使用之前我们做PVUV的这个统计吧,啊,那现在呢,也不用那么麻烦了,因为我们不需要去单独的再指定更加复杂的中间聚合状态,我们只要把所有的数据收集在一起,然后做一个计算就好了,那我们就不用再除了,我们只要算一下UV,做一个去重就好了,所以接下来呢,诶,我们就是直接实现的需求是。
06:04
统计UV。那同样,首先我们这里要做一个K分组,然后才能开窗啊,那我们现在统计UV的话,显然还是所有的数据都要放到一起,然后根据user去做一个去重,所以呢,这里面我们所有数据都是同一个分组啊,那这种方式前面我们也做过了啊,可以直接指定一个。跟当前的数据无关的一个常量就可以了,之前我们是布尔类型的,现在随便给一个比方说,我们这个就叫做。K,一个字符串类型的常量,作为我们当前的K,那就所有数据都会分到同一组里面去啊,那接下来呢,我们再开一个窗口,点window啊,比方说现在我们还是简单一点,就来一个滚动窗口吧,Tumbling time windows.o我们简单一点就是十秒钟,那time.second我们把这个time要先引入。Import window in time.times second,十十秒统计一个滚动窗口,然后接下来哦,那我们就知道了,如果说想要进行全窗口函数的调用的话,那显然就不是reduce和aggregate了,那到底调用什么方法呢?呃,这里边首先我们说传统的最经典的这个window function啊,老版本里边的通用窗口函数接口,它的调用方法其实是一个。
07:25
点apply我们看到是这个。Apply我们知道有应用的意思吧,诶,所以我们看这个从字面上来讲的话,确实非常通用,就是要应用一个窗口函数,然后我们点进去的话就会发现。这里边它的实现最经典的实现就是直接传一个window function,然后这个window function点进去之后发现,哦,它其实是一个接口,是一个treat啊,就在skyla里边,接口是特征特性嘛,所以我们这里边就是要实现这样一个特性,那么它里边的泛型参数我们看就很多了,有四个,首先是in out,那这是输入的数据类型,以及最后我们统计计算之后的输出数据类型。
08:09
还有这里K的类型,因为我们现在是经过KY之后的嘛,当前建的信息也是可以获取到的,另外最后还有一个WW是指当前的window的数据类型啊,后面我们看这是有这个泛型的范围啊,必须得是window的子类型。然后下边呢,有唯一的一个抽象方法,这个抽象方法名字就叫做apply,然后里边我们看到它的参数也是有四个,首先一个是K啊,那就当前的这个键能够获取到,然后还有一个是window,当前的window信息全部都可以拿到,那前面我们不是说希望在输出的结果里边包装上当前window的信息吗?诶,那这个时候我们就可以直接从window这个参数里面去获取了。另外还有input,这当然了,我们看到现在的input是输入数据嘛,就是一个able类型,这是一个集合类型,放在一起了,说明是所有数据都收集齐了,才会调用我们这里的apply方法。
09:09
啊,那最后还有一个out out,我们看它是一个collect,这跟我们之前讲到的Fla map,或者是呃,在s function里边去发出数据的时候那种方式很类似,所以我们就想到了这里,如果说我们想要输出结果的话,那就调一次out.collect的方法,收集一次就可以发出相应的数据。所以整体来讲这个apply啊还是比较容易想到的,它其实就是所有的信息都能获取到,然后可以根据输入定义输出啊,这就是我们所说的这个一般化的窗口函数啊,那当然了,对于apply方法,Apply的这个接口而言,除了直接传一个window function之外,另外我们看到还有,哎,当然了,这里可以直接传一个拉姆达表达式啊,这是对应的这种实现啊,我们说单一抽象方法的接口啊,然后另外还可以怎么实现呢?哎,我们看它还可以前面传一个reduce function,后边传一个window function。
10:07
啊,当然这种方式要被弃用了啊,这是什么意思呢?这相当于是可以把增量聚合函数和全窗口函数结合在一起。啊,那就是相当于前面我们可以做一个增量聚合,来一个就处理一个,然后呢?啊,最后如果我们想得到的结果还要包装一些窗口信息的话,那后边再跟上一个window方式,所以这种方式其实还是非常灵活的啊,那它为什么被弃用了呢?其实如果我们仔细的去看一看之前的aggregate的话,就会发现啊,之前的aggregate它的调用也不仅仅是简单的传一个aggregate function就完了。它还有其他的调用方法,比如说它可以直接前面传一个a function,后边再跟上一个window function,跟上一个全窗口函数,这是要干什么呢?当然就是基于之前增量聚合的结果,在包装上窗口信息啊,就是我们想要的信息就可以有了。
11:05
那另外我们看到还有其他的一些实现,那后边同样可以传一个也是温格方式,只不过用拉姆达表达式来进行实现啊,单一抽象方法的接口啊,那另外还有一种实现,我们看传的就是另外一种全窗口函数,这个叫process window方式。那这里的process window function呢,也是一个全窗口函数,那它跟普通的window function又有什么区别呢?诶,我们点进去就可以看到啊,它是一个抽象类,然后我们看到它本身又继承自abstract reach方式,所以我们发现啊,它本身还是一个负函数类。那它就有复函数类里边的所有特性,我们记得函数类可以重写当前的生命周期方法open close,另外还可以捕获当前的运行时上下文啊,那所以在process window function里边,那就可以获取到更多的信息了。
12:01
然后在这个process运动方式里边呢,必须要实现的一个抽象方法就是process process这个方法,呃,我们知道它是处理的意思嘛啊,所以这里我们就叫做process window function,其实讲到后边会提到啊,所谓的process window function可以认为是。Process function里边的一元,哎,那所谓的process function处理函数其实就是我们整个link系统里边底层进行处理的API啊,这就是我们说的data API,如果再往底层的话,分层API嘛,再往底层的话,其实就是所谓的处理函数process方式。啊,这个我们会放在下一章进行讲解,那现在呢,我们看到这个process,它里边的处理过程也类似,我们看到这里拿到的就是一个K,注意第二个参数不一样了,之前window function的话拿到的是window,现在拿到的是context,啊,那这个context它又是什么东西呢?其实就是我们这里专门定义的一个上下文,它里边除了当前的窗口信息之外,有无影斗作为属性,另外还有诶,当前的。
13:11
处理时间当前的水位线,水位线我们知道就是当前的事件时间嘛,事件时间的进展嘛,另外还有当前的状态,Window state global state,最后还有一个output output是用来干什么的呢?这个是可以用来做分流操作,可以进行测输出流的输出,所以它的功能可以说就非常非常的强大啊。后边如果说我们想要捕获时间相关的信息,可想要做状态编程相关的内容,那都可以使用当前的process window方式。那后面还有两个参数,一个是elements,这就相当于之前我们那个input嘛,所有的数据输入收集起来,这是一个集合类型,另外还有一个out,用来做处理结果的输出,哎,所以整体来看的话,Process window function和window function基本上是差不多的用法,而且它的功能呢,又会更加的强大,所以我们说这里的这个window function啊,点apply的这种调用方式,其实现在用的就越来越少了,它的功能可以被process window function全覆盖啊,那可能在不久的将来,这个最基本的window function可能以后就要被弃用了啊,那现在我们重点需要了解的其实就是process读方式,这里我们就以process读方式为例,来实现这个统计UV的需求,那读方式。
14:35
如果要想去调用,想要去实现的话,它也同样是点apply吗?诶我们点进去看一眼apply这里边我们看到其实没有这种方式,第一种方式这就是温度方式,第二种方式是拉姆达表达式,另外呢,诶,其实就是前面结合了reduce方式的这种实现啊,后面是传这个拉姆达表达式,传两个拉姆达表达式的实现。它并没有传一个process运度function的这个接口,那哪个方法可以传process运动function呢?我们看就是它上面这个。
15:03
就是叫点process,这里传入的就是一个process window function啊,那当然了,就是如果说我们想传process window function程的话,也可以在aggregate后面传,前面我们看到了aggregate可以传两个参数。我们可以看到它可以前边给一个aggate function,后边给一个window function,当然可以给window function,那就可以给。Process温度方式,所以这两种调用其实都是可以的啊,那我们这里呢,就先来测试一个最基本的,那就是点process里边要去实现一个自己定义的process运动方式,我们就先把它在下边,还是做一个实现吧,Class,这是我们自定义实现的。实现process。Window function。我们要统计UV,那干脆就把它叫做UV count by window吧。
16:03
这样一个类,那当然了,它所要实现的extend,那就是process window function。泛型有四个,我们还记得,首先是input,当然是event类型。呃,我们把这个event要做一个引入。后边啊,接下来是输出的类型,输出的话,我们可以把它结合当前的窗口信息包装成一个元组类型也可以,诶,更加直观的,我们直接输出一句话,哎,告诉哪个窗口它的UV值到底是多少,诶,那这样的话我们输出就变成了一个string了。然后接下来还有当前key的类型,Key的类型的话,我们指定的就是一个string。最后还有window的类型啊,那当然直接写window也可以,但是呢,我们知道现在是一个时间窗口,我们可以获取它的起始时间和结束时间,这样的话就可以指定这个窗口到底是从什么时候到什么时候了嘛,所以接下来我们可以直接给一个time window。
17:04
我们知道本身时间窗口啊,Time window,它就是window的子类型啊,所以接下来里边我们必须要实现的是一个process方法。那这个方法里边,首先我们既然是要做驱虫嘛,那我们应该定义一个set,同样还是使用一个set。进行去虫操作。那这里面我们就定义成一个这个叫user set吧。直接使用scla给我们提供的set数据结构。Set string定义一个空集合,然后接下来那就是从element。中提取所有数据。依次。放入set中去重。诶,所以这个过程其实就是一个便利所有元素,然后把它添加到user set里边的一个过程,啊,那这个也特别的简单,我们知道集合类型SC里边,我们可以直接给它来一个for each。
18:09
要做一个操作,把它添加到user set就可以了嘛,哎,所以里边要做的操作就是user set,加等于添加下划线,点user这里我们可以看到报错了,哎,那是因为我们现在既然做了加等于操作,要把当前的这个user set做一个更改,哎,那很显然我们得到的这个新的set啊,更改了它的值,那么我们就得把它定义成一个变量,这样的话就没有问题了。将所有数据都添加进去之后,那最终这个set的size,它的个数其实就是我们当前的UV值啊,那所以我们也可以单独定义一下啊,那当前的UV值就等于user set.set然后现在我们还想再包装窗口的信息,哎,那我们就是提取窗口。信息包装string进行输出。
19:04
所以这里面的窗口信息。比方说我们需要一个window and它的结束时间,哎,这个用什么去提取呢?当然就是用当前的上下文去做一个提取了,context.window这里边我们可以拿到window信息,然后window里边呢,哎,我们还记得它有start有end嘛,我们就直接get end,拿到的就是结束时间,那当然了,我们也可以获取它的起始时间,同样是contact.window.get start就可以获取到了。接下来我们就可以直接输出调用out.collect输出一句话,那这里我们要输出的信息,比方说就是从什么时候到什么时候的一个窗口,然后它的UV值到底是多少啊,就是这样的一句话,那我们干脆做一个字符串差值吧,这个比较简单一点啊,啊,所以我们就是直接写窗口,然后后边首先是一个。
20:00
Window start。后面加一个波浪线,然后window and。从start到and这个时间段范围内,它的。UV值。为后边我们加上当前的UV,诶,这就是我们想要输出的内容。啊,所以上边的话,实线的这一个。Process window function传进来,然后接下来直接做一个打印输出就可以了,好,然后执行起来,我们就可以来测试一下,看看效果到底怎么样。运行起来。我们现在可能这个时间会稍微长一点,因为是十秒钟才有一个窗口的统计滚动窗口嘛,所以每十秒钟会输出一次,哎,我们可以看到当前十秒钟UV值是四。然后接下来我们继续等十秒钟啊,如果已经是四的话,那我们知道后面应该正常情况都是四啊啊,当前随机生成的话,十秒钟可能四个用户都会有,当然也有可能少一个的话,就有可能会得到一个三啊,这个就是看我们具体的这个随机生成数据的情况了啊,所以这就是关于全窗口函数的用法。
我来说两句