00:00
我们已经了解了窗口的具体使用过程,我们会发现主要就是两步,一步是定义一个窗口分配器啊,它规定了我们当前窗口的类型是什么,每一个数据来了之后到底分到哪个窗口里边去,然后呢,下一步就是一个窗口函数,那这个窗口函数就指定了我们当前处理数据进行统计计算的具体逻辑。那底层的数据结构转换的过程呢,可以用这张图来表示,诶,那我们知道前面做了一个点window操作指定窗口分配器的时候呢,得到的是一个window stream啊,或者说我们基于data stream直接开窗,那得到的就是一个all window stream,然后接下来我们再去调用各种各样的聚合方法,哎,可以是增量聚合,也可以是全窗口函数直接apply或者process那。应用了窗口函数之后,就又得到了一个data stream。其实呢,对于弗林而言,窗口的操作还不仅仅是这些。弗林还给我们提供了更多的API,诶,那就是有一些可选的API可以让我们更加灵活的控制窗口的行为,那这些API是怎么样去使用呢?呃,简单来说就是都是基于这里的window stream去调用。
01:14
哎,所以它就是可选的嘛,只要插入在我们定义的窗口分配器和窗口函数之间就可以了,所以接下来我们也可以在源码里边具体去看一下啊,Window这里边我们得到了window the stream,然后我们看到,呃,其实前面我们也已经提到过,除了下边的reduce aggregate process apply啊,以及max sum一些简单的啊,最基本的聚合计算,这些计算呢,我们指定的当然就都是一个聚合函数,一个窗口函数了,那除了这些之外,上边我们还有一些可选API,那就是这里的aloud lateness。Output data。Trigger以及avi,好,那接下来我们分别对他们做一个简单的介绍。
02:01
首先是trigger trigger就是触发器,那前面其实我们提到啊,所谓的触发器,它主要就是控制窗口,什么时候触发计算该算结果了啊,那本质上来讲,这个计算呢,就是执行我们的窗口函数,如果是aggregate的话,那当然执行的就是我们的get result方法啊,那如果说要是process function的话,执行的就是process方法。所以这里其实就是计算结果,然后输出的一个过程啊,那前面我们也我们也提到了。Link里边的count window技术窗口,它本身是没有具体实现的,底层是用global window来进行了一个实现,那全局窗口global window呢,本身没有触发器,它不定义窗口计算,所以我们就必须自定义一个trigger来控制它什么时候触发。具体在代码里面,如果要调用的话,也非常的简单啊,那就是直接。
03:00
点window指定了窗口分配器之后,得到了window stream,然后直接调点trigger方法就可以啊,那源码里面这里我们看的很明显啊,就是直接点trigger里边要传的就是一个trigger trigger本身就是一个抽象类啊,然后这个抽象类里边有哪些抽象方法呢?诶,我们可以看到有一个on element。有一个on processing time,有一个on even time,最后还有一个can emerge,然后还有一个on啊,当然了,这里can和on都已经有具体的实现了啊,啊,那所以上面的三个这是我们必须要实现的抽象方法啊,下面还有一个抽象方法,那就是clear,那clear尔的话肯定就是最后要执行的一个清除操作了,那前面的这几个方法到底是用来干什么的呢?啊,简单来说on element就是每来一个数据的时候,Element就是数据元素嘛,当前这个数据流里边来一个数据元素,到了我们刚才那个窗口算子了,那就会调用这里的element方法。
04:02
哎,那同样道理,On processing time指的是什么呢?诶,那就是每当在处理时间语义下定义的定时操作啊,就是按照时间定义的某个操作,比方说,呃,我们窗口触发,那其实就可以认为是一个定时操作啊,零到十秒的一个窗口,可以认为就是底层给我们定义了一个十秒钟要发生的事件,这就相当于一个闹钟一样,哎,那所以呢,这个on processing time就是假如你定了闹钟是一个处理时间的闹钟的话,那我就触发的时候就会调用这里的这个on processing time。那同样道理,如果要是事件时间的闹钟的话,我们调用的就是on time。那这里我们知道什么时候调用这几个方法了,那这几个方法到底怎么样能触发我们当前的计算呢?哎,这里的关键点在于它的返回值。是一个trigger result。Trigger result又是个什么东西呢?哎,我们可以点进去看一眼啊,这是一个枚举类型,这个trigger result就决定了我们当前窗口是否要进行触发计算,好,那所以我们看一下它枚举类型吗?有哪几个类型呢?主要就是这四个类型。
05:11
分别叫做continue fire and per fair和per,哎,所以我们看这个fire and per,这就是下面这两个的一个组合嘛。哎,那什么叫做fire呢?Fire我们知道有开火的意思,所以开火就是要真正的去计算当前窗口的值,然后发出计算的结果。哎,这就是fire的意思,那P呢,P是清除的意思,所以它其实就是要清空当前窗口的所有数据和状态,把当前的窗口就彻底的销毁关闭了,所以之前我们说啊,呃,我们可以认为当一个窗口到达结束时间的时候,零到十秒到十秒的时候,我们就触发窗口计算输出结果,然后把窗口关闭,我们当时认为这是一步,就相当于都是同时发生的。
06:04
现在我们就发现了,其实不是啊,那其实这是可以分开的啊,就是如果说我们定义了fire作为当前trigger result的话,那就是只触发一次当前的窗口计算,调用窗口函数进行计算,并且输出结果,但是不关闭窗口不会把窗口清空。而如果说我们只返回破的话,那就相当于窗口直接清空,什么都不做,直接把它干掉,销毁掉啊,那对应的上面还有一个fire and p,那我们就知道了,这就相当于是二和一了嘛,相当于就是。调用窗口函数,执行窗口计算,得到结果输出,并且销毁窗口进行关闭操作。啊,那上面还有一个continue continue我们就知道了,继续继续,就是什么都不做,相当于什么都没有发生啊,那所以我们就看到了trigger里边啊,每一个数据到来的时候,或者每一个时间到达的时候啊,那我们现在呢,就可以定义当前到底要执行什么样的操作,返回对应的一个trigger result就可以了。
07:10
当然这里边的底层我们看到啊,它其实通过两个属性来进行控制的,这两个属性一个就是fire,一个就是P啊,那我们就想到了,这里边就是fire和P都是一个布尔类型的值,那如果他俩都是true的话,当然就是要一起做嘛,那如果说都是false的话,就是什么都不做,那就是他俩这两个操作都不做,那如果某一个是处另外一个false的话,当然就是只执行对应的这个操作了。啊,这就是所谓的trigger的底层原理。啊,那当然了,我们可能会发现啊,这个逻辑如果我们想要去自己控制的话,还是比较麻烦的啊,那好在就是弗link已经帮我们提供了常见的所有的窗口类型啊,那如果我们想看它底层逻辑实现的话,我们可以直接看trigger所在的这个包下边,诶,它就在当前的window triggers下边。
08:00
然后我们就会看到啊,就会有count trigger以及even time trigger啊,Processing time trigger这些就都有啊,比如说这里我们看一下even trigger,它这里面是怎么实现的呢?诶,那我们看到啊on element方法里边它会做一件什么事情呢?他会首先去判断一下当前窗口的最大时间戳,我们还记得最大时间戳是什么呢?就是那个N减一哦,减一毫秒的那个值。他会判断一下当前的最大时间戳是否小于等于当前的水位线啊,也就是要因为我当前有上下文嘛,可以get current auto map获取当前的水位线,我判断一下水位线是否已经达到或者超过了窗口能够允许包含的最大时间戳呢?如果已经达到了,那么我们就直接发,直接触发计算。那如果没有,没有的话怎么办呢?没有,我们看它是注册了一个even time timer注册了一个事件时间的定时器,这就是我们说的到达某个时间要去做的某个操作啊,啊,那所以他就是注册这样一个定时操作啊,按照什么时间注册的呢?注意按照。
09:14
当前窗口的最大时间戳去定义的,哎,所以这个就解答了,前面我们做测试的时候啊,为什么是水位线9999的时候,我们直接就把零到十秒的窗口直接关闭了。因为零到十秒里边最大能包含的时间戳就是9999啊,所以它注册的这个定时器,它的时间就是9999,哎,那所以当前水平线涨到这个9999小于等于嘛,当然直接就除法计算了就发了。啊,那如果说我们当前是还没有到的话,他注册了之后会怎么样呢?会返回一个continue,继续什么都不要干啊,然后接下来最关键的是后边会有一个on even time啊,这就是如果说啊,前面我们这里水位线一直还没有超过我们当前最大的这个时间戳,这是数据来的时候调用的方法吗?如果在这里还没有超过的话,那么我们就一直等到水位线找到我们想要设置的那个最大时间戳那个点的时候,调用这里的on time。
10:19
啊,那这里我们就会判断一下当前的时间是否就是我们设定的那个最大的时间呢?哎,那如果是的话,那就直接fire,如果不是的话,那就continue。我们这里看到啊,它其实并没有定义去清除窗口,关闭窗口的那个操作,没有定义破,所以我们会发现本身link给我们定义的啊,这里边触发窗口计算的时候只是计算输出结果,并不关窗,那到底什么时候关窗呢?这跟另外一个设置,就是当前窗口允许的延迟时间,或者说等待时间有关系。啊,那另外还有一个on processing time,那我们就发现了,这就什么都不要做了,直接continue啊,那最后clear的话,那就是把注册的那个定时器直接删掉就可以了,做一个清除操作,这就是所谓的even time trigger的具体实现过程,通过这个例子我们就可以知道trigger触发器到底是要做什么事情。
11:16
这是关于触发器,然后接下来呢,我们再来简单的说一下其他的几个可选API,那还有一个是移除器avi aviator呢啊,那简单来讲就是要定义移除某些数据的逻辑。它跟触发器非常类似,也是基于Windows three去调一个艾方法就可以了,里边要传入的呢,就是一个。这样的一个实现类。所以啊,对应在我们这个源码里边,我们也可以看到啊,上面这个avior要传入的这个东西,它就是一个接口里边要实现的抽象方法,一个叫avior before,另外一个叫avi after,也就是说before指的就是说在窗口出发计算之前。
12:02
我们想要去执行这个,然后移除哪些数据啊,把一些数据就移除出去了,那after呢,当然就是窗口触发计算之后,我们可以再去移除一些数据,之前之后都可以删掉一些数据啊,这是艾,另外我们看到还有一个操作叫做。Lo lateness,哎,这就是我们所说的允许延迟,哎,那这个是什么意思呢?前面我们不是说,诶,当前我们这个处理的过程当中,这个water mark本身乱序流里边的延迟有可能不够大,有可能会导致有些数据没有收进来就丢掉了吗?诶,那所以这里窗口就给我们提供了另外的一种操作,我们可以继续多等一会儿,可以允许设置一个延迟时间。好,那这样的话,接下来的含义就代表是什么呢?就是本身我们在in time trier里面,这里已经看到了啊,到了当前窗口的最大时间的时候,窗口会触发计算,执行一次操作,然后输出结果,但是呢,不会关闭,什么时候关闭呢?那是要等到我们设置的这个延迟时间也已经过了之后才会关闭,那所以它真正的窗口关窗时间就是前面我们设置的那个窗口的最大时间戳max time,再加上这里的late,加上这里的延迟时间。
13:25
水位线,到这个时候我们才会关闭当前的窗口,所以这就给我们提供了一种处理迟到数据的方法啊,就可以保证窗口的正确性。然后另外还有一个叫做set output later data啊,那它是把当前的迟到数据输出到测输出流里边的一种方法,这又是什么意思呢?啊,它的含义其实就是说前面我们允许窗口等待迟到数据了啊,那窗口可以多等一会儿,但是我们想到窗口如果等待的话,这是要耗费系统资源的呀,就窗口不能关闭啊,那我们当前窗口可能就越开越多,窗口里面保持了很多数据,很多状态,这些我们都不能删,没有办法释放系统资源,那你这里等待的话,如果想保证这个数据结果的正确啊,处理结果的正确性,那到底等多久呢?多久是个头呢?你这里边就算等一分钟,也有可能还有迟到数据啊,所以我们上边这里设置的这个时间,同样也只能是保证我们绝大多数数据正确处理,那最后如果还有漏网之鱼的话。
14:29
那就要用这里的兜底的一个方法,那就是把这个迟到数据输出到测输出流里面去,注意它调用的方法,其实就相当于跟我们前面的窗口操作没关系了,意思就是说前面我们窗口都已经关了,到这个延迟时间到到达的时候啊,结束的时候窗口已经彻底关闭了,那这个时候窗口都没了,我们如果再来了迟到数据怎么办呢?诶,那就是我现在希望他至少不要丢掉。我把它塞到另外一个地方,这个地方叫做测输出流啊,那如果说我们还想把它再进行统计计算的话,那就把之前我们窗口统计出来的结果,再去追加当前测输出流里边的数据,进行手动计算就可以了啊,这就是关于窗口的可选API啊,那所以后边我们会介绍。
15:21
怎么样去真正在项目实践当中去处理迟到数据?那就要结合水位线处理乱序数据时候设置的延迟,以及这里窗口的可选API啊,那所以接下来我们就介绍这一部分内容。
我来说两句