00:00
那我们已经知道这个water mark的具体原理了,那我们啊,得在代码里边来做一个引入,做一个实现,对吧?那前面我们说了这个water看起来好像是来一个数据,就直接在后面插入了一个water mark对吧?诶,那这个东西你到底怎么样去定义代码里面怎么样能让它真的插进去呢?诶这里边就要给大家讲到前面我们还有一步没给大家讲,就是从数据里边要提取时间戳嘛,对吧?啊那另外大家会发现这里边其实如果我们设置了这个事件时间语义的话,还没有设置时间戳啊,然后呢,我们那个生成watermark的时候,是不是又要基于当前的那个时间戳去做一个生成啊,当前最大时间戳嘛,对吧?所以这里边大家会看到啊,这里边有一个方法叫做aign,基于data stream,有一个方法叫做a sign time stamps and water marks啊为什么放在一起呢?因为这个方法就是就是在一起的,对吧?啊这里边就是用这个方法从当前数据里边提取。
01:00
时间戳,并且指定watermark的生成方式啊,所以说这个方法一定我们就可以指定这个watermark的含义了啊,那这里面其实有两种不同的这种生成watermark的方式啊,一种是调这个,另外一种这个呢是比较普遍的,另外一种呢是下面这个叫做assign asending time stepmps,什么叫asending呢?Asending大家知道是升序的,对吧?那所以asending time steps意思是什么呢?就是表示已经排好序的这样的一个升序数据,它的时间戳就是按照按照顺序来的,那大家想这个是不是就相当于我就不用定义auto了呀,对吧?我就直接按照当前数据它的时间戳去定义我的这个事件时间不就完了吗?这就是我们想的那种理想完美的情况嘛,啊,这就没有乱序数据啊,所以在这种情况下,我如果用了事件时间的话,你就用这个ending time steps。连what rock的指定都没有,为什么呢?直接指定提取时间戳就完了嘛,啊那这里边大家看到里边传的是一个什么呢?一个函数,一个提取器对吧?一个structure,这个函数是干什么的?就是要得到一个长整形的,那不就是时间戳吗?对吧?从当前数据结构里边提取出一个字段来转化成,注意要转化成一个毫秒数对吧?啊,就是这里边我们要转化成一个毫秒数,长整形的毫秒数返回,哎,这就是表示当前数据上面相当于就就像我们那个PPT里边给大家演示的啊,就相当于这里就加了一个时间戳放在上面了。好啊,那接下来这里面的实现非常简单,我可以写一个拉姆表达式对吧?这里面简写的话,下划线我用哪个呢?Time嘛,对吧?呃,这里边大家要注意,我们要的是一个毫秒数,如果这里边你给的是这个数,大家知道啊,这个数这个如果是十位的话,按照我们现在的时间来讲的话,幺五几打头。
02:55
如果是十位,一般这就是秒数对吧?啊,毫秒数的话,后面可能得加三位,所以这里边如果本来你这个时间是一个秒的话,这里边要一个毫秒提取时间戳,那怎么办呢?哎,那这里边我就去乘以1000不就完了嘛,对吧?啊,就得到这个升序数据的一个时间戳啊,那大家可能就想到那这个是声序数据对吧?声序数据提取时间戳我们我们不定义这个water mark,是因为water升序数据里边water mark直接用这个就完了,对吧?我们判断时间就用它这个时间戳就完了,所以说就没有意义了,这里边就不不用去专门定义watermark了,那假如说是乱序,我们不是还要引入那个最大时间戳的一个一个这个等待时间嘛,对吧,延迟时间嘛,那怎么办呢?哎,那就调用另外一个方法了,更一般化的这个方法,Ign time stamps and watermarks,好,那这里边大家看到里边要传一个什么东西呢?
03:55
里边要传的大家注意啊,是一个assigner assigner,这里边是一个哈,具体来讲的话,大家看到啊,这个assign time time sample and watermarks啊,它有两种实现传的参数,有两种情况啊,这个是重载过的啊,一种是实现一个as with periodic watermarks,也就是说它实现的这个AS是什么呢?生成watermark的,其实是是什么呢?Periodic周期性的生成watermark。
04:26
也就是说不用你自己去管对吧,就隔一段时间我自动生成一个watermark,那怎么生成呢?当然就是按照我们的那个定义,就是哎,你判断一下当前最大时间戳是什么对吧?然后你减去一个延迟,用当前的最大的那个减去延迟就可以了,这个就跟数据没关,我就是隔一段时间去判断一下,隔一段时间判断一下就可以了,这是周期性生成。另外还有一个是a signer with punctuated watermarks,那这个又是干什么呢?这这个也很简单,大家想这个puctued是断点式的,间断式的对吧?它的含义就是你不要周期性的,就是隔一段时间去生成一个,而是怎么样呢?你按照数据对吧,来了一个数据之后,我就去判断一下要不要生成一个automark,所以这两种情况都可以去利用啊,那呃,这两种情况用的更多的是哪个呢?啊,大家有同学可能想到,那应该是来一个数据就就生成一个呀啊,其实不是啊啊大家可以想象,你如果说来一个数据就生成一个的话。
05:26
那它的效果是什么呢?效果就是每一个数据后边都会跟着一个watermark,对吧?啊,那如果说要是隔一段时间生成一个的话,效果是什么呢?哎,那可能效果就是说好几个数据后边有一个water mark对吧?固定时间间隔生成一个watermark,那有同学说,哎,你这种情况间间隔性的这种生成这个不好啊,对吧,你来一个数据,这个它有可能已经更新了呀,这种情况更合理一点,但是大家要想,你要想大数据场景下到底应该是什么样子的大数据场景这个应该是什么情形呢?
06:04
大数据场景下,其实是大量的数据快速的啊,短时间内大量的数据就全涌进来了,然后他们的时间戳,甚至有可能都一样,对不对,所以你这里每来一个数据就生成一次,每来一个生成一次是不是都差不多啊,对吧,生成的这个并没有推移。而这里边如果要是我隔一段生成一个的话,是不是就相当于节省了很多操作啊,性能上就会稍微好一点,对吧?啊,那那这种间歇性的这种生成机制有没有什么问题呢?哎,那可能就是说数据如果要是比较零散的时候啊,数据稀稀落落的时候,那这种程度,你如果要是来一个后面更新一次,来一个更新一次,这个就比较高效对吧?你要是说间断性的诶,隔一段时间更新,更新一次,隔一段时间更新,更新一次数据都没来,那你更新就没有什么用了,这个就比较没效率了,所以它俩适用的场景是不一样的,周期性的生成,它适用于数据比较密集,大量数据来的时候比较常见,那间断性生成呢,那就是数据比较稀疏的时候比较常见,那你说我们处理大数据到底是要处理这个,处理这个稀疏的场景,还是处理稠密的场景呢?当然就是稠密的场景,对吧,你稀疏场景说实话你浪费一点性能无所谓嘛,数据都没有,我都。
07:24
闲着呢啊,闲着也是闲着,对吧,你随便来两个owa没关系,不受影响,但是你这里边数据稠密,数据非常,数据量非常大的时候,这个可能就性能影响比较大,所以一般情况我们用的是periodic watermax哦,那大家可能就犯愁了,哎呀,完蛋了,那难道我要自己你看到这是个什么,这是个interface对吧,Draw interface啊,它底层实现的啊,就是不管这两种情况,他们底层都是一个time step a sign,都是这样一个东西,对吧?你这里边看这个PU eighty的water mark,这个也是一个time step a s,这是一个抽象的,呃,这是一个这个上层的一个接口对吧,抽象的一个接口啊叭较抽象的一个接口,然后这里边我们那那这里面难道我要自己去实现一个类去去做它吗?啊其实不用啊,没那么没那么复杂对吧,那要那样的话确实就有点太麻烦了,我们这里边有一个已经弗link帮我们底层实现好的周期性生成watermark的一个。
08:24
处理乱序数据的这样一个类啊,这个我们又出来,这个叫什么呢?这个类有点长啊,Bounded out of,哎,出来了对吧,Out of order ni time sta chapter,哎呀,这个类名好长啊,好累对吧,听着啊,但是大家一看,其实是知道它什么含义的,什么意思呢?帮的有界的对吧,Out of orderness,乱序,然后什么意思呢?有界的乱序,也就是乱序程度可以确定的这样的一个time stamp的提取器,所以它处理的是乱序数据,而且是什么呢?乱序程度可以确定的那种提取器,对吧?啊,所以接下来大家看,呃,我们直接就弹出来了,它自己让我们重写这个方法,什么方法呢?Extract time step,这不,不就是提取那个时间戳嘛,对吧,你要想让它延迟,你首先得告诉我时间戳是什么呀,提取出来时间戳,我才能在最大时间戳的基础上去指定一个延迟时间嘛,哎,所以这里面还是一样,我指定什么呢。
09:24
Element,这是当前的这个数据,它的time cent对吧?呃,比方说本来是秒,我再乘以1000,这就是当前的好秒数,就把它提取出来了,好,这就是当前的这个所谓的时间戳啊啊,那时间戳我还还得有一个这这个大家可以在里边看一下啊,看一下这里边它具体的时现,具体时间,你看这里边它是不是就定义了一个current max time stamp呀,定义了一个这个最大的时间戳啊,所以这就相当于它自己要保保存这样这样一个状态了啊,然后下面你看啊,这里边我们定义这一个过程的时候,其实它会把当前,你看这里边要做一个调整啊,把这个都赋值,属性都赋值,这个并不重要,我们关键是看什么呢?看get current water mark,你看这就是生成water mark的这个机制,他要干什么呢?他要用当前的current最大的这个时间戳减掉一个max out of orderness最大乱序程度。
10:24
对不对啊,减掉最大乱序程度,然后得到的这个,这就new一个这个这不就是按照这个来生成的嘛,对吧,它是保证要比之前的大对吧,比之前生成的那个要大,保证它升序嘛,啊所以最后这个water,你看它里边就一个参数,是一个time step,里边就一个属性,就这么个东西啊,所以这就是watermark底层就是这样的一个类啊,一个一个数据结构啊,所以最后我们其实就是做了这个事情,那这个max out of orderness又是什么东西呢?这是它自己定义出来的一个属性,它会怎么样呢?哈,就是调用这个Bo out of alne的时候,大家看到这还缺一个参数呢,这个参数就叫做max out的auto aboutness,然后在这儿转成毫秒数付给他啊,所以我们定义的那个最大延迟就在这儿了,最大延迟是什么呢?你为了保证正确性,就应该定义它的最大乱序程度,对吧?啊,所以这里边比方说我们给一个。
11:24
Second字,比方说我乱序程度是三秒,对吧?给一个time second3,这样的话,接下来我所有的时间就相当于是当前最大的那个时间戳,要减三秒才是当前的时间,这就是这个watermark的一个引入的过程。好,那当然了,大家会想到那这个最大的乱序程度,这个到底你怎么去定义呢?对吧?我到底怎么样去知道呢?这就涉及到一个关于water mark的一个设定的过程了。啊,大家看到这个我们可以引入对吧?Time STEM aigner,然后最后这里边我们要涉及到一个它的设定,那怎么样去设定呢?这个看起来就是我们直接给的嘛,所以一般情况需要你对这个相关的业务场景啊,业务领域乱序的程度要有一个了解,那这里边就是太大太小都不好,对吧,你如果设太太小的话,那就是相当于最后结果不正确嘛,它实时性会好,但是但是它不会会不正确,那如果设的太大的话,那收到的速度就很慢了,对不对?哎,那那这个就相当于这个我们最后得到的结果就实时性就降低了啊,那有同学就会说,那你这个最后还是两边不靠啊,你说是可以自己去平衡,但是最后两边不靠啊,那一般实际应用成绩怎么解决这个问题呢?一般是这样去做,就是怎么样呢,我先快速先设置一个比较小的water mark延迟,比方说这里边我给一个毫秒对吧,比方说给一个mini啊second,比方说给一个30。
12:56
十毫秒的延迟它可以怎么样呢?它可以hold住我们的那个大量的大部分数据延迟的情形什么意思?因为大部分延迟是不是相当于也类似于一个正态分布啊,对吧?大部分的乱序应该都在一个范围内,然后有少部分那个漏网之鱼可能延迟会很高啊,那所以这里边呢,我不能无限等下去啊,我要求的这个乱序程度应该是比较小的,对吧?这里边我就给一个很小的几十毫秒或者几百毫秒的一个乱序程度,然后它可以hold住大部分场景,那最后我如果要是要求结果正确,一定要非常正确,那怎么办呢?大家还记得之前我们那个window吗?Window是不是可以后边去lo lateness啊,这里边是不是还可以再去允许处理迟到数据,比方说处理一分钟的迟到数据啊啊,所以大家看我可以结合起来做对吧,就是在这儿设置一个比较小的延迟时间,然后hold住大部分的情景。
13:56
然后后边呢,如果有那个延迟时间比较长的漏漏网之鱼在后面再允许处理迟到数据,甚至还可以就是output对吧?然后这里面需要去给一个output,呃,Tag对吧,给一个这个所谓的标签啊,这个标签就是表明到底它是哪一条流对吧?测输出流嘛,放到测输流里边去,所以大家看到在sli里边是有这样的层层保证的,处理乱序数据三重保证,先去设置一个water mark的延迟时间,你这个一般设置小一点对吧,快速的先得到一个近似正确的结果,而且呢,我尽量让他能hold住大部分的情况,这个近似正确也就也就很近似正确了,对吧?啊,在很多情况下已经可以没有没有什么问题了,几十毫秒的延迟我也我也是可以接受的,然后呢,哎,那后边这里边如果还要要求更精确,那我就允许处理迟到数据,那假如说这个我不能延迟时间太长啊,对吧。
14:56
无限等下去这个资源一直不释放,那怎么办呢?最后还有一个测输出流对吧,Side output,最后兜底放到侧输出流里面去,保证这个数据不不丢,最后再去做处理就可以了啊这就是关于我们这个啊,Flink里边water mark的这些概念啊,和这些使用怎么样去处理乱序数据,当然前面还有一个知识点,就是说我们这里边默认它是一个周期性生成的,对吧?大家看这个类,它是a center with periodic automark对吧?哎,那它的这个周期到底是多少呢?哎,这里边这个周期其实我们看啊,在去设置时间语意的时候,大家点进去后看到,如果是大家看processing time的话,这个默认的时间是零,周期是零,因为就不生成water mark嘛,对吧,Processing time,那如果不是processing time的话,如果是even time或者interesting time,它默认的周期是200毫秒啊,所以这里边你看它设置了一个set all all to in。
15:56
Pro对吧?啊,所以这里边其实我也可以直接在这里边直接去做一个设置,对吧?啊,那在这里边我需要去先get当前的config,然后去set all to water mark interval,这里面直接传的就是一个毫秒数,比方说我给一个500,这就是500毫秒。
16:15
啊,大家可以就是知道这一个怎么样去配就可以了,你这个时间隔得大一点的话,那walmark进展就慢,对吧,那就是隔很久才实现,你这个小一点的话,它就会快一点,但是对性能影响可能会稍微大一点,这也是一个做权衡,自己去调整的过程。
我来说两句