00:00
我们现在已经知道水位线到底是什么东西了,然后接下来呢,我们再来考虑不同的水位线的一个机制,这水位线听起来很简单,这不就是来了什么样的数据,按照它产生的那个时间戳,然后后面插一个这个标记不就完了吗?啊,随着它在这个数据流里面流动,没什么问题啊,哎,这其实是一种理想化的场景。就是数据就本身是按照他们产生的顺序已经排好队了,就是一个一个来啊,你看这里边就是二先来了,当然有可能三四秒没有产生数据,没有产品生成,那接下来就是五秒的数据就来了,然后七秒的九秒的生成的这个水位线呢,你也就直接按照当前数据的那个时间戳去生成就可以了,因为它肯定是。单调递增,一直往往前推进的,诶,所以这种情况下不需要做这更多的考虑,但是这种情况也有一个问题,在这种情况下,呃,首先你如果要是数据比较稀疏的话,那没什么好说的,你中间如果隔了一段时间没数据来,那确实我们不知道当前的这个时间推进到什么时候,事件时间始终是不能去进展的,没有水位线,但是如果说啊。
01:11
当前的数据特别特别的密,有可能一秒钟同时就来了好多好多事件,时间戳的话可以再精细,光是秒可以精细到毫秒,但即使是毫秒,同一毫秒可能也有好多好多数据,非常海量的数据来了啊,那这样的话大家就会发现同一个时间戳,你会发现有大量的数据来,每一个数据你如果在后面都要去判断一下当前的时间戳,然后去插入一个对应的水位线的话,这是不是就做了大量的无用功啊?数据量本身就已经够大了,然后你还每一个数据都要提取一下时间戳,然后插一个水位线,关键是你做了这个东西之后,还没用很多数据插入的watermark是一样大的,所以这种情况下你就干脆不要这么去干了,那怎么去干呢?我们就用另外一种方式,不要根据数据去触发,来一个数据就生成一个水位线,而是。
02:08
周期性的去产生水位线,所以大家就会看到实际应用的时候,如果要是数据比较稀疏的话,你周期性的产生这个不太好,因为你周期性产生有可能就打在没有数据的时间了,那这个时候相当于就是无用功,当前的这个水位线是不变的,但是既然是比较稀疏的时候,那现在系统也没什么压力啊,也没什么数据处理,那你无用功就无用功了,没影响啊,但是如果要是数据稠密的时候啊,这个是很要命的啊,所以说我们关键是要解决这个海量数据峰值处理的问题,所以。比较常见的,一般系统里边都是周期性的生成水位线,也就是说隔一段时间我去判断一下当前的时间,最近的一次是什么样的一个时间戳,我把它插进来,隔一段时间判断一下哦,当前最近一次是七,所以插入W7 W代表auto mark,然后呃,比方说下一个时间间隔判断哦,当前是11,最近一次是11,所以当前的时间就是11。
03:06
好,然后下一次是20,当前的watermark就是20,所以大家会发现这样插入的话。插入的water mark啊,尽管这里大家看到这个跳变比较大啊,但是如果数据量非常密集,我们的这个时间间隔又比较小的时候,其实这个watermark也是连续变化的,所以这样的话,呃,就可以起到我们在这个海量数据处理的时候的一个优化。这种情况呢,还比较简单,现在关键要考虑的是这是理想情况,为什么说理想情况呢?我们之前数据产生的时候,大家也已经知道了啊,如果说你是数据发生之后,就是按照这个顺序,只有一个单一的分区啊,消息队列把它收集起来,然后单独的一个算子读取,然后单独的一个分区算子去做处理,那这个没毛病,数据顺序是一直保持着的。现在肯定就是先发生的事件,先进flink,然后先被处理,这按照我们这种处理方式就一点毛病都没有。
04:04
但是事实上弗link是一个分布式的系统,而且它数据本身在之前采集的时候啊,前前面我们那个日志做收集,做这个ETL的时候,在卡夫卡那边,它本身也是一个分布式的数据顺序已经打乱了,就是按照它的发生时间已经打乱了,那这个时候又该怎么办呢?那这种时候我们看一下它的场景就变成什么样子了。大家看到就变成这样了,时间戳是2599秒发生的,数据来了之后,有可能七秒的数据才来,诶,那这个时候又该怎么办呢?我们还是先简单一点啊,不要周期性去生成,我们还是先考虑一个数据来了之后,我们就在后边去生成一个水平线,那你说九秒的这个数据来了之后,那是不是相相当于事件时间推进到九秒了呀,那七秒的数据来了之后,难道现在是事件时间腿滚到七秒吗?哎,这个显然不对,时间是不能倒退的,时间不能倒退,那既然已经之前收到了九秒的数据,那自然我认为事件时间已经进展到九秒了,九秒的那个产品已经下线了,已经生产出来了,那自然生产线上的时间已经到九秒了。
05:18
七秒钟后面才来,那是因为你路上耽搁时间多嘛,所以是七秒钟迟到了,而不是九秒钟提前下线了,所以我们自然是已经到九秒了,就不能再倒退了。所以我们现在的数据驱动的规则就变成什么了呢?就是每来一个数据就提取时间戳,但是插入水位线的时候呢,要判断一下当前新的最新的这个数据时间戳是否比之前最大的时间戳要大。如果大的话,时间才推进,才去朝前进展,如果小,那不好意思,你是一个迟到,数据对事件时间的进展没有贡献。啊,所以大家就会看到,在判断的时候,这就相当于我得保存一个状态了,不光跟当前的数据有关,还要判断一下之前最大的那个时间戳是什么给保存下来,所以大家看现在就变成这样了。
06:15
第二秒的数据来了之后,奥马是二五秒的数据来了,比之前的二大,现在进展到五九秒的来了,进展到九七秒来了之后呢。他不如之前最大的时间戳九秒,所以七秒来了之后什么都不干,八秒九秒来了之后也是什么都不干,那11秒的数据来了之后发现,哦,它比之前大了,这个时候才进展变成当前的沃马事件,时间推进变成11秒。所以大家这样看的话,水位线还是一个单调递增的过程,保证我们的时间是向前进展,永不回退,时光不能倒流嘛。啊,但是这里就又有另外一个问题,就是如果说我们直接这么去处理,每来一个数据就判断的话,这不又回到之前那种海量数据同时来的时候的问题了吗?诶,所以现在我们就来一个周期性的生成,周期性生成也比较简单,我们不是说已经可以保存一个之前最大的那个时间戳吗?那就是每来一条数据之后,我还是要去保存一下,比对一下,保存当前的最大时间戳,只不过。
07:22
不去直接生成水位线,等到周期性触发的时候,隔一段时间要触发的时候,把当前最大的那个时间戳提出来,然后输出当前的这一个水位线啊,这这就是当前我们看到的啊,在这这个时间间隔要输出当前最大的九秒,所以输出一个九,然后下一次时间间隔啊,刚好就是最新的这个数据啊,14,那就输出一个14,然后下一次间隔哦,最大的是22,输出一个22。这是关于在出现乱序的数据里边插入水位线的一个基本的原则。但是进一步考虑的话,这还是有问题的,假如就是零到九秒是一个窗口,既然你的水位线已经到九了,那我们这个窗口是不是就应该关了呀?
08:09
零到九大家看零包括零秒,不包括九秒。既然已经收到了这个水位线,那现在当前这个窗口就该关闭了,那如果关闭之后。因为是乱序数据嘛,那后边是不是还有可能有八秒的数据来呀,窗口都已经关了,那你这个即使想处理都没得处理了。这个数据就会被丢失,之前我们窗口计算的结果也会出现偏差。哎,所以大家就会想到,那这个东西怎么办呢?诶,我怎么样去处理这个真正意义上的这种迟到的数据呢。如果要类比的话,我们还是可以类比之前单车的例子,现在我们那个商品啊,不是按照自己生产的那个时间戳,按照那个顺序来的,它是路上耽搁的时间,有快有慢啊,有的是拿手推车送过来的就比较慢啊,有的是这个电动车送过来的比较快,所以说他赶车的时候时间就到了前面啊,所以九秒钟的这个商品它来得快,我们现在时间已经推进到九秒了,那现在要不要直接就发车呢?
09:11
如果我想保证后边还有迟到的数据能赶上这班车不要错过的话,那就必须要再多等一会儿。啊,其实这个很好理解,我们生活当中不就是这样吗?呃,一个团队出去团建,那肯定希望每个人都不能落下。呃,有人路上堵车了,没准时到车上,那我们就多等一会儿嘛,提前联系好。呃,这个我大概十分钟就到,那我们等十分钟把人凑齐了,这个数据都收集对了,然后再去统一发车统一走。那现在对于计算机系统而言,这个数据怎么能通知到我们,他他能迟到多久,我们要去,呃,等多久呢?啊,这个就没有办法了,所以这个就只能是我们经验性的去给一个延迟的时间,或者说我们可以先去看一看整体的这个数据的分布,它的这个乱序到底能乱到什么程度,什么意思呢?就是说前面来了一个九秒,后边可能会来九秒之前的数据,那那到底最早能来多早的数据呢?
10:12
大家看这个后面最多就来到了七秒的数据,不会再来六秒和五秒之前的数据了,所以这样的话,这里的这个乱序程度,我们说它就是就是二。九秒之后,最多来了两秒前的数据。那所以按照这个标准的话,我们是不是只要等两秒就可以了呀,诶所以这里这就相当于是我们一个可以等待两秒的一个时间,当然整条流看的话,你不能只看这一个乱序的地方,你比方说看后边。这里14跟12又有一个乱序啊,那当然这个乱序也是两秒,还有什么呢?那那整个这条流是不是直接按这个两秒乱序就搞定了呢?也不是的啊,你得看整条流里边我统计一下这个规律,最大的乱序能乱到什么程度,比方说22后边来了17。啊,这个乱序,这就到了五秒,五秒之前的数据在他后面才来,所以这里边至少要等多久啊,应该还要等五秒对不对,因为你这里边如果只等两秒的话,那就意思是说九秒钟来了之后,我还要继续等,等到11秒的时候,我才去关闭当前的这个九秒钟的窗口,哎,那所以大家就会想到了,那你22来的时候,这相当于时间已经进展到20了。
11:25
如果17秒有一个窗口要关闭的话,它是看到19秒的数据的时候。这就相当于已经等待了两秒了,那就相当于要把这个17秒的窗口就要关闭了,所以如果看到前面20秒的这个数据,那不就直接把这个窗口关了吗?等两秒不够用对吧?所以要等的一定是这个最大乱序的这个程度,你等五秒的话就没有问题。我要等五秒,是不是就是17秒的窗口应该要等到22来了之后才能够关啊,那所以这个时候就是22来了之后,就可以把这个17秒的窗口关掉,17秒本身是不存在于这个窗口里边的,所以没事儿对吧?啊,只不过就是17秒之前的数据肯定不会再来了啊,那这样的话,我们把这个窗口关了就没事了。
12:10
所以我们这里边提出了一个延迟触发的一个机制,比方说这里简单说就是等两秒的一个策略,之前我们说是等两秒,那到底现在的时间是多少呢?我们之前说了,在计算机系统里边,你设置一个零到九秒的窗口。那正常来讲的话,我们就是看时间嘛,并不是说窗口本身要等两秒,那是窗口自己的这个行为了,我们现在是说时间,时间的话,那你自然就是说只要到了九秒钟,我只要看到时间进展到九秒,我就应该关这个窗口,那假如说我现在时间就希望它整体等待,整体推后的话,那是不是应该我看到九秒钟的这个数据的时候,时间就不应该进展到九秒啊。按照我们之前的那个想法是现在时间还是进展到九秒,只不过呢,你窗口是要等到相当于等到11秒的时候才去关这个九秒钟的窗口啊,但是这个这种行为,这看起来就有点儿怪啊,就好像不是九秒钟关窗了,所以我们现在是要求还是九秒钟关窗。
13:16
但是怎么样呢,但是时间变慢了,哎,所以大家看到现在的这个规则就变成了。我是两秒钟来的时候,现在它的水位线插入的不是两秒钟的水位线,而是。两秒之前的水位线啊,所以大家看这个其实效果是一样的,就好像我们那个看表的时候啊,一个是司机那边知道我现在已经确实是九点钟了,但是呢,我要等你十分钟,我等到09:10再开车。另外一种是啊,我已经知道我要等你十分钟了,我直接就把我的表调慢十分钟,我还是九点钟直接发车,这两种情况其实是等价的啊,只不过就是说直接把它调慢的话,我们当前的这一个时间就还是时间,你的窗口行为可以完全不变,还是该几点钟关几点钟关,这样就完全一致了。
14:10
大家看到两秒来了之后,现在的时间只进展到零秒。减二嘛,五秒数据来了之后呢,进展到三秒,九秒来了之后关不关九秒钟的窗口呢,当然不关,因为时间只进展到了七秒。那什么时候关闭零到九秒的窗口呢?啊,等到11来了之后,现在的水位线插入的减两秒是九秒了,那我就可以把零到九秒的窗口真正的关闭了。而且大家也可以看到,目前九秒之前的数据七八都到期了啊,后面再也不会出现九秒之前的数据了,当然九秒的数据还会有,没关系啊,九秒数据我们不包括啊,这个就无所谓了啊。那另外还有就是我们其实本身这种策略并不能处理所有的情况,这里在插入的时候,22之后,这个已经时间是进展到20秒了,那你后面还会来17,那明显这个就就滞后的有点太多了,这个乱序它比这个要大,所以说17如果要是想要正确分配窗口的话,很有可能就这个数据就会丢,比方说零到九秒之后,下一个窗口有可能是九到18秒。
15:19
固定时间长度嘛,九秒钟一个窗口,九到18秒,那是不是看到18秒的water mark的时候,这个时候就会关掉了,哎,但是17秒的数据是不是之后才会来呀?哎,所以这个数据就被丢了,那怎么样能正确处理呢?哎,那就是把这个等待的时间再调大,调成等五秒就可以处理了。这就是关于这个乱序流里边水位线的生成的机制,那我们可以总结一下水位线的特性啊,水位线的特性是什么呢?它是插入到数据流里的一个标记,可以认为它就是一个特殊的数据。然后另外呢,水位线本身是一个时间戳啊,就是它主要内容就是一个时间戳,就是用来表示当前事件时间的进展啊,那其实就是从那个数据里边那个时间戳里面提取出来的嘛,基本上是一回事,单独把它弄出来,就是因为它可以方便的在任务之间传递,只是当前的事件时间进展。
16:16
然后呢,它必须单调递增,保证时间不能回退,时光不能倒流,它还可以设置一个延迟,来保证正确处理乱序数据。那最终我们可以明确一下啊,一个水位线automark t,它表示的是什么?表示的就是当前事件时间已经达到T这个时间了,那么在他之前的所有数据。都到齐了。在之后就不会出现时间戳小于等于T的所有数据了。水位线是弗林里边保证结果处理正确的核心机制,就是视线时间语义下有可能数据乱序,那乱序靠什么保证呢?水位线设一个延迟,然后就可以把它正确搞定了,它往往会跟窗口一起配合。
17:07
正确的处理乱序数据,这就是关于水位线的概念,什么是水位线?怎么样去把这个水位线生成出来?它的特性是什么样?
我来说两句