00:00
接下来窗口的这一部分的最后,我们来介绍一下其他API,而在前面其实我们介绍窗口API的时候,大家会发现。主要这个操作是什么呢?啊,就是当然KY不一定是必必要的啊,也有可能不KY,那后面必要的其实就两个操作,一个是点window啊,或者是WINDOW2啊,去定义一个窗口分配器,指定数据应该进入到哪个窗口去,另外后边就是一个窗口函数,当然这个调用不一定是aggregate,这个最常见对吧,因为我后面可以把两者结合在一起用嘛,啊,所以这里边给大家举的例子是aggregate,你也可以直接就有一个reduce。啊,也可以直接sum对吧,这这些都是可以的,呃,那另外也可以直接点process,我们说点process就里边传的是一个全窗口函数,Process window方式,甚至现在不太常用的点apply你也可以去用啊,但是这些东西太多太杂,反而没有意义,所以最好用的就是这个aggregate里边把两者结合起来啊,传两个函数进来,自定义函数进来,那我们就用这种方式就好了,一个点GG里边传一个增量聚合函数,然后后面再来一个全窗口函数。
01:08
这是window最常见的用法,两个API,一个分配器,窗口分配器,一个窗口函数啊,那接下来我们要给大家说的这个其他API呢,是一些可选项,也就是说在你定义一个窗口操作,窗口算子的时候,未必要用到这些API,但是呢,呃,有时候你的特殊需求可能就得用到啊,就或者说是在有一些场景下,你不得不用这些API。那我们来看一下到底是什么。第一个。第一个介绍的可选API就是所谓的触发器trigger。啊,那触发器顾名思义了,它主要就是控制窗口,到底什么时候触发计算啊,就是我们说的啊,就是窗口,呃,就是它一般情况下是这个只收数据,然后一直在等着。
02:01
它是不往下发数据的,对吧,什么时候发数据呢?我们说就是如果是时间窗口的话,那就是到了那个结束时间到点了,到点发车嘛,那那那那就发出数据,那发出数据之前肯定是你要触发计算啊啊所以所谓的触发计算其实就是到了窗口的结束时间,或者是到了它的触发条件啊,比方说像这个技术窗口的话,那就是个数攒齐了,那我就触发计算,触发计算的时候调用的就是我们这里比方说全窗口函数这里的。Process方法对吧?啊,就是在这里去做一个调用,当然你如果要只是一个增量聚合函数的话,那当然调的就是这里的这个概率result了啊,这是对于。与触发计算的这个时机的解释那。接下来我们就说这个触发器,它到底是用来干什么的呢?它其实就是告诉我们什么时候触发。
03:00
好像就是说了半天就还是还是就是触发时机的问题,那比方说我们定义了一个时间窗口,那其实这个触发器的底层是什么呢。它其实就是给我们设置了一个,相当于设置了一个定时服务。呃,定在哪个时刻的服务呢?定在窗口结束时间的一个服务。他就要判断,如果现在的时间已经到了窗口结束时间的话,哎,那么我就要触发当前窗口的计算输出结果。啊,这就是trigger的一个作用。那之前也说过,就是对于技术窗口,它的底层是用global window实现的,那global window本身是没有定义任何计算操作的,那怎么怎么做呢?那就是再加一个trigger啊,就是自己把这个trigger定义出来,接下来就可以控制数量了嘛,啊到了多少的数量我就停止,就触发窗口的操作,Trigger这个方法里边传的是一个trigger这样的一个类的实现。
04:04
我们可以看一下啊。大家可以看到tri这个东西啊,我们看这个flink streaming Java下面的这个trigger真是一个抽象类,然后它里边有几个非常重要的方法,一个叫做on element。一个叫做on processing time。还有一个叫on time。还有一个叫can emerge,除了can,还有一个就是后面叫on emerge can emerge大家知道这个就是呃,看它能不能去合并,你看这里面默认给的就是一个false对吧,默认就是不合并啊,什么时候合并呢?我们说一般情况就是呃,对于这个会话窗口啊,会去会去有这样一个合并的需求啊,那这里边后面还有一个叫叫这个clear,还有一个方法叫clear clear很明显这就是做一些清理工作了,对吧?呃,窗口你最后要要怎么样去清理,那前面比较有趣的啊墨GE这个我们不考虑啊,前面比较有趣的就是三个on什么什么的方法。
05:03
这个on什么什么,指的是啥呢?大家应该还记得在前端页面里边,你如果要是点击一个按钮的话,一个button会有一个on click方法,那它指的就是要触发当前的这个事件,诶之前我们在讲到water的时候,在water test这里,这个out waterness大家还记得它是里边不是了这样的一个ator吗?还记得里边是不是有一个on event和on aauicit啊,它有这样的两个,按照什么去触发的这样的一个方法,现在也一样,现在类似,大家还记得之前one是什么意思吗?那不就是当前一个even的一个事件,就是一个数据嘛,一个数据来了之后,诶,那我就调用到这儿。被触发了,我就调一下这个方法,那如果要是on periodicit呢,那就是定时嘛,周期性的那个时间200毫秒一次,诶,到了200毫秒我就调一次这个方法,诶,所以它都是以某种事件某种外部调用啊,去回调回来的一个方法,所以有时候会管这个叫回调方法啊,啊,那大家会看一下当前的这个on element,这不就跟on差不多吗?
06:16
看这个模样就长差不多,哎,所以这个element就是当前到达的这个数据元素,那就是你来了一个数,诶,出发当前on element到底该干什么事?所以就是每接收当前窗口啊,设置这个trigger之后,每接收一个数据,我就调一下这个方法判断要不要关闭窗口。啊,然后还有一个是on processing time,那这个就很明显了,这里边的这个time关键是什么呢?是你看有没有定义了处理时间的定时器,也就是有没有处理时间的闹钟。假如说你定了有处理时间闹钟的话,那到这个时间了,我就得去出发一下,我就得看一看现在到底该干啥了。
07:00
然后还有一个是on even time,那这个就对应是事件时间嘛,假如说我有一个事件时间的闹钟的话,那到点的时候我们就来看一下调用这个方法,看看是不是要触发窗口计算啊,那关于这个定时器闹钟的这个概念呢?呃,其实后面我们讲到处理函数process function的时候,还会专门重点讲它,它是flink底层的一个时间服务啊,这种定时服务的一个特色啊,就是你可以定一个闹钟指定。之后的某个时间点,什么时候去触发一件什么事啊?当然时间既然有事件时间和处理时间两类,那么这里的闹钟当然也可以是基于事件时间,还是基于处理时间,也有两种。这个看起来好像稍微有点抽象啊啊,但是大家如果要是看一个具体的例子,可能就会对他。就会有更深刻的理解,比方说看一下这个count trigger,我们不是说那个count window本身是一个global window加了一个这个trigger吗?那它的trigger是怎么实现的呢?就是这个count trigger。
08:03
啊,这个can trigger,我们可以看一下它的on element,里边有具体的内容,On even time和on processing time。这里就。诶,大家看这里是要返回一个trigger result对吧?那这个trigger result是个什么东西呢?很简单,就是触发的结果。也就是说,触发结果就是当前窗口到底要不要计算,我们可以看一下trigger result到底有哪些。因为我发现它是个枚举类,枚举类型我们看一下里边有这么几种啊,Continue fire and pers fire,还有最后一个是per,大家知道fire是什么意思呢?Fire是啊火,它其实有开火的意思,那对于窗口来讲,Fire其实就是触发计算,就是要就是要把自己的计算结果要发射出去啊,所以就是他要做一次计算,然后把结果发送到下游。
09:00
那么P呢,我们之前说关窗总说关窗对吧,八点到9.1个窗口,我们说到九点的时候就窗口就关闭了,当时提过一句窗口的触发计算输出结果和关闭其实是两回事,哎,所以现在大家就看到了。触发计算输出结果,这是一回事,这个叫做fire。然后另外还有一个叫做PP大家知道有清理的意思,所以P就是把窗口的所有里边的东西状态啊,所有的内容全部清空,那然后窗口不就可以销毁,可以回收了吗?啊,所以这是真正意义上的关窗销毁窗口的操作。那当然了,大家就知道fire and p,那就是又计算又销毁掉窗口,把窗口清空,那另外还有一个continue continue,那就是。啊,那就是什么都不做嘛,所以大家看到了后面它其实是以什么来定义的呢。以两个书尔类型的属性来来定义了这四种不同的枚举类型,那大家知道两个字段两两一组合,那不就二二得四吗?哎,所以这两个字段一个就是判断要不要触发计算fire,另外一个就是判断要不要去清空,要不要真正关闭这个窗口,那所以如果说只是fire不破的话,那就是fire,大家看true false,如果是。
10:24
只破不fire的话,那就是。如果两个同时为true的话,那就是fire and per,如果两个同时为false的话,那就是continue。啊,所以现在我们就知道了,这个窗口到底是怎么去控制的,那你看一下这个count trigger是怎么做的呢。啊,那那就是跟时间没关系,不管你是事件时间还是处理时间,通通都是直接continue continue。哎,也就是说没关系,直接直接继续对吧,我任何动作都都不发生,直接继续,那这里边如果要是来了一个数据元素呢,诶,来了一个数据元素呢,我就得考虑考虑了,因为我跟个数有关嘛,它这里呢,Get protected state这里边它是保存了一个状态,其实大家能想到这个状态是用来干啥的呀。
11:13
肯定就是保存当前的count数量对不对啊,所以当前你看他在外边其实是定义了这样的一个状态,就叫做count对吧?啊,然后new了一个sum啊,然后呃,其实这个new sum,这就是它定义了一个直接聚合来了一个数据之后就直接聚合count嘛,直接加一不就完了吗?所以它定义的是一个reducing state啊,这会儿可能大家看不太懂这个啊,这跟状态编程有关,我我们可以先大概的知道这就是一个状态,它要叠加去聚合,所以在这儿先看一看现在这个到底是多少,然后给它加一对不对,当前的这个这个状态再再加一,如果我自己定义一个max count啊,那大大家知道这个max肯定就是我们从外面传过去的那个最大的范围了,如果大于等于的话。把这个count状态直接清空,然后接下来fire触发计算,这里是就直接return了啊,那如果说要是没到的话,没到就继续continue。
12:09
是这样的。然后对应的比方说even time trigger,这就是平常我们设置的事件时间语义下时间窗口。滚动窗口,滑动窗口,他们到底是怎么触发的,就在这儿呢?大家看到这里有element,哎,那不对呀,你这个时间应该只是靠时间出发,怎么跟元素还有关系呢?你看它干什么就知道了。大家可以看一下啊,它这里边说什么?如果窗口的最大时间戳还记得那个maxtime stamp吧?And减一能允许的最大时间戳小于等于当前的current water mark的话。那current watermark大家知道是事件时间吗?事件时间如果已经超过或者说已经达到了当前的窗口里边允许的最大时间戳的话,其实大家知道这就已经应该到了这个窗口结束关闭的时间了啊,所以他就直接fire。
13:03
啊,这主要就是考虑你不要说出现一个这个水位线已经很大了,但是之前还没有注册过这个定时器,大家看后面会有定时器对不对,它是考虑这些比较异常的情况啊,做了一个这个条件判断,那如果正常情况下你是一个一个数据来的,没出现这种情况的话,那怎么样呢?他注册一个事件时间的定时器。这也是后边这个底层API里边比较有特色的啊,就是跟定时器相关啊,大家先大概知道它有这么一个用法就可以了,它就按照当前的最大时间戳注册了一个定时器。什么叫定时器呢?就是一个闹钟啊,就是你等到那个时候就要去调一个方法。既然是事件时间,那调什么方法呢?哎,On even time嘛啊,就是所有的这个事件时间的闹钟都都会调到这儿来。所以说最终到了这个时间的时候,那么我就调到one time来,然后判断一下,诶是否现在time已经等于当前的这个最大的时间戳呢?是不是我前面注册的这个时间点呢?啊是的话fire,不是的话continue,同样这里也没有涉及到P啊,那P主要就是因为窗口算子里边要追加延迟时间啊,允许延迟时间。
14:19
这就是trigger相关的内容。呃,然后接下来另外还有一个叫移除器,移除器的话,这个比较简单艾,它其实就是移除某些数据啊,就是说比方说我当前窗口里边把所有数据要收集进来,我可以定义说哪些数据我不要做计算的时候,这个数据我就给它,相当于是个filter啊,就给它过滤掉了啊,这个是可以做到的啊,当然它有两个方法,就是一个叫evi before,另外一个叫evi after,意思就是说。Before啥意思呢啊,就是做那个窗口函数,不是就是最后触发窗口计算吗?啊,就是在触发计算之前,我就把它过滤掉了,你不要去参与计算了。而这个after呢,After是断过一次之后,我把它去除掉。
15:03
那接下来的第三个可选API就比较有意思了,这就是我们所说的窗口是允许延迟的。就是我们说的啊,在什么情况下,窗口是触发了计算之后还不关呢,那就是掉了这个方法点loud lateness,然后里边传一个时间的时候。这表示什么呢?一分钟嘛,这就表示我的窗口关闭的时间要往后延迟一分钟。所以这个意思就是说,比方说我定义了这个一小时的一个滚动窗口,哎,那八点到九点,假如我现在是事件时间,假如我water没有延迟的话啊,那按照我们的之前的那个说法,那就是water mark到了九点,那现在这个窗口就该关了。但现在有可能还有迟到数据,对不对,有有这个漏网之鱼啊,那那这个怎么办呢?那我就再多等一会儿。这个多等,这就是真正意义上的多等了,他要等到09:01的时候再真正的关窗。
16:05
所以如果对比我们之前的那个发车的例子的话,Wal mark本身可以延迟water mark是把表就调慢了。司机的表直接调慢了,而我们现在呢,表还是原来的表,只不过发车时间真的给你改到09:01了。啊,就之前那个沃马,比方说我们,呃,延迟两秒钟的话,那是司机偷偷的把自己的表播慢两秒钟,这样的话就是他看的时候,他说发车还是九点钟,只不过是他的表慢了,那现在呢,现在我们是wal mark,他的表已经到九点钟了,我们要算一次触发一次计算,但是窗口还不关,不要破。什么时候才破呢?继续等一分钟,等到09:01再破啊,这就是前面说的啊,窗口到底什么时候会去真正意义上的清理,它的清理时间在窗口算子里边,其实定义是就是当前的结束时间,再加上。
17:07
我们定义的lo lateness的这个时间,到了这个点的时候,星空。那可能就想到了,那你这个时间就算是给了一分钟,你怎么知道数据一分钟就都到齐了呢?万一还有这个特别漏网的漏网之鱼,一分钟还没来呢,迟的特别迟呢,那我多等五分钟十分钟,你还不来吗?呃,理论上是应该都来了,但是万一呢,总有万一对吧?呃,那所以这里边我们还有最后一个兜底的方法。兜底的方法就是测输出流,把迟到的数据放到侧输出流里面去啊,这个方法叫做side output later data。然后这里边我们调用方式是传一个所谓的输出标签output t啊,这里output tag有一个类型,然后有一个具体的ID啊,有一个这个标签表示late,然后掉了这个方法之后,那就是如果像之前我们这个啊,窗口真的已经关了,就连这个延迟时间都到了,都已经关了,还有数据来,那怎么办呢?那窗口都已经关了,真的处理不了了。
18:12
那接下来那就把它放到测数枢纽里边去,尽管我们没有办法把它直接在窗口的基础上直接做计算,但是呢,至少保证这个数据不丢。哎,那之后我们就还可以再继续合并啊,所以就能保证最终一致性啊,最终的结果是对的。那把这个放进来之后,这还没完啊,你这拿不出数据呢,怎么拿呢?那就是要等这个窗口都处理完了之后,大家看这不是窗口分配器窗口函数吗?把这个处理完了之后,然后再基于得到的data stream去调get side output方法传入当时定义的那个标签,这样就可以得到另外一条流,这条流就是迟到数据的测数出流。啊,这就是关于flink里边window API的其他API的用法,这里需要强调的是,除了最后这个get out,这是基于data stream做的操作,别的都是基于什么做的呢?看这个调用的时间都是在点window窗口分配器和窗口函数之间。
19:15
所以这些可选的API啊。Output aloud lateness,还有aviator,还有trigger,他们都是window stream的方法。所以他们必须夹在窗口分配器和窗口函数之间才是有效的。因为经过窗口函数计算之后,得到的就变成data stream了嘛,变成这个operator了嘛,算子了嘛,那你再基于它的话,那就没有窗口的那些操作了。这就是这一部分啊,先整体介绍一下。
我来说两句