00:00
我们讲到如果要是在这个处理的过程当中有乱序数据的话,那就会导致窗口最后的数据不正确,对吧?计算结果不正确,那怎么避免这个乱序数据呢?哎,我们就想你可以等一会儿嘛,对吧,延迟一会儿,等一会儿再发车嘛,诶你不要着急发车,所以这种延迟的机制其实就是在弗林克里边的设计,就是所谓的这个水杯线W马啊,当然这个本身这个翻译是有不同的翻译啊,就watermark有时有些文章可能会翻译成水印啊,就是一个一个标记,对吧,水印我个人是比较喜欢这个水位线这个翻译,大家可以想到你既然是流数据嘛,对吧?啊,在这个流里边,它的水位在不停的往上涨,你可以想到这样的一个状态,然后时间是随着随着这个水位上涨,时间在推移,这个好像更好理解一些,所以我是一般把它就叫水位线了啊,我们一般平常就就管它叫water rock就可以对吧,这个一说就知道是什么意思啊。
01:00
好,这里面有几个这个特点啊,大家要了解一下wal mark它是衡量even time进展的一个机制,所以它其实是设定了我们even time,是不是你不要像我们刚才这个这个例子啊,是不是就是说如果我们这里让他延迟发车的话,是不是就相当于1TIME,你不要一看到五这个数据来了,你就说五秒钟的窗口该关闭了,对不对?哎,你迟一会儿再关闭,所以它相当于是不是让我们even time的这个进展延后了一些啊。啊,所以它是衡量我们这个even time进展的一个机制,可以设置一个延迟,它主要就是用来处理乱序事件,那么正确处理乱序事件呢,就要用water mark结合window合在一起就可以把它正确的处理,增加一个延迟,延迟发车,对吧?然后接下来还有一个要求,就是说water mark,它表示什么含义呢?假如说我们遇到一个water mark的话。
02:05
它表明什么意思?表明时间戳小于这个watermark的数据都已经到了。所以我们说这是个水位对不对,比方说这个water mark已经涨到五秒的话,那我们就说明是不是水位涨到五秒了,那说明是不是五秒之前数据都到了五秒窗口该关了,对吧?啊,这是我们衡量这个窗口关闭操作的一个一个标准啊,啊所以大家会看到这个watermark它其实是做了一个什么操作啊,就给了程序这样的一个灵活性,就是可以让程序自己来平衡延迟和结果正确性,大家想想是不是这样对吧?我怕你现在有乱序数据,导致我结果不正确。那没关系啊,我可以可以等,我可以延迟发车嘛,多等你一会儿,那至于说你到底等多久呢?到底要多正确呢?对吧,那你就平衡吧,对吧,等的时间越久是不是越正确的,这个概率应该越大,你如果要是要想更低的延迟的话,那正确的概率是不是就越低啊,对,所以这就让我们可以根据自己具体的应用,就是更加精准的去调节这个过程了啊,所以这个延迟触发的机制就是我们所谓的这个water mark啊呃,接下来再给大家说一下这个water mark的这个特点,大家看一下这个图,这个图其实就看得非常明显。
03:37
Water mark,呃,这个跟我们前面讲到那个barrier有点像啊,所以就是wal mark其实也是被插入到数据流里边的一种特殊的数据结构。你可以认为它是一条特殊的数据啊,所以大家看其实就是相当于是这样,大家看一个数据来了,哎,下面这个是它的时间戳,对吧?来看带着一个时间戳是一,然后呢,后面我插了一个water是二,然后接下来呢,又给一个,呃,来了一个数据是时间戳是五,然后又来了一个时间戳是三,这里边我插了一个water mark是五。
04:12
大家想我这里边插这个waterlock是二表示什么呢?对二以前的都来了,然后插五的时候表示五以前的都来了,对不对啊,所以大家看它是这样的一个特点啊,呃,这里边autowa有两个要求啊,有两个基本的属性,或者说基本的要求,第一个就是必须要单调递增。啥意思,就是对,就是如果我们顺着这个流往后一直看的话,插进来的water mark水位是不是只能涨不能退啊,这就代表因为我们说watermark是衡量那个even time的一个标志,对不对,那你时间嘛,既然是时间,你你不能时间时光倒流啊啊,所以就是说只能涨不能跌啊,啊这这有点像房价是吧,只涨不跌啊,早晚要跌是吧?好啊,我们现在考虑的是water water永远不跌,好呃,接下来我们看这个还有另外一个特点,就是water mark呢,是要跟数据的时间戳相关啊,那么大家就会想到它表示的是不是就是在五五秒的这个water mark来了,是不是就表是时间戳,在五秒之前的时间戳,那些的数据都已经到了呀,对吧?所以它俩是息息相关,Watermark跟时间戳息息相关,好,那接下来我们再给大家看一看。
05:35
就是它既然是像一个特殊的数据一样,在这个在这个流里边去流动,那整个这个过程当中,怎么样去在这个任务之间传递呢?大家想一想。因为大家知道,大家看这个图就就知道啊,我一个task一个任务,它是不是。呃,就是之前给他输入的那些任务,可以有并行的好几个任务给他输入,对不对,假如之前并行度是四,那是不是就他就应该有四个输入啊,相当于然后他之后的这个任务,它的下游的任务发出去的这个任务是不是也可能有多个啊。
06:18
所以很有可能大家看这个分区是有多个的,所以呢,我们当前这一个任务,它可能有很多个输入分区,同时还有很多个输出分区,对不对,呃,这这只是把这个放在一个一个那个。就是大家看这是单独抽出一个task,如果要并行的几个task,是不是就出现那种交错的分区的那种状态了,对吧,数据就来回来回要做重分区啊啊这里我们只提出一个task来看一眼。之前是。大家看这个例子里边,它有几个输入分区呢?对,有四个对吧,这里边有四个输入分区,四条这个数据流要汇到这里边来,然后它有几个输出分区呢?有三个对吧?它它有三个这个流要要发出去,呃,那大家想watermark在这个过程当中又是怎么去处理的呢?
07:14
哎,这里边就要跟大家提一句啊,就是每个任务其实在处理的时候呢,假如我们设置了时间特性是even time事件时间,那么每个任务它自己内部都要有一个事件时间的时钟,相当于他自己要有一个clock,但大家看啊,就是这里边它相当于是有一个clock的,对吧?有有有一个这个这个表在这里去计时的,那这个clock是谁来推进它往前变化呢?就是我们前面说的water mark。Watermark可以推进,它这个是clock改变,但是它有多个分区啊,那大家会想你你那个water不是在在每个里面都有吗?对吧,那这个它它这个watermark以谁的为准呢?诶这里边大家就来看一看,在整个的这个传递的过程当中,一个任务它是怎样接收water,然后更新他的自己的water mark,然后更新他推进它的这个时钟,然后呢,再向下一步发出去的。好,首先我们看第一个图啊,第一个图大家看这个一个任务,它会给每一个分区都去维护一个,大家看。
08:27
相当于这是分区的一个water mark对吧?啊partition water,所以就是一个分区,就对应着一个分区water mark,先给它放在这里啊,所以大家会看到这里边就相当于是什么呢?那就是你来了哪个water mark,我就把这个分区water mark是不是更新成启啊,啊就是这么简单的一个操作啊呃,当我这里边接收到一个watermark的时候,做什么操作呢?我们之前不是说那个water mark只涨不跌吗?所以我要把它跟我之前现在的那个water mark是不是要做个对比啊,假如说它涨了,那就那就更新,就把它写成,哎,把这个最新的这个water mark更新,更新成这个四对不对,大家看现在这个4比2大对不对,水位上涨了,所以我就把它更新成四啊,就是这样的一个操作。
09:23
那如果要是说大家会想到,如果要是说这个watermark比这个小呢,或者一样的。不做任何操作对吧,不改不能跌,所以是这样的啊,那当前大家就已经看到了,第一个分区的water mark是二当。四这个water rock来的时候。它经过一个判断之后,水位上涨,就把water mark分区的water mark更新成了四,哎,这是这是第一步操作,然后呢,然后大家看啊,它是不是就它维持了好几个分区的这个。
10:00
呃,这个water mark啊,对吧,现在它有四个分区,有四个water mark,那它它自己本身是以哪个water mark为准呢?取这四个里边的哪个呢,最小哦,大家会看到啊,它取的是最小的,为什么取最小的。争取想的是对之前的不较近。呃,大家因为因为大家这里可以想到,就是说所这这里边的watermark代表的是说他之前的数据都已经到了,对吧,言下之意就是比他大的那些数据是不是有可能还没来啊,还有可能还在路上呢,对吧,你还得等,所以大家想到这里边如果我以最大的这个来当成当前的auto mark的话,那是不是我自己整个这个任务就认为他之前的数据全来了,那你其他几个分区明明还有可能还在路上的呀,对吧?假如这里我以这个六作为奥mark,那是不是这个六我就认为它不会来了啊,对吧?所以这个大家就要注意啊,这里边我们是取它的最小的一个分区watermark作为我们全局的water,也就是我们这里边的这个时钟,对吧?大家看到这里边就相当于它的那个event event time时钟,大家比一下这个四更新了之后,现在最小的是不是三啊呃,第。
11:21
三个分区的这个三是最小的water rock,所以当前任务的时钟事件时间时钟就变成了三,它就表示三以前的数据我认为都来了啊,确实是大家看是不是每个分区三以前的都来了呀,啊,那就没毛病,整个都来了,然后接下来他干什么?他就会把三这个water mark向下游所有的分区广播出去。也就是说下游收到这个三的waterm的时候,是不是就也就知道了,哦,三以前的数是不是都来了,哎,这就符合我们这个处理的流程啊好,然后接下来大家再看一下这个第三幅图,第三幅图这又是一个什么状态呢?这就是后面这个第七,第二个分区的这个7WATERMA来了,对吧?七来了之后是不是更新了,比比之前这个四大,所以他把自己的这个分区water更新了对不对。
12:18
然后那整个的那个时钟要不要往前推移呢?诶看一下最小的还是三,那是不是这里我们的视线时钟就没有推进啊,哎,所以接下来这个下游的这个水位也没有涨,对不对啊,那那所以是这样的一个状态,然后接下来六第三个分区的六,这个water rockck来的时候,这里又改成了六,再算一下最小的是不是就变成四了,哎,所以大家看到这里边就会更新我们的事件,时钟是不是水位又上涨了,下游水位是不是也就上涨了。诶,这就是这个flink里边water mark的一个机制,它是不是就在我们这种分布式进行分区的这个架构里边,能够保证在某个时间之前,所有的数据都保证它处理完成啊,这是可以说是flink里边非常核心,也是非常有意思的一个一个概念啊,这是Spark里边没有的一个概念,对吧。
13:16
好,呃,这是在这个任务之间的这个传递,好接下来呃,这里多说一句,就是如果说我们有两个输入流的话,或者多个输入流,比方说我们前面要做那个UN,对吧,要做那个connect,那大家能想到就是。最后我们合在一起的那个,如果他们要是合在一起了,两条流合并在一起了,那这个沃拉按谁的算呢?你是不是还是最小的啊,然后我们一直说木桶理论对吧?所以就是我们自己要补自己的短板对吧?像我们这个流树理也是木木桶理论,那就相当于是要以最慢的那个为准,对不对,你一定要等最慢的那个来,所以这就是这样的一个过程。好接下来我们再看一下这个在代码里边water mark怎么引入呢?诶,这里边就跟前面的那个概念就结合在一起给大家说了啊,大家会想到water mark是不是跟我们说是跟数据里边的时间戳紧密结合在一起的呀,所以只要是even time,前面我们还留了一个没做完的事情,我们是设了那个什么时间特性了,对吧,Time characteristic,但是我们是不是没有指定数据源中的时间戳是哪个呀?哎,所以现在大家看这一步一起做了,这一步我们就是指定数据源里边的时间戳到底是哪个,Even time到底是哪个,然后再去指定。
14:42
你这个整个的even time延迟多长时间,做那个水位线对不对?哎,所以大家看这里边有一个比较常见的实线,就是就是这个实现啊,大家看稍微有点有点麻烦是吧?大家看是dance streamam掉了一个什么方法呢?掉了一个a sign time steps and water marks,是不是就是同时分配时间戳和水位线啊,对吧?呃,然后大家看这里边传了一个什么玩意儿呢?哎呀,传了一个什么邦ED的out of order time stamp。
15:16
这这是个什么玩意儿啊,就这光这个名字都一长串,但是我们大家可能这会儿还不知道它到底是什么意思,但是从名字上我们至少看到一个什么out of orderness是不是无序的乱序的呀,没有顺序的那个数据对不对?哎,所以它就是处理无序数据,我们要设置一个watermark这样的一个概念啊,那大家可以看到下面它这里边有一个什么方法,有一个提取时间戳的方法,对吧?呃,它它是怎么去提取时间戳呢?这里边就是element,这是不是就是我们的数据,这里边每一条数据对吧?取element里边的某个字段,大家还记得我们那个sensor reading里边有个time step吗?对吧?这里它就指定我就用sensor reading里边的time step作为我的时间戳啊,当然这里边它还乘了1000,呃,因为这里边的时间戳要的是一个毫秒对吧?大家想到啊,那之前如果是个秒的话,你就乘了1000啊,这是这样一个状态,好,那呃,大家可能会看到,就是这里边的这个这个东西有点太烦了,是吧,根本不知道这是怎么回事儿,那实际上本身这里边要传的是个什么东西呢?大家可以看一眼啊,这里边大家可以看到,就是说flink其实是暴露了一个time stamp a sign这样的接口供我们实现的。
16:38
所以就是说前边这个a time sta and watermarks这个方法里边,我们其实要传入的就是一个time stamp a signer,对吧,其实就是这样一个,所以我们可以自定义对不对,所以刚才的那个那个一长串那个东西,那是不是也是相当于系统已经给我们实现好的一个,就是定义好的处理乱序时间的一个ign啊,啊,对吧?所以那个是经常处理乱序时间的啊,那大家可能会想到,如果要是说我处理就不是乱序时间,我是排好序的时间呢。
17:10
还有一个更简单的处理方式,大家看这里边可以啊,Ign as sending time steps,因为是已经排好序的数据,那大家会想是不是就不需要延迟触发了,那是不是water mark就就不需要去专门设了呀?那所以大家看这里边就只要去设Sam就可以了,你只要指定哪个字段是time Sam就OK。啊,这是另外一个就是升序数据的一个指定啊,那更常见的乱序数据,其实就是用了这里边的这个方法,大家看就是我调用了邦Y的outum orderness time STEM啊这里面大家看到就是它里边本身这个类啊,它还有一个参数,这个参数是什么呢?这里面设了1000毫秒。啊,大家如果要是这会儿不知道的话,我们就先提一句,后面代码里边实现的时候再给大家详细说啊,这个其实就是我们所说的要等待延迟的那个时间啊,就是那个你要等多久对吧,延迟多久发车就是这个东西,然后下边的这里边就是提取时间戳对不对?哎,所以大家看延迟发车的这个时间,其实是不是就是摩RA产生的机制啊,对吧?啊,你就是来了这个时间戳之后,我会认为当前的那个水位是不是涨到。
18:26
他延迟一秒的那个位置了,对吧,所以言下之意是什么,我当前的那个五秒的数据来了一个数据里边时间说是五秒,如果我现在这里面设置延迟一秒的话,水位应该是几啊。Mark应该是几啊,现在嗯。啊,反正是一的关系,那就是四或者六对吧,那大家想一下是四还是六。四,哎,对了,应该是四,对这个仔细思考之后,大声的答出了正确答案,非趁啊,呃,就是大家其实会想到,为什么说它会延迟呢?是不是就是时间进展的会慢一点啊,就是你这里边五的数据已经来了,但是我不认为五之前的所有的数据都都已经到了。
19:17
而我延迟一秒,意思就是我只认为五个数据来了,我只认为四以前的数据都来了,哎,大家看这是不是就是相当于延迟一秒发车啊,啊,所以是这样的一个状态啊好,接下来我们再给大家详细的说一下这个time stamp signer啊,这个其实大家看就是我们这里边不是可以自定义这个maile s吗?在这里边我们就可以自定义,你到底是怎么去抽取时间戳,然后怎么去生成water mark,这个都可以自定义啊,大家看这个是不是特别的呃,灵活对吧?呃,非常的灵活啊,那那这里边呃,这里边我们要要实现一个什么东西呢?我们这个maile s要要实现什么呢?它可以有两种类型,可以实现两种接口,那么这两种接口都继承自我们前面讲的这个time step s,这是统一的,它的底层的那个负负接口啊负类,好,那接下来大家看一下到底是哪两种类型呢?
20:16
一种叫做aigner with periodingor water marks。字面上理解就是什么?这是一个aigner,它的对周期性生成water mark的一个aigner对吧?啊,所以它的它的这个实现的原理就是系统,你只要设置了这样一个这个塞,系统就会周期性的把water mark插入到这个流里面去,那大家想这个周期性的是even time还是processing time。对,这就得是processing time了,对吧,你这个周期性的,你你那边even time都不知道是什么时候来啊,那当然就得是processing time了,这里大家注意,默认的一个周期是200毫秒。
21:03
呃,然后呢,我们可以用另外的一个东西啊,就是用这个,呃,执行环境里边的config.set auto water mark t来设定到底是这个周期是多少多长时间去生成一个water mark啊,可以用这个去设啊,全局的。然后接下来再给大家说一句,就是前面的我们讲到的那个什么处理乱续时间的那个什么邦ED的out of alness time sta extractor那个类,对吧?呃,还有我们前面那个升序的那个事,事件的那个处理,他们都是基于什么去生成water rap的呢?都是周期性的,对,如果我们不设的话,它都是200毫秒对吧?啊,都是这样好,然后那有一个周期性的,那是不是就还有没有周期性的呀?啊另外一类就叫做a sign with punctuated的watermarks,这个puctated的是那种,就是打点打断对吧,就是这种方式,所以大家会看。
22:03
看到它是就没有这个时间的周期规律了,生成的规律了,它是可以是通过打断式的这个方法去生成water。啊,那具体的一些例子,我们等一下到那个文档里边给大家看看代码吧,看看这个到底怎么实现,呃,然后这个watermark的设定,大家会想到你这个在实际的这个处理过程当中,到底怎么去设这个water mark呢?呃,其实这个东西也还它需要什么呢。就是你你在处理的过程当中,这个怎么样去生成water rock延迟多长时间,你说你到底去怎么设定呢,你到底是这个延迟是给大一点好还是给小一点好呢?这都不好,对不对?所以一般情况这个东西你是设置它是要需要对这个我们的这个这个业务背景有有了解才行,对吧,你得知道他的那个数据乱序,乱序的程度到底有多大,对吧?啊,这个情况是需要了解的,或者就是说你不了解的话也可以怎么样呢?也可以我先看看他来的那个数据到底乱序乱成什么样,对不对啊,这就可以有一些比方说启发式算法啊,甚至我可以用一些机器学习算法去学习它的这个规律,对不对,它的这个最大的乱序时间大概是一个什么样的概率,可以把它提取出来啊,所以这就涉及到其他的一些东西了啊,那当然了,大家会想到如果water mark你设的延迟太久的话,那会导致一个什么结果啊。
23:30
对,那是不是这个我们的这个实时性就不够好了,延迟就太大了,对不对,算的就太慢了,那这个有一个解决办法,就是你可以在水位线到达之前输出一个近似结果,然后呢,到水位线延迟到的时候,再输出一个准确的结果,这就有点像,哎,我这个可以牺牲一点准确性,先输出一个那个快速的一个结果,对不对?哎可以用这种方式,那那另外还有一种情况就是,如果我们不想,哎你这个太慢了不行,延迟不能太久,那我就给他设的设的越少越好,对吧,我设一秒,甚至我不设了零秒,对吧,直接就按这个来,那大家就知道那就会怎么样,对,就会丢数据了,对吧,就会得到错误的一个结果,有些窗口关闭的时候,这个数据就没收进去啊,那大家会想到也有补救机制,什么样的补救机制呢?
24:22
大家还记得那个兜底的那个东西吗?是不是在window操作里边可以allow那个lateness啊,对吧?可以允许处理延时的数据对不对,那我们可以把它放到一个测试出流里边,然后来单独再处理这个这一部分数据啊,所以大家看flink就是把这个乱序数据,把我们这个流流处理的这个领域啊,处理的这个正确性做到了极致,对不对?给你各种各样的手段去给你兜底对吧?呃,你首先来了之后,你先有时间窗口去去看时间窗口,这个不行,你你这个可以延迟触发设置它这个延迟发车时间对不对?所以就有了water mark,有了water mark这个不同的这个水位的推进,那如果觉得water mark还有可能出现这个搞不定的情况,没关系,你再去测试出理去,去allow这个延迟数据对不对啊,还有情况能给你把它兜住,这就是我们现在了解到的FNK里边关于时间语义和这个water mark,对我们整个流处。
25:22
程序的一个保证啊。
我来说两句