00:00
前面已经讲完了窗口API window API里边最重要的两部分内容,那就是前面一个呃,窗口分配器window sign,后边是一个窗口函数定义它的计算规则,那这两部分我们都已经讲完了,呃,那这里边呢,还有一些窗口API,里边还有一些可选的API要给大家再单独的说一下,那就是列在这里面的,大家看前面两个,其实我们呃是比较熟悉了,前面见过一个叫trigger。它就是触发器,它可以定义当前这个window到底什么时候要关闭,然后还可以定义呢,到底什么时候触发计算输出结果,这里大家要注意啊,我在这里说的这个表述其实是把它分开讲了,我们可能会想到那窗口关闭和这个计算输出结果难道不是一回事吗?不就是比方说你这个八点到9.1个窗口,那就是到九点钟的时候,呃,计算结果输出,然后最后关闭窗口嘛,这不就是一一连串的顺序执行的这个事时间吗?啊,大家自然就想到了,那假如说我们后面还要处理这个迟到数据的话,那可能就稍微麻烦一点了,如果要处理迟到数据的话,你说当前这个窗口八点到九点到九点了,我是该关还是不该关呢?
01:19
你想要处理迟到数据,那是不是就后面还应该允许迟到数据来了之后再再做窗口里面聚合啊,那是不是这个时候不该关窗口,但是到了九点了,你如果不输出继续往后等下去的话,这是不是相当于我们延迟就太高了,哎,所以这里边有一个选择,就是我是不是可以到九点的时候先马上做一次计算,输出一个结果,然后到后边来了数据再去整合,再去改进啊。所以大家会发现,在这种场景下,是不是计算输出结果和关闭窗口,这是两回事啊。所以这是两种不同的操作,对吧?啊,那这个操作由谁来控制呢?Trigger trigger在这里边可以定义啊,它到底什么时候发生触发计算输出结果,什么时候做这个窗口的关闭。
02:09
那当然了,我们一般情况下是不会去做这个自定义的,对吧?啊,那就是在弗link的底层,对于这个时间窗口和技术窗口,还有绘画窗口啊,他们都已经是把这个trigger给我们实现出来了,所以它是可选嘛,一般情况我们不用去单独去定义。然后另外还有一个叫avi avior是移除器,移除器的话大家就想到了,它就是对有一些数据,如果我们做计算的时候不需要,对吧?啊,我前面可能都收集到了,但是现在不需要把它移除,有点像过滤的一个感觉啊,那么它的特点是你可以在我最后触发窗口计算之前把它滤掉,也可以是说我先做计算,计算完成了之后呢,再把它滤掉,就接下来如果在统计的时候就不要它了,对吧?诶就可以定义各种各样的规则啊。好,除了这两个之外,另外还有其他的一些API,那就是要去处理迟到数据了啊,大家看下一个,这个API叫做aloud lateness,字面意思是。
03:10
允许的一个延迟对吧,允许的一个延迟时间,这可以给大家在呃这个API里边再看一眼啊。大家知道这个既然是处理迟到数据,那肯定是跟这个时间相关了啊,所以我们把这个放在前面的这个时间窗口的这个测试里边,给大家简单的看一下,呃,其他可选API。可选API哦,那所以这里边我可以基于这个data stream啊,还是先做一个KBY.id对吧,然后点window,我随便给一个time second15,这是一个滚动时间窗口,然后我可以去定一个自定义的trigger对吧?啊,你如果定义trigger的话,那这里边是不是要去实现一个自己的trigger啊,啊对吧,这是这是这样一个东西啊,这个我就不详细去写了,把这个注掉,然后另外还可以有一个啊,就是avior对吧?这也是移除器啊,里边当然你就要实现一个自定义的这个avior了啊啊这这个就肯定就不详细去给大家说了,因为一般情况用到的这个场景是比较少的啊好,我们把这个对齐,然后把它注掉,哎,那这里面要给大家说的是可以做一个操作,叫alo the lateness。
04:36
大家看这里边传的是一个什么参数,就是一个时间,就是一个time对吧,延迟时间,那这里边这个延迟时间代表的意思是什么呢?比方说这里边我给一个一分钟延迟,大家想一下这代表什么意思?对,就是我允许当前处理的这个数据呢,等一分钟,就是在接下来的一分钟之内,如果说还有这个之前窗口的那个数据来的话,我就可以再把它收进来,再做一个统计,对吧?啊,就是可以让这个数据迟到,那他的这个处理的规则就相当于是在前边我到了这个窗口关闭时间的时候。
05:18
那大家想接下来我是不关闭窗口对不对,就前面我说了啊,那个呃,Trigger啊,那个触发器控制的时候,它本来到这个点是不是就应该八点到九点啊,这个九点钟到时间了,那现在就应该把这个窗口里边的那个计算结果输出啊,做一次计算输出结果,然后关闭窗口,现在如果我要是允许迟到的话。那接下来是怎么样?对,是不是到九点的时候,我就先输出计算,输出一个结果,但是不关闭窗口对吧?那那接下来怎么办?对,接下来继续等,等一分钟,也就是在9.01之前,9.01分之前,属于当前这个窗口的数据继续来,我还可以继续处理,而且他的行为是来一个。
06:09
来一个迟到的数据,我就直接在之前的这个窗口聚合结果基础上,直接做一次计算,叠加一次,所以大家就会发现了,就相当于我的这个窗口就变成什么样子了。快速的到九点的时候,就非常的直接输出一个结果,然后是不是后边在接下来的一分钟之内有可能会不断的更新这个结果啊,啊对吧,所以大家会想到这就是一个快速得到一个近似正确的结果,后边再去不断的更新调整,得到一个更加准确结果的过程啊,那大家想这是不是就相当于实现了一个。类似于之前我们讲的流处理第二代架构拉姆达架构的那种方式啊,对吧,你要同时保证快速和最后结果的正确性,那当然有同学可能想啊,那你这个如果说允许的这个,呃,迟到时间是一分钟的话,那到了09:01的时候会怎么样呢?那那个时候窗口就真的关了,对吧?哎,所以到那个时候窗口就会全部清空,状态全部清空,你再来数据的话,那就没办法聚合进去了,那有同学可能想,那这样的话,那不行啊,我不想让数据丢掉啊,最后我要结果绝对正确,那怎么办呢?
07:21
一种方法是你把这个再调大,大家想一分钟不行,我等十分钟对吧,十分钟不行,我等一个小时,但是这种方式不太推荐,大家想你如果要是这个窗口一直等待的话,那是不是相当于我中间的这个聚合状态窗口相关的那些数据都要存在内存里边啊,对我这个系统性能的占用是非常严重的,对吧?啊,对这个性性能的代价是很高的,所以实际应用的过程当中的这个不会无休止的等下去,一般就是给一个相对可以接受的时间就可以了。那假如说还有漏网之鱼怎么办呢?啊,那就是后边大家看到的这个叫做side output later data,把迟到的数据扔到另外一条侧输出流里面去。
08:06
啊,所以这里边我们就是下边啊,它还有一个兜底的方法,相当于是就是大部分的数据,可能这个迟,即使是迟到数据啊,我用这个一分钟的延迟也可以把它处理了,那如果有极个别的漏网之鱼,那怎么办呢?啊,那没关系啊,我暂时丢掉,呃,就是暂时也不是丢啊,就是暂时窗口就已经关了,聚合不进去了,但是我可以。那就是后边是不是可以再把它扔到那个测输出流里面去啊,对吧?然后这里边大家看要传的是一个output tag,这是一个测输出的标签对吧?所以这里边我要去new一下啊,New一个output tag啊,比方说你看我要输出的这个类型是不是跟当前的那个数据类型是一样的呀,原封不动输出对吧?然后里边要有一个标记,比方说这个我叫late对吧?啊,然后接下来还需要有这样的一个,就是这里边啊要有一个画括号啊,表示我当前把这个output tag直接创建出来。
09:05
然后后边大家可能会想到,就是中间有了这些之后,是不是后边我该怎么样处理,还是要怎么样处理啊,比方说我做一个sum对吧?呃,比方说这个some temperature。得到最后这边得到的这个结果result stream,诶那有同学可能,呃,这个result,我我这个叫就叫some stream吧,对吧,Some stream啊,那大家可能会想到那这个some,最后我怎么样能拿到它那个迟到的那个数据呢?这里边正常输出的是不是应该是我们窗口聚合的那个结果数据啊。那这里边迟到的数据是可以一分钟之内的,是可以在之前基础上叠加的,还在这个流里面输出对吧,但是这里面的这个set output的这个这不是测输出流了吗?那这个又怎么样去输出呢?哎,这个大家注意,它就是要在外边再去做一个,大家看get side out put做一个这样的操作,这里大家要注意,你前面呢,不要把它改成datatream,因为当前这个get set output这个方法就是single outputt stream operator里边的方法,大家看这个set out set output get set output对吧?就尽管我们当前它也是继承了data stream,但是data stream里面没这个方法,只有它才有啊,所以这个大家稍微注意一下啊,就必须要这里边写出来是single output stream operator。
10:36
好,那这里面get side output是不是还是要传一个对应的这个标签啊啊,那所以这里面我干脆啊,干脆在这里边我把这个标签定义出来完事对吧,就是我直接去拗一个把这个啊copy出来。把它直接放在外边。用这这种方式把它创建出来,哎,直接一个output t对吧,把它先定义出来,那这把这个output tag传进去,这就可以把这个数据直接获取到了,然后可以把对应的这个做一个打印输出,这是一个late对吧?好,那当然了,大家可能会想到那这个你只是做了一个打印输出,那怎么样跟之前的这个呃,结果再再合在一起呢?那大家想这是不是就是你单独相当于是个批处理了,因为这已经是两条流了,对吧?而且这个肯定延迟时间是很长了,所以那你就单独把这个数据收集起来,然后是不是再用另外一个程序把这两个数据做一个合并就完事了,所以大家看这就是这个flink API里边啊。
11:39
是不是用在窗口操作里边,用处理延迟数据的这种方式,是不是就实现了我们之前拉姆达架构里边的那个两套系统的功能啊,你那两套系统不是用一套流处理做一个快速的输出一个近似的结果,然后再用一个批处理器做一个最后统计正确结果嘛,啊我们这里面也是啊,对吧,你用一个流处理,这里边快速的到点就直接输出一个结果,最后呢?啊,你做批处理吧,测输出流拿拿到之后再输出一个结果,而且中间还有一个就是。
12:14
呃,更加更进一步的一个优化操作,就是我可以等待一段时间,在这一段时间之内,我是来一个迟到数据,就更新一次结果,更新一次结果对吧?啊,所以大家会看到我最后的结果就是八点到九点,到九点的时候准时我就很快的啊,几毫秒的延迟,马上就输出了一个近似正确的结果,然后在接下来的一分钟之内呢,只要有迟到数据,这个结果还在不停的变,不停的更新。那到一分钟之后,可以说这个结果就非常接近于最终正确了,那当然他可能还有一些漏网之鱼,那最最后我再用那个测试出流做一个整合,对吧,批处理整合,这就是我们说的这个flink里边对于乱序数数据啊,对于这样的一个呃,迟到数据的一种处理方式。
13:03
Window API里面就可以搞定这件事情。那另外这里面还有一个问题,有同学可能也已经想到了啊,你要这么说的话,那我们直接在这个代码里边这么测,能测得到这个迟到数据吗?大家要注意一下,就当前我们定义这个迟到数,什么叫迟到数据?诶,对,这大家可能就没考虑过这个问题,对吧,你说诶我当前八点到九点,那你说我开一个窗口,当前时间就是八点到九点呀,那你这个数据来了之后,那不是当前如果是八点到九点来的,那它就属于这个窗口,如果说你说它是迟到了,那它就属于九点到十点的窗口啊,你为什么说它是它属于八点到九点的窗口,但它是一个迟到数据呢?对,大家想这就涉及到所谓的时间语义的概念了,对吧?啊,大家就会想到你所谓的这个所谓的迟到是根据什么来判断的迟到呢?是不是你就不仅仅是判断他在我们系统里边做处理当前的这个系统时间啊,而是你要看当前这个数据,它产生的时间到底是什么时候产生的,对不对?那你说这个数据它产生的时间是八点到九点,比方说这个08:59,但是它到了我们这里边窗口处理的时候啊,09:01才来了,那你说它是不是一个迟到数据。
14:23
他到底应该属于哪个窗口。对,大家想我们一般关心的是不是它还是应该属于八点到九点这个窗口啊,所以大家想在这种时间语意下,这个当前处理这个迟到数据才是有意义的啊,所以大家要注意一下啊,这里边在这个allowlessness里边也有一句,也有一句话,大家看这个源码里边注释。设置这个lo lateness only valid,就是在什么情况下才有效,才是合法呢?Even time Windows只针对事件时间的窗口。这个事件事件又出现了,之前我们也看到在设置那个窗口分配器的时候,我们见到有tumbling even time Windows,对吧,Sliding time even time Windows,另外还有这个tumbling processing time Windows啊,Sliding processing Windows,那这个到底是什么呢?这就是我们所说的时间语义。
15:18
这是要在事件时间语义下才有效啊,这就是关于这个其他API的一个用法,那最后我们也可以来完整的做一个总结,大家可以看一下这张图,这张图就是所有的window API调用的这个方式啊,整体来讲是分成两类,一类就是基于KBY之后的k state,呃,K stream去开的窗口,那就是直接点window对吧?啊,简写就是time window抗window,那另外一类就是。不KY不分组,直接基于data STEM去开窗,那大家知道就是window or对不对,对吧?啊,那大家看到后面的这个调用的API是不是完全一样啊啊,所以这个我们就不详细给大家讲解了啊,基本上就是一样的东西,所以我们看一下这个KPI之后怎么做呢。
16:06
点window,然后后边是一堆可选API,后边这这一步这是必须要做的,大家看是reduce或者apply对吧,这些是调用那个窗口的计算窗口函数,所以中间的这两步啊,就是这个window和这个后面的这个聚合操作窗口的计算操作,这两步必须得有。在他俩中间有很多可选,比方说trigger艾viator对吧,另外还有呃,允许这个延迟,另外还有测试出流处理,呃,这个迟到数据,这些都是可选的,而另外呢,如果你这里边把这个迟到数据扔到了测输出流里边,大家注意要获取测输出流,是不是要等到到了外边才能去再去做这个get set output啊。对吧,就是做完窗口计算之后的那个流才能去get,而不是在之前之前就get对吧?啊,所以这个完整的调用过程是这样的。
17:04
回到这个源码里边,大家再捋一下这一个整个调用的数据类型的转变啊,KBY之后由一个data stream变成了一个。K stream对吧,然后k stream再去调window方法,得到的是一个大家看得到是一个window the stream,所以前面我们讲了那么多东西啊,大家看就是window stream,它并跟那个k stream和data stream都没有,都没有对应的那个继承关系,所以前面我们讲的一堆啊,你像这个aggregate,呃,Reduce对吧,呃,还有这个max mean some,所有的这些聚合操操作,包括apply process。他们全是基于window的stream去调用的,注意啊,跟之前基于k stream去调用就不一样了,对吧?所以它都是基于那个window的操作,所以我们发现里边你去发现它里边的那个状态在做聚合的时候,是不是都针对当前的window有效啊,而之前我们那个KBY之后是只针对当前K,对吧?我们现在是针对当前K,还针对当前window啊,这就是当前这个特点。
18:15
然后你进一步做完这个转换之后,大家看reduce之后得到的是什么。Single output stream operator对吧?啊,那或者那个呃,Mean或者mean之后是不是还是它啊,或者这个aggregate得到是不是也是它,呃,那apply全窗口函数得到是不是还是它啊?大家知道这是不是就是一个data stream是不是又回来了啊?所以大家总结一下的话,这又是我们那套规则啊。Data。经过K败之后。得到了一个kid stream。K stream再做一个开窗操作,得到了一个window stream,然后window stream再做一个聚合,或者说全窗口函数的一个应用操作,是不是就又回到了一个data stream啊,啊,这就是我们说出走半生归来仍是data stream,对吧,又绕一圈回来了。
19:12
当然还有另外一种绕法,怎么绕呢?它是不是可以不做这个K直接开窗啊,直接window or对吧?啊,这里边直接这个window or,那它得到的是一个什么东西呢。啊,他得到那个叫做all window的strip,然后同样这个是不是也可以执执行这个apply方法对吧?Reduce aggregate,然后又得到了一个data stream啊,所以它的这个数据类型转换就是这样的一个过程。可以给大家再看一眼啊,Data stream,对吧,这里边是不是window or啊,WINDOW2得到是一个什么or window stream对吧?然后它也是跟别的那个类型都都没什么关系啊,然后在这里边如果要是做一个reduce之后,是不是又得到了single output stream operator啊。
20:00
这就是一个数据转换过程啊,所以我们的window API完整的调用就全部讲完了。
我来说两句