00:00
现在我们已经了解了水位线的概念啊,那其实我们会发现啊,水位线本身它的定义是非常简单的,它其实里边的核心数据啊,就是一个整数,这个整数呢就是一个时间戳,它是从当前数据流里边数据本身自带的那个时间戳字段里边提取出来,用来表示当前事件时间的进展。啊,那这个看起来简单,但是在实际应用的过程当中呢,考虑到很多实际问题,我们又会有不同的场景,接下来我们就分别来进行讲解。首先我们先说最简单的场景,那就是有序流里边的水位线。这其实是一种比较理想的情况,什么意思呢?就是说我们当前的数据它到来的时候。顺序就是他们的生成时候的先后顺序,也就是说,时间戳小的数据先到来,时间戳大的数据永远都在后边。到来的时间顺序是完全保持先来后到这个原则的。
01:05
那所以呢,我们一个一个提取时间戳,提取出来的water mark本身也就是从小到大不断增长,单调递增的,诶那这样的话就符合我们对于时间的定义嘛,因为我们知道时间它是不能倒流的,哎,不能来回跳转啊,不能先变大后变小,这样是不行的,我们必须保证它单调递增,但是呢,在实际应用过程当中,如果数据量非常大的话,也就是说我们当前这个数据啊,来的可能会非常非常的密。那就会导致什么情况呢?就是可能很多数据它的时间戳。其实是一样的。如果我们以秒作为这个时间戳的单位的话,可能他们就完全一样,没有任何的区分,如果我们以毫秒作为时间戳的单位,他们可能会有所区分,但是呢,间隔的时间也会非常的小啊,所以这个时候每来一个数据,我们后面就差一个水位线,每来一个就差一个水位线,这个其实是要耗费大量的系统资源的,而且我们会发现它做的大部分都是无用功,因为你这里边插的有可能这个时间出都一样吧,其实是没有必要的。
02:13
哎,那所以为了提高效率,我们现在的策略就变成什么呢?哎,不要每来一个数据就在后边判断一次,插一个水平线,而是周期性的插入,就是间隔一段时间。在后边插入一个,间隔一段时间再插入一个,那当前插入的时候,这个水位线对应的时间戳以什么为标准呢?当然就是它之前最近一次接收到的这个数据里边的时间戳,这就表示当前时间进展到了什么时候。啊,所以我们看到现在的水位线,其实就是有序流里边周期性出现的一个时间标记。啊,这里需要注意啊,那插入的这个时间周期我们默认应该是多少呢?啊,在flink里边,它默认的这个时间周期是200毫秒,当然这个周期我们也可以设啊,如果说我们设置了这个100毫秒的话,那我们就是每隔100毫秒就判断一次,生成一个水位线,以之前接收到的最近一个数据的时间戳作为标准。
03:17
那这里的这个周期其实也是一个时间概念啊,你每隔100毫秒,200毫秒,这到底是以什么作为标准呢?是事件时间还是处理时间呢?诶注意这里我们是以处理时间作为标准的,也就是说系统时间隔100或者200毫秒之后,我们就生成一次水位线。如果我们要按照事件时间来判断的话,那我们知道事件时间本来就是靠着这个时间戳去推进的,你现在呢,又要等到这个推进200毫秒之后,我才去再次插入水位线啊,那这个显然就陷入死循环,永远达不到了啊啊,所以我们需要注意这一点。这种情况其实比较简单的啊,那这是理想情况,在实际场景下往往不是这样的。
04:02
实际场景是什么样的呢?啊,那就是我们说的啊,当前的这个数据是要进行并行处理的,我们当前这个窗口算子,哎,那可能上面这里有一个下面还有上面还有对吧,并行的好几个子任务,那它的上游呢。也是有好几个并行的子任务。所以给到当前这个子任务的数据,它有可能来自不同的分区。那如果来自不同的分区的话,我们就会发现了,当前在做这个并行处理的时候,哪一个处理的快,哪个处理的慢,这个是没准的,而且不同的分区在进行分布式的数据传输的时候,也可能延迟是不同的,所以就有可能出现什么事情呢?哎,就是在这里本来是第一秒就处理的数据,这里呢,处理了一个第二秒的数据,但是他们传递到下游任务的时候。第二秒的数据先来,第一秒的数据后道了,所以在这种情况下就出现了所谓的乱序啊,接下来我们就要考虑的问题是,出现乱序数据的时候,我们接下来怎么样去定义水位线呢?
05:10
那这里最直观的想法其实还是跟之前一样啊,诶,那我们就还是,呃,我们先不考虑那个周期性的情况啊,我们先来考虑,就是每来一条数据之后,我们就按照当前它的时间戳去做一个提取嘛,来了一个二,哎,那当前的时间戳就是两秒水位线,就给一个两秒钟的水位线插在后边。五秒钟的数据来了,那就差一个五秒钟的水位线,九秒钟的来了,哎,那就来一个九秒钟的水位线。这里有一个问题,就是九秒钟之后又来了一个七秒钟的数据,出现乱序数据了,这个时候怎么办呢?那后边我们就不要插入七秒钟的水位线了,为什么呢?因为前面九秒钟数据都已经来了,说明当前的时间已经进展到九秒了,我们已经插入了九秒的水位线,那这个时候七秒的数据来了之后,我们会发现它其实是一个迟到的数据。
06:03
啊,就是并不是说当前的时间才到七秒,而是说已经到九秒了,七秒这个数据你来晚了,所以我们当前的时间就不应该再退回到七,而是依然还是九,哎,所以我们这个策略其实可以非常的简单啊,就是插入这个水位线的时候呢,不要直接就按照之前最近一次接收到的数据来判断它的时间戳,而是做一个比较。保存之前最大的那个时间戳,也就是接收到的最新生成的数据啊,表示我们当前时间到底进展到了什么程度。所以使用这种思路的话,那么我们得到结果就变成了这样啊,那就是两秒钟数据来了啊,那当前最大的时间戳是二,那么我们插入的水平线就是W2,这个W表示watermark啊,在我们代码的底层,其实watermark本身就是一个类,那么它里边的构造方法呢,要传入的一个参数其实就是当前的时间戳啊,就是一个整数啊,所以我们当前可以就W2表示两秒钟时间戳这样一个watermark,然后五秒的数据来了之后呢?诶,那么就插入的是W5,九秒钟来了,插入W9,然后七秒钟数据来了之后呢?哎,那其实我们知道啊,要插入他插入的也是W9,这个时候干脆我们就不用插入了。
07:23
因为前面已经有W9了,那是不是朝下游传递的时候,下游任务就都已经知道现在时间到九秒了,哎,那就没有必要再去重复发送了,节省资源,后面到什么程度呢?呃,就是八秒九秒这个都没推进,我们当前的时间一直到11秒的数据来了,当前的时间戳又变大了,最大时间戳增大了,那么这个时候我们的时钟才去推进。当前发送一个新的水位线,这是W11。这就是我们当前的一个基本的处理思路。同样的,如果我们考虑到大量数据同时到来,这个时候处理效率会变低,那我们也是可以不要每来一个数据之后就判断一下当前的最大时间戳,而是周期性的判断一下当前的最大时间戳,哎,那这个时候就是。
08:13
隔一段时间啊,比方说系统默认200毫秒一次,我们在这里判断一下。当前已经来了四个数据,最大的时间戳是九,那么插入的watermark就是W9啊,然后又隔了200毫秒,我们判断一下,当前最大的时间戳是14,那么插入W14,再隔200毫秒,最大时间戳22,插入的就是W22。所以这种处理方式呢,我们也是比较容易理解的,但是这种方式会带来一个很大的问题,就是如果说我们基于当前的这个时间标准去处理窗口的话。那就有可能没有办法正确处理了,为什么呢?呃,首先我们看啊,假如说我们当前是94和22,那假如说我们来了一个。
09:01
我们现在要定义一个零到十秒钟的窗口。或者说我们更加简单一点,我们就定义一个零到九秒钟的窗口吧,那我们自然知道看到了WATERMARK99秒的水位线,那就表示当前的时间已经进展到了九秒钟,我们应该要关闭当前的窗口,把计算结果要输出出去了,但是我们发现因为当前数据有乱去,所以在九秒之后呢,还可能有迟到数据,诶,还又来了八秒钟的数据,这个时候八秒钟的数据它本来是属于我们这里零到九秒这个窗口的,但是现在呢?窗口已经关了,这个车已经发走了,那当然我们就赶不上这趟车了,那这个八秒钟的数据就只有被丢掉了,所以相当于这个数据我们就漏统计了。统计结果出现了偏差,诶,那怎么去解决这个问题呢?啊,那像这个我们平常赶车的时候,大家都有这样的经验啊,如果说哎,我们当前这个班车一定要等到所有需要坐这班车的人的话,一定要等到所有正确的人的话,那么我们可以让司机多等一会儿嘛,啊比方说哎,你是九点钟要发车,那当前该上车的人呢,还没到齐,那我就多等十分钟啊,多等一段时间之后,哎,那所有的人可能就都到齐了,这个时候我再去发车。
10:23
那同样的啊,在我们这个数据处理的例子里边也是一样啊,我们可以做一个延迟发车嘛,那这种延迟发车的操作呢,就相当于是比方说诶,我们可以这个延迟就不能延迟十分钟了啊,我们延迟两秒钟吧,要插入水位线的时候,我们判断当前最大的时间戳是九,诶那么我们延迟两秒钟,那就是九减二,我们就生成一个。七秒钟的水位线,它表示的含义是什么呢?其实就是表示我们当前的事件时间进展到了七秒,哎,所以我们看啊,水位线就是事件时中的进展衡量的标志,那有了这个策略,那接下来就比较简单了。
11:05
我们可以在第一个两秒钟数据到来的时候,我们延迟两秒的话,这个时候其实只是沃MARK0啊,这时间只进展到了零秒,然后呢,五秒钟数据来的时候减二,诶,那么就是water进展到了三秒,九秒钟数据来的时候呢,那么九减二就是7WATER mark就进展到了七秒,所以如果这个时候我们在周期性生成的话。那就是隔一段时间一判断当前的最大时间戳是九减两秒,当前的时间就是七,然后再一判断下一个生成奥马的时间点,当前最大的时间戳是14减二,现在进展就进展到了12啊,然后呢?呃,下一次最大的时间戳是22,我们生成的water就是20。但是如果我们仔细去思考的话,会发现啊,在这里其实是有一个问题的,就是在这里我们等两秒钟,其实并不能把所有的数据都正确的处理。
12:03
因为我们这里边等两秒,其实就是直接拍脑袋给出的一个等待的延迟时间啊,那这里边我们看到对于这个数据流而言啊,现在看起来好像还是正确的,为什么呢?因为当前插入了七秒的water rock之后,在之后其实就没有比七更小的时间戳的到来了啊,那对应的就相当于没有迟到数据了吧,插入了WATERMARK12之后,后面就没有比12更小的数据来了啊,那同样插入20之后,后面也没有比20更小的了,看起来是正确的,但是呢,这是跟我们插入的位置有关,那假如说。我们当前插入的这个周期刚好是卡在了这里。插入了一个water mark,那我们看前面最大的还是22,时间说是22减两秒之后生成的water mark还是20。20秒的马代表当前的事件时间已经进展到了20。那么假如说有。零到20秒的窗口显然现在就应该关闭了,但是我们看到在它之后还有17秒的数据,哎,那这个17秒的数据显然是应该放到零到20秒这个窗口里边去做处理统计的啊,现在这就是一个迟到数据,没有正确的处理,它被丢掉了。
13:18
啊,那所以我们就想到了,那这个我们到底应该怎么样才能让所有的数据都真正意义上的正确处理呢?啊,那其实就是要我们要等足够长的时间,让所有的乱序数据,迟到数据都到齐了之后,我们再去生成对应的watermark关闭窗口,这样的话就不会出错了啊那类比我们赶车的例子,我们也可以想到啊,那就是说,呃,我们说九点钟要发车了,这个时候。我们发现人没到齐,我们就等,哎,那到底等多长时间呢?等五分钟,等十分钟还是等20分钟呢?那其实是要看我们当前这个人到底隔多长时间啊,最慢的那个人多长时间之后能到达嘛,所以我们在处理数据的时候。
14:04
也是类似的思路,我们其实是要判断这个数据流里边的数据啊,它的最大乱序程度到底有多大?什么叫乱序程度呢?也就是说我们这里有一个九,它后边又来了一个比它时间小的七,这个就叫乱序数据,那么它俩的乱序程度是多少呢?九和七之间的差距是两秒,所以我们就说它的乱序程度是两秒。啊,那这个数据流里边的最大乱序程度是多少呢?其实我们看到啊,在这里边其实就是22后边又来了一个时期,它俩是乱序了,它们的差距是五秒钟。所以当前它的最大乱序程度是秒,如果我们设置一个延迟时间,就是最大乱序程度五秒的话,那很显然我们当前就可以等到所有的数据了啊,就是类比我们赶车的那个例子,就是我们判断清楚当前所有人最慢的那个他要等多长时间,然后我们等那个时间不就可以了吗?哎,所以如果我们现在等五秒的话,还是刚才那个例子,在这里22后面插一个watermark,我们发现等五秒的话,插入的就变成了W17 17秒的水位线插入进来啊,那然后后边呢,当然就没有比17更小的时间戳的数据到来了,那最小也是17秒的数据。
15:27
所以在这里我们也可以看到wal mark它的一个特性啊,它就是可以表示我们当前的事件时间进展到了这个时间戳的这个点,而且在这之后再也不可能有比他小的,也就是迟到的数据在到来了。那我们可以总结一下,看看watermark到底有哪些特性,哎,那总结起来的话,其实水位线它其实就是代表了当前的事件时间的时钟嘛,它就是我们当前的表,它其实本质上呢,是插入到数据流里的一个标记,可以认为是一个特殊的数据,所以我们每一个算子啊,每一个任务处理数据的时候呢,能接收到正常的数据,那就正常处理,也可以接收到watermark,那就处理时间相关的操作。
16:19
啊,那另外就是沃ma主要内容其实就是一个时间戳了,用来表示当前事件时间进展,这个时间戳呢,就是基于之前数据的时间戳生成的,注意它是基于数据的时间戳,并不完全就是数据的时间戳啊,因为我们看前面这里,我们有可能还要判断之前的最大时间戳要做比较,还有可能呢,在最大时间戳基础上还要做一个延迟处理啊,还要等一段时间,哎,那所以呢,我们可以可以定义一些其他的操作,另外。Water时间桌必须单调递增,保证我们的事件时间始终是向前推进的,时间不会倒流啊。另外呢,我们这里可以设置延迟时间,保证正确处理乱序数据。
17:05
最后我们总结一下,就是一个水位线到来的时候,Water mark t这个T是个整数啊,那就表示当前流里边事件时间已经达到了这个时间戳这个点T,这代表T之前的所有数据就都已经到齐了,那之后的流里边就不会在出现。小于等于T的数据了,当然这里我们看到啊,定义的是不会出现小于等于T这个时刻的数据了啊,这跟我们之前感觉的好像后边还可以出现相等的这个数据,这个是有所不同的啊,标准的底层定义是小于等于T的时间出来的数据就都不会到来了啊,这个主要是为了方便我们后边做这个窗口啊,以及其他的一些计算。因为从理论上来讲,假如时间已经进展到了某个时间点,比方说进展到了九点的话,那当然九点钟的数据就应该都到齐了,哎,那所以我们就是小于等于九点的数据就都不应该来了,九点的窗口就可以关闭了,水位线它是flink流处理里边保证结果正确性的一个核心的机制啊,所以呃,它其实是专门用来衡量事件时间进展的,所以呢,往往会跟窗口一起配合使用。
18:25
最后就可以保证我们在事件时间语义下正确的处理乱序数据啊,关于乱序数据的处理呢,我们会介绍完窗口操作之后再进行详细的讲解。
我来说两句