00:00
大家已经知道这个water mark是为什么要引入这个概念啊,而且也知道了它是什么东西,我们知道它其实就是用来指示当前的事件时间的,而且呢,呃,他一般情况是可以在我们当前已经进来的所有数据的那个时间戳基础上引入一个延迟,对吧?啊,这是这个watermark的一个特点,我们一般就是这么来用它的,那watermark到底是什么呢?那就是这个watermark在底层实现的过程当中,流处理的过程,实现的过程当中,这到底是个什么玩意儿呢?哎,这里边我们给大家讲一讲这个watermark的特点啊啊,那大家看,在底层来讲,Water mark其实就是一条特殊的数据记录,也就是说大家在这个流处理的过程当中,大家看啊,呃,这个数这个方块这个啊,这个这个就是我们的数据,一个一个数据来了,然后这个原型的呢,就是所以说我们引入了事件时间。
01:00
这个产生watermark之后呢,相当于就会把这个watermark呀,直接插入到当前我们的数据流里边去,就好像它是一条特殊的数据一样,跟在我们的数据流里边,跟在后面去做处理,而它的这个,呃,当前的这个water mark里边包含什么信息呢?它里边只有一个信息,就是当前的时间事件时间啊,所以大家可以认为它里边就是带带了一个时间戳对吧?绊示当前的这个时间到了几了,到几点了啊,那呃,大家就会看到我们当前这个事例里边,本身数据里边是不是应该是要带着一个时间戳啊对吧?本身数据有时间戳,然后呢,Water本身自己也有一个时间戳,这个数据的时间戳和watermark的时间戳又有什么关系呢?诶,这里边我们就要跟大家就就就想到这样一个一个呃方式了,对吧,我们的这个沃mark肯定得跟当前的数据时间戳得有一定的关系,对吧,他就是基于当前已经收到的数据的时间戳来判断当前的时间达到多少,那我现在要想去判断这个时间,那其实是有两个要求,一个就是说时间是不是必须要只能朝前进,不能往后退啊,对吧?哎,所以这里边watermark里边的这个数值啊,必须是单调递增的,不能减小,因为它表示的时间嘛,时间不能后退,然后另外还有一个特点,就是我基于当前已经收到的数据,它的时间戳来判断当前的时间,那我还想引入一个延迟,对不对啊,我不想,比方说我收到这个五秒的数据了,你看这里边简单的这种这种方式的话,假如我随便插的话啊,你看这个。
02:54
随变定义的话,那这里边一后面我我插了一个这个是二的这个water,那就相当于什么,我这里面就表示我当前的时间进行到二的对吧,就当前的事事件时间even time已经是二了,也就是两秒之前的数据都到齐了,然后两秒的窗口可以关闭了,啊就是这样的一个意思,那如果说三后面我插了一个五的话,那就是表示什么意思呢?就是表示诶我当前的这个事件时间已经进展到五了,五之前的数据都到齐了,五秒的窗口可以关闭,在这个例子让大家看到,诶实这样你插的是对的对吧?确实这个二插进来之后,二之前的数都已经到齐了,后面没有二之前的数了,五插进来之后呢,呃,这个确实是没有,没有这个呃,就是五之前的数,再来了对吧,后面不会有乱续的数据了,这个看起来是对的,没有问题,但是大家可能就会有一个疑惑,就是那我怎么能知道后面我这里边来了个三,然后后面我就差一个五对吧,我就说五之前的数都到。
03:54
题了,你怎么知道来了一个三之后,后面不会再来一个四呢?你这个怎么判断它到底,呃,就是怎么确定当前的这个,呃,这个这个时间戳当前这个时间啊,到底给几呢。
04:07
所以这里边有一个比较简单的实现方式,这个实现方式是什么呢?还要满足就是我们这里边单调递增对吧?呃,不能变小,另外呢,还要引入一个延迟,就是能够等到之前所有的数据都都到齐,那这里边我的这个机制就是什么呢?就是我取之前已经到达的所有数据里边时间戳的最大值,哎,那大家想我要取所有数据已经到达的这个时间戳最大值,那是不是就保持这个最大值永远是单调递增,最起码是保持持平对吧?保持持平,然后这个时间就不进不进展嘛,至少它不会往回退,哎,那这里边我就直接取所有已经到达数据里边时间的那个最大值,但你如果要是说直接以这个最大值作为一个当前的这个事线时间的话,那这个不合适啊,对吧,你如果直接拿的话,那五这个数据来了之后,那当前的你。
05:08
比当前的这个最大值,最大值就是五嘛,那当前的这个watermark就应该是五了吗?后面就直接可以插一个五的这个water吗。不是这样的,不能这样做,我还要等一等,那这个等我到底该怎么等呢?我现在就是按照这个,呃,按照当前的这个数据来判断的呀,我并不知道他后面到底还会来几对吧,我只知道现在来了一个五啊,所以现在我们的标准是什么呢?我的时间就不要以当前的最大时间作为当前water mark这个时间,而是以什么呢?以当前最大值这个时间戳啊,减去一个固定的延迟作为当前的watermark时间。大家仔细体会一下这句话,它表示的含义是什么呢?它表示的含义就是我五这个时间带着五时间戳的这个数据来了之后啊,假如说我现在延迟两秒钟啊,大家看到这个三比它延迟了两秒钟,对吧?假如我现在延迟是两秒钟,那表示什么呢?就是五这个数据来了之后,当前最大的时间是五对吧?啊max。
06:16
这个是五,诶,那我现在呢,并不是当前时间进展到五,而是进展到了五减二减去这个延迟的时间,这才是当前进展的时间,所以我收到五这个数据之后,后边产生的这个watermark应该是三,对吧?所以这个三就表示什么呢?三之前的数都到齐了,如果有三秒钟要关的窗口可以关了,好大家想想这个对不对,那这个其实是对的,为什么?因为我们说那个窗口是左闭右右开,对吧?你如果是零到三秒的窗口的话,它是不包含三秒的数据的,所以后边你再来一个三秒的数据没问题啊,对吧?前面我们该关的都关了,关了就完事了,没问题。
07:03
哎,那有同学可能会说,那你前面这个三秒钟的窗口在这个时候关的话,难道说它是一和五这两条数据都会收到三秒钟这个窗口里来吗?当然不是了,我们之前说过当前的窗口是桶嘛,是分桶的操作,那他做的操作是什么呢?其实是来了一条数据之后,判断它的时间戳,然后分配到不同的桶里边去,哎,那你这里边如果要是零到三秒一个窗口,后边这个三到六秒一个窗口的话,那是不是一这条数据应该在这儿对吧?五这条数据是不是应该在这儿啊?那所以说到时候你关窗口的时候,关的是零到三秒这个窗口,它这里面是不是就只有一一个数据啊?啊,这个就不会出现错误。所以引入了这样的一套机制啊,那接下来我们处理这个water的时候就有据可循了,对吧?我就可以直接按照这种规则不停的在数据后面插入watermark只是当前的时间了,好啊,那这里边大家回过头来看我们之前这个乱序数据的话,那这个到底应该怎么做处理呢?哎,就是我们的数据是从这个145236这样来的,对吧?我这里边假如说零到五秒有一个窗口,然后后面是五到十秒有一个窗口,如果要做这样的两个窗口统计的话,那应该怎么做呢?我想要把这个该该有的数据都统计进来,对吧?那我就设置一个watermark的延迟时间,那watermark的延迟时间大家想我到底该给给几呢?我像那个就之前我们说那个process,呃,Processing time啊,就处理时间的时候,直接拍脑袋给一个一分钟给一个两分钟吗?那种不就不知道具体情况嘛,对吧,来了数据之后就乱了。
08:50
那么那那现在我我应该这样去考虑,我考虑的是什么呢?就是当前所有数据它的乱序程度有多大,我要看这个什么意思,什么叫乱序程度呢?那就是一后面来四,这个是正常的对吧?这是顺序嘛,四后面来五,这是正常的,五后边来二,这个就就反了,对不对啊,就先发生的数放到后边了,所以这个时候它是一个乱序的数据,那这个乱序的数据的程度有多大呢?看它俩之间的差值,五跟二之间差了三秒对吧?哎,那后面又来了三,大家知道五跟三之间也是乱序对吧?啊,但是他俩的这个差值是二,比前面我们这个三就就小了,所以我现在找这个乱序程度最大的这个差值,现在最大的差值是三秒,所以我就设置一个延迟三秒,生成这个watermark,那所以现在的这个效果就是什么呢?假如说。
09:50
定义了一个三秒钟延迟的生成的机制啊,那呃,第一个数据这个一来了,这个这个我就不说了啊,我们把这个完整的流程给大家画出来,那这里面应该要有一个对,有一个窗口是一个。
10:05
好,这边也有一个桶,这里面涉及到的数据比较少,我们就直接把这个,呃,就是这两个桶直接画出来就完了啊,这是零到五,后边这个是五到十。好,然后接下来每来一条数据之后,它的行为是什么呢?来了一之后,诶,大家一判断,按时间说判断对吧,丢到零到五这个桶里边来这个没没没问题对吧?啊,然后我们延迟三秒钟啊,延迟三秒钟,呃,这个延迟三秒钟的话,一如果要减三,这个就变成负二了,这个这个有点奇怪,我们先不考虑了啊,我们就当做反正没有任何的这个影响就好了,然后我们看后边四接着来,四来了之后,它是不是要放到零到五秒这个窗口里啊,对吧,这个桶里面放进来,然后四来了之后,当前的最大时间戳是不是四啊,最大时间戳基础上延迟三秒钟,哎,那是不是我这里边以这个三角表示这个water mark啊,后面插入的water mark,是不是就是四减三就等于一啊?等于一的含义是什么呢?就是表示一之前的数据都到齐了,我现在的事件时间是一。
11:21
一秒对吧?我的时间进展到一了,然后您如果有一秒钟要关闭的窗口可以关闭了啊,那有没有一秒钟要关闭的窗口呢?呃,没有对吧?没有呢,什么也不要做,然后后面又来了五,五来了之后最大时间戳现在是五减三,后边生成的watermark变成了二对吧?那它的含义是二之前的数都到齐了啊,然后有二秒两秒要关闭的窗口可以关闭了啊,所以接下来还是什么都不做对吧?五秒钟的这个数据还是,诶这注意五秒钟不在这儿啊,因为是零到五秒,不包括五秒对吧?五秒钟的数据就到了五到十秒这个窗口了,然后后面又是二又来了,二来了之后呢,大家会发现那这个最大的时间数没变对吧?没变,那是不是现在的时间还是二啊对吧?单调递增,它只是不减而已,对吧,你可以保持不变,那所以二就直接添加到我们这个零到五的这个桶里来,然后同样因为还是只是两。
12:21
等啊,每个窗口都都不关,没有窗口关闭,三同样来了之后,后边最大时间戳还是5WATER是不是还是二啊,然后把三再添加到这里来。这就是这样的一个处理过程,然后接下来六来了之后,诶,现在是不是时间又推移了,因为最大的时间初改了呀,啊,所以最大时间说是六减三之后,现在时间变成了三六这个数据呢,哎,放到了五到十秒的这个窗口里来,但是三秒钟没有要关闭的窗口啊,哎,所以说这个时候还是什么都不做,那到底什么时候才做这样的操作呢?所以现在我们标志其实要等到什么呢?大家想到了,等到后边,比方说我还可以再来一个四,对吧?啊,再来一个七。
13:10
再来一个八,可以可以有这样的一些数据来。所以大家会看到接下来如果来这三个数的话,那会怎么样,因为四跟六这个乱序是二嘛,对吧,这个乱序没有超过三,所以四来了之后没变对吧,当前的这个时间还是三,所以只是把四这个数据放到了,又放到这个桶里,又来了一个四,对吧,再统计一次。放到这儿啊,然后接下来来了一个七,当前的这个时间又往前推移了,减三之后变成了四,对吧?啊,七的这个数放到了这,但是四秒钟也没有关闭的窗口,那还是什么都不做,但是等到八来的时候,八同样还是放到这儿,但是八秒之后water mark就变成了八,减三是五,那大家注意五秒是不是有窗口要做关闭啊,这个窗口当前这个窗口就要关闭了,对吧?所以我们这里边也可以看到当前窗口里边14234这五个数,14234全收进来了,因为这里边我涨到五的话表示什么呢?表示五之前的数都到齐了,诶,那是不是接下来就不应该再来14234这这这样的数据了,所以当前窗口关闭所有的数据统计输出,这样的话就输出结果了,对吧。
14:37
把把窗口给关闭了啊,那有的同学说,那你这之后这个八之后,如果要是再来一个四,这是不是这个数就丢了呢?是呀,当然啊,你这里边直接把这个窗口已经关了吗?再来的数当然就丢了呀,那有同学就说,那不对啊,你这个如果丢了的话,这个正确性不是还是没保证吗?注意这就涉及到当时我们设置了一个延迟,延迟什么呢?延迟三秒钟,为什么延迟三秒钟呢?因为我们考虑到当前数据里边最大的乱序程度是三秒,对吧,你现在八秒之后又来了四秒的这个数据,这个乱序程度是不是比三秒已经大了呢?
15:15
啊,所以这说明你一开始没考虑周全嘛,你如果考虑到后面这种情况,那是不是这个延迟,你就应该给一个四秒的延迟啊,对吧,把这种情况考虑进去嘛,如果你设置了四秒延迟,你看八后面是不是这个直展到这个四秒对吧,那这个窗口还不关,还可以继续收进来啊,所以这里面大家有看到,为什么watermark可以让我们自己去平衡这个正确性和当前的这一个,呃,就是实时性呢?那就是说你其实主要看的是当前数据里边的最大乱序程度,你把这个当成延迟,这里边就可以处理所有的情况了。这就是这个water mark的特性。
我来说两句