00:00
前面我们已经把water mark的原理和传输的规则都已经讲过了一遍,那接下来呢,我们就在代码里边真正看一下water mark到底应该怎么引入,之前其实我们在代码里面也看到了,是不是可以调一个方法叫做。Assign type sta盘的water marks啊,啊,所以这个我们就直接基于它是data stream的一个方法啊,所以在这里我就可以直接基于一个date stream直接调用这个方法,然后当前这个ign time Sam and automarks里面要传的是个什么东西呢?大家看到这里边要传的啊。是一个又是一个aigner,但这个igner跟我们之前那个窗口的分配器不是一回事啊,这个a ser是用来分配分配时间出口watermark的,对吧?啊,所以它本质是个什么东西呢?大家看这个接口extends,它本身是一个time stamp a signer,也就是说时间戳分配器,呃,或者说这个大家想它叫分配器啊,其实我们想它应该是个提取器,对不对?
01:04
提取当前那个数据里边的那个时间字段嘛,提取出来这个时间字段,然后给这个数据打上一个时间戳,然后呢,基于这个时间戳,我们是不是还要生成watermark呀?啊所以在这一步操作里边要做两个事情啊,两件事情一个是提取时间戳,另外一个是生成water mark好,那所以接下来我们首先看一下。这里边到底应该怎么样去实现这样一个接口,那大家可能会想到,既然有接口吗?那是不是我直接去new一个类,然后实现这个接口就可以了,哦,那呃,自然我们就想到,那难道又要去自己实现吗?不需要flink里边底层给我们已经实现了这样的一个类啊,我直接可以用啊,这个类叫做。Bounded out of orderness大家看啊,然后time stamp,哎,这个类特别的长啊。非常长,对吧?Bounded out of order Miss time stamp extractor,好长啊,呃,这个从字面上理解的话,这是什么意思呢?
02:10
有借的帮你的,有借的对吧,然后out of order Miss out是大家知道,呃,有订单,但是也有那个顺序,有排序的意思,所以alness其实是顺序,Out of orderness,那就是乱序,所以它是有界乱序情况下的一个时间戳提取器,对吧。啊,那所以接下来看一下里边要去重写的一个方法,实现了一个方法extract time step是不是提取时间啊,大家看它的返回类型是一个长整型,是不是就是一个时间戳输入的参数呢?大家看sensor reading类型的1ELEMENT是不是就是,那大家想这不就是每一个数据来了之后都是一个cesor reading嘛,我们已经包成这个类型了嘛,接下来呢,就是每一个数据来了之后,调一下这个extract time这个方法提取它的时间戳,对吧?啊,把这个时间戳追加到这个上面去,所以接下来我们怎么提。
03:09
是不是直接,哎,对element,我直接去get当前的time stamp,注意这里边的这个方法,它需要就是我们可以看一下这里啊。呃,就是下边这里边extract time step对吧?呃,大家如果要完整的看的话,可能得看到它底层的这个,对于这个接口的要求,大家看这个time step s,这里边就有一个方法叫extract time STEM对吧?这里边大家要注意它需要去生成的这个时间戳必须得是一个。大家看毫秒数对不对。对吧?啊,必须得是,就是说从那个1970年呃,1月1号啊,零点开始的一个毫秒数,那初始的时候,如果要是说呃,任何的一个一个这个数据啊,都没有带着一个时间戳的话,那这里边默认的那个值应该是什么?是不是长整型的最小值啊,啊这个大家都一看就能看到这个代码源码里边的定义啊,那所以在这里边我们其实得到的应该是一个毫秒数返回,那我们当时数据里边的那个数据,大家还记得是什么吗?
04:20
这里面是秒还是毫秒啊啊,对这个数其实应该是一个秒啊,那所以在这里对乘以1000,这就是我们想要得到的这个结果对吧?啊,那当然大家会看到这里面还在报错,因为当前的这个类,它的构造方法里边必须要传一个参数。这个参数又是啥呢?看一眼它是一个时间,诶,大家自然就想到了,那既然它是要处理乱序嘛,对吧,有借乱序的这个提取器嘛,乱序数据是不是除了提取时间戳之外,你不是还要生成watermark吗?啊,对吧,你不是还要这个分配watermark吗?那是不是必须要设置一个watermark的那个延迟时间啊,哎,所以这个延迟时间在哪里给就在这里面设定,这里边大家看他所说的这个延迟时间是一个max out of order needs指的就是。
05:15
最大的乱序程度,大家想为什么他给的是一个最大乱序程度呢?回忆一下我们之前这个例子里边,哎,大家想我是不是就是要选取出他们这个乱序,什么叫乱序程度,是不是就是诶五跟二之间,本来应该二在前面,现在五在前面了,这是不是乱序啊?那他们的乱序程度是不是就是五减二就是三啊对吧?那同样五在前面,三在后边,那是不是它的乱序程度就是二啊,我取最大的乱序程度,是不是理论上三就可以hold住这里边所有的数据啊,对吧?等这个三,那就相当于所有数据都可以正常处理了啊,所以大家看这里边它要给的一个最大乱序程度,其实就是保证最后我们处理结果正确的一个一个watermark的延迟时间,对吧?啊,所以这里边我可以随便给啊。
06:09
啊,我这个就举例了啊,一般真实的场景下边大家要注意这个用的是那个还是跟window一样啊,Window in time点。然后真实场景下,一般都是这个几十几百毫秒级别的这个这个场景,对吧,我们这里边的话,为了简单起见啊,因为我的数据都是秒嘛,所以说我这里边可以给一个比方说给一个两秒的延迟啊,这是可以看到的,这样的话就定义完了,好所以整体来讲的话,处理乱序数据非常简单,就是在前边我需要把这个事件时间语义设定出来,后边是不是还需要从当前的数据里边去提取时间戳,然后按照一个延迟的规则去设定water mark呀,啊,这就是这样的一个原理啊,呃,那当然了,在代码里边除了这样一个处理乱序数据,Bounded of order orderness timete extra,除了这种方式之外,还有另外一种分配时间出的方式。
07:06
啊,那当然它还是time and whatmark了,对吧?那这里边要拗的是另外一个。另外一个类啊,我直接用这个,大家看这个类叫做sending time extra。它的这个含义ascending是升序的上升的对吧?时间戳的一个提取器,这指的含义就是说大家想想是不是有些场景我的那个数据就真的是理想情况按照顺序排好的呀,那这种情况下,你说我还要去做这个奥mark延迟吗?好,好像不用去再再去指定了,对吧?是不是当前他的那个time Sam只要提取出来,我就把它当成当前的事件时间是不是就可以了,那所以这里边就呃不会涉及到呃另外的这个延迟的这种出发了,那所以是不是我直接用一个升序数据就可以了,然后大家看这里边同样有一个extract sending time stamp这样一个方法,我这里返回什么?同样还是get time stamp乘以1000,把当前这个数据返回就可以了,对吧?啊,那另外看前面它的构造方法是不是没有参数,不需要去设置对应的,呃,相关的那个延迟时间对吧?啊直接给定就可以。
08:20
了啊,所以这就是看具体的应用了啊,如果是就是我们说的,如果是正常排序的数据的话,我们说是升序数据。呃,设置事件时间和watermark嗯,啊,那下边这个是乱序数据对吧,乱序数据设置啊时间戳和water。这就是我们在代码里边做的一些实现的方法啊,那么如果要是说具体到底层源码的实现里边,大家也可以看到啊,这里边本身要的这个。
09:10
这里面传的这个参数啊。它底层其实要的是一个timete a signer对吧?啊,那当然了,这一个interface呢,又分成了两类,大家看到这里边有两类,一类叫做a sign with periodic water marks。字面含义就是说,就是一个周期性生成watermark的这样的一个分配器,对吧?哎,所以大家注意啊,这里边我们的这个watermark其实是怎么样生成的呢?它并不是说之前大家理解可能是一个数据来了之后,后面就应该带着一个watermark,我们之前在给大家做示例做演示的时候,也是一个数据后面就应该有一个watermark对吧?但是大家看这种方式其实不是它是怎么样。周期性生成,也就是隔一段时间我去判断一下,诶,当前应该生成一个什么样的watermark,我插一个watermark进去,对吧,然后再隔一段时间固定周期,然后我再生成一个新东方的插进去。
10:09
啊,这个watermark就是一个特殊的数据嘛,直接插入数据流就可以了,这是这种方式啊,呃,那这里面调用的时候,大家看,除了这个传一个aign with periodic watermarkx之外,另外还有一种方式,大家可能会想到,那你说我如果不想这个周期性生成,我就想每一个数据来了之后,后面就跟生成,呃,马上就生成一个watermark,可不可以呢?也可以啊,那就叫调第二种这个接口传参传的是一个a signer with punctuated water rocks。就传一个这个东西就可以了,当然了它的呃底层就是继承的也是这个time STEM signer对吧?啊,那这个接口呢,Time STEM sign接口,大家看到它里边只有一个方法,就是必须指定怎么样去提取时间,戳前面我们是不是都实现了这个方法呀,对吧?啊那么对于这个a sign with puctuated water marks来讲,大家大家看一下这个字面它是什么意思。
11:11
Puctuated PU puctu是有那个,呃,打断击打这这样一个意思,对吧,所以它的这个就是类似于一个断点式的这样一个生成watermark的方式,那它是基于什么去做这个断点出发呢?就是基于数据,来了数据之后,我就去判断一下要不要生成一个watermark,所以大家看它下边的这个方法叫做check and get next water。然后里面的参数大家看一眼,有一个last element,这个T是不是就是当前我的数据类型啊,对吧,就是你当前啊,比方说我sensor reading,那说这就是刚刚上一次接收到的那个数据啊,Last element嘛,对吧,刚刚接收到的那个数据,然后接下来还有一个extract extracted time stamp,这是不是就是通过我们前面底层的那个提取时间戳那个方法提取出来的那个时间戳二。
12:06
当前数据提取,提取出来时间戳对吧?啊,所以这里边其实就是你可以根据这个方法啊,去利用上一个数据以及当前的时间戳得到一个,大家看它返回的类型是什么。就是一个water mark对不对,然后water mark,这不就是我们之前看到的这个final class这个类吗?对吧,它里边最重要的就是一个time stamp嘛,你生成这个东西就可以了啊,这就是我们呃,对于这个断点式生成和周期性生成的这种方法方法啊,那大家看一下周期性生成里边的这个方法,呃,就是更简单一点,它就直接是get current water mark对吧。啊,那这里边是不是我就不基于这个当前的数据啊,因为它周期性生成嘛,隔一段时间自动生成就完事了,直接你去,呃,这个触发一个这个watermark啊,返回一个water就完事了。啊,那那有同学可能会就是对这个还是有疑惑,就是说那具体生成的规则,我们这里边。
13:04
这里边不是掉了一个邦地的out ofne time some吗?那么它的底层到底是啥呢?他是。前面大家也看到了,它是一个a sign with periodic watermarks,它是一个周期性生成watermark的方式,对吧?呃,那所以大家就会想到,那它生成water的时候,是不是要实现一个get current watermark这个方法呀,那大家可能想到,那它这个周期性生成连数据都没有对吧?不基于数据,那它从哪冒出来的一个一个watermark呢?它基于什么生成这个watermark呢?大家看一下这个原理啊,我们解析一下这个代码。它的这个类里边有一个非常重要的属性叫做current max time stamps time stamp。是当前最大的时间戳,所以大家就会发现了,现在为了要处理乱序数据,是不是在这个当前数据的时间戳和我们要处理的这个事件时间啊,也就是watermark之间是不是应该有一个桥梁,这个桥梁就是。
14:12
是不是当前最大的那个时间戳啊,因为我们要求事件时间是不是不能推,不能回滚,不能倒流,那是不是就要求你不能直接提取出时间出来,就当成这个事件时间了,那是不是我就必须只保存它的最大的那个呀?然后另外我基于这个最大的时间戳是不是还要有一个延迟啊,诶,那所以automark是不是就基于最大时间戳减掉一个延迟时间就完事了呀,所以之前我们其实我们自己做判断的时候用的就是这个规则对吧?啊,只不过现在给大家做了一个代码上的一个实现,大家看一下这个源码怎么怎么做的啊啊然后这里边它还有一个属性叫做last imit的过程上就是上一次发出的那个。这个water对吧,呃,默认的值是长整形最小值,大家看这个watermark,它表示这watermark并没有用那个,你有一个watermark那个类呃的对象对吧,它是直接用一个什么来表示watermark的,就用一个成整性,因为大家知道watermark里面最关键的就就是啥。
15:11
就那个时间戳嘛,它就表示时间的嘛,所以这里边就用一个长整性来表示上一次的那个watermark,那这里边还有另外一个非常重要的属性就是。Max out of alness,这是不是就是我们的最大乱序程度,也就是watermark的延迟时间,对吧?啊,那所以这里边大家就看到了,呃,一开始我们构造方法里边是不是要把这个附进来啊,对吧?然后这里边是还有一个current mapx time stamp,就是用这个长整形的最小值加上了这个,呃,延迟的时间,为什么要加这个呢?这个大家可能会想到,它其实要加这个的原因是在于后边我们如果要生成watermark的时候,周期性生成,那是不是就有可能一开始还没有数据来的时候就在生成water呀。
16:00
那你这个时候生成的时候,是不是一开始本来当前的那个最大时间戳我这里边默认的啊,这个watermark是不是应该是一个长整形最小值啊,那长整形最小值,你假如说我生成奥特曼的时候,是不是还得减那个最大的,呃,延迟时间本来就是最小值,长整形最小值再一减,那就减成什么了。那是不是数据溢出了呀,数据一溢出是不是就变成了一个很大很大的数了,那是不是代表我的事件时间是不是已经推进到遥远的未来了啊,那所以这就不正确了,对吧?所以它为了防止这个数据溢出的这个bug啊,所以这里边一开始初始化的时候先加了这个,呃,延迟时间啊,这个比较简单啊啊那后面我们最关键的就是看啊,你看有一个一个呃,提取这个时间戳啊,Extra time step,这是我们自己要去实现的方法,所以大家大家看它是抽象的对吧?啊,这是抽象的啊然后我们最关心的其实就是怎么样去get current water mark,对吧,你怎么样去提取这个当前的这个呃时间当前的这个water mark啊大家看做了一个事情,就是首先当前潜在的这个water mark。
17:10
是不是就是用最大时间戳减掉延迟的这个时间啊,直接减对吧,然后判断一下,因为我们automark不是只能增不能减吗?所以判断一下是否大于等于上一次发出去的那个watermark呢?呃,如果大于等于的话。哎,那么我们就把上一次的发出去的water mark也更新一下,对吧,也把这个做一个更新,然后接下来我就直接new一个watermark return,大家看是不是就这么简单啊。这就是周期硬生生water的一个原理。所以本质上来讲还是非常简单的,那至于说我们讲到的这个ending time stamp啊,给大家看一下这个升序的这个ending time sample。Instructor它的底层大家看到它是不是也是周期性生成啊,也是对吧,那它的那个生成watermark的这个方法又是什么样的呢?
18:07
大家看他也是直接扭water对吧?它是判断当前时间戳是否为长整形的最小值,如果是的话,哎,那就生成water ru就直接用这个,如果不是的话,它是用了一个。当前的time step减一,所以大家可以可以简单的理解成这是这是相当于什么呀?相当于延迟了一毫秒对不对?当前的watermark这个时间戳都是以毫秒做单位的嘛,相当于延迟了一毫秒,那为什么要有一个减一呢?啊,这简单来讲就是说你像我们要做这个对应的,大家想啊,我要做这个对应的窗口触发的时候,那是按照什么来触发当前的这个窗口操作呢。呃,那其实就是对大家想我来了一个这个五秒钟的数据,那其实当前的water mark它是不是应该是进展到了就是我五秒减一毫秒啊对吧,四秒,呃呃,零这个999毫秒这样的一个状态啊,然后我当前零到五秒的这个窗口是不是也刚好不包含五秒这个数据啊,它的最大的那个数据是不是也刚好是到了五秒减一毫秒的那个状态啊啊,所以这样的话就刚好匹配上可以把它输出对吧?啊,所以这里边升序数据的处理是这样一个过程啊。
19:27
这就是关于这个使用watermark啊,在代码里边去做设置的一个过程,大家如果感兴趣的话,也可以看一看文档里边,我们在这里边其实是有这个,呃,不同的watermark的这种实现方式,大家看,呃,就是比方说如果说我要去自定义一个周期性的这个,呃,这个实验说提取的这个signer的话,其实也是可以的,我可以就仿照大家看这个完全我自己写的,对吧,自己也实现啊,实现这个a s with periodical automarks,仿照bonded的那个乱序数据的那个接口,大家看我要两个最重要的属性,是不是就是一个是bond,就是那个延迟的时间,另外一个是不是就是当前最大时间戳二啊。
20:11
简单实现,那那是不是我就get current water,就直接你有一个water直接剪掉就可以了。然后另外大家要注意,那你这个当前最大的那个时间戳是不是还得更新啊,所以这个更新的时候在哪里呢?就在提取时间戳的时候,我这里边是不是每来一个就要判断一次当前的那个是否最大呀。更新一下,这相当于我保存了一个状态对不对?把当前最大的时间戳保存成一个状态,每来一个数据,我可以提取它对应的那个时间戳,另外呢,还把当前的那个最大时间戳做一个更新啊,然后周期性的,比方说隔一段时间我就直接生成对应的啊,减掉的那个延迟时间,生成一个mark就可以了。啊,这就是一个简单的自定义实现,然后另外还有这个呃,断点式的实现,大家可以看到断点式实现我怎么做,哎,我这里边就是按照这个数据啊,来一个之后去生成一个watermark,甚至我还可以要求什么呢?我要求当前必须是341这个数据来了之后,我才去生成对应的那个watermark,对吧。
21:17
这样的话,我生成的watermark是不是就有限有有限制啊,对吧,就特定的场景下才有对应的这个输出啊,我这里还可以设置一个延迟对吧?呃,这里棒的啊,设置一分钟延迟,你有一个这个,如果要是说不是341的话,我直接return now,直接就没有任何的mark生成。啊,这就是一个非常简单的实现,大家也可以在这个呃代码里边去做一个尝试。
我来说两句