00:00
于watermark这个设定,大家会发现一般情况我们用它来主要是事件时间语一下处理乱序数据啊,在这里边你基本上就是给一个就代码里边我们就引入这个类对吧,给一个当前的这个时间戳提取器啊,就指定时间戳是什么,然后呢,给它一个我们说的这个延迟时间啊,这里边它底层叫做最大乱序程度对吧?啊,你把这个给我们知道他底层希望要的是最大乱序程度,但是呢,我们给的时候,你也可以不给最大乱序程度,对吧,就是我能接受你当前没那么没那么准,也有可能丢数,但是呢,我要快呀啊对吧,我能hold住百分之六七十,七八十情况就可以了啊,所以这个情况你是按自己的需求去去指定去设定的,那就有同学可能会说,那我这里边我就想要,就不想用这个当前的最大时间戳去减一个延迟,对吧,就用这种延迟的方式去去处理,我想自定义行不行呢?当然可以啊,我们说这个类底层实现的是a sign with periodic watermarks。
01:00
那你如果自己实现的话,你实现这个接口不就完了吗?对吧?然后这个接口里边你看一眼,它里边主要有两个方,呃,主要有一个方法啊,就是这里边有一个方法就是get current water mark对吧,就是生成water mark的那个机制,另外还有一个必须重写的方法是什么呢?是它继承自time stamp a sign里边的一个方法,叫做except extract time step,就是提取时间戳啊之前大家还记得我们在这个重写的时候,是不是也得重写这个方法呀,对吧?Extra time step啊,因为你看这个类里边,它是给我们已经把这个就是这里边,呃,Get current water mark这个已经重写了,对吧,这个已经重写完成了,然后我们这里边的这个提取时间戳的这个操作呢,诶,你相当于在这个里边是还得给我再传进来一个参数,我根据我们自己。指定的那一个啊,就是当前你要提取的那个规则,到这里边来才能去提取,对吧?啊大家看你这里边是要把这个extract time stamp,就是当前我给定的这个这个方法,它要调用这个方法的,对吧?呃,然后去去复写我们这个,呃,重写这个接口里边的这个方法,所以你如果要是自己实现的话,那你就重写,重写这个接口,然后就是实现这个接口,把这两个方法重写一下不就完了吗?啊大家可以参考一下文档里边给大家也做了一个简单实现,我们可以看到。
02:29
呃,这个这里啊,大家看到这两种类型,一种是周期性生成water的默认200毫秒周期,对吧?你这里面如果说我们想要去自己自定义的话,你看怎么写呢?哎,我随便定义一个自己的periodic designer,然后呢,哎,实现这个接口对吧,With periodic automark,然后里边怎么办?我也模拟那种情况啊,模拟这个flink底层实现的情况,我定义一个延迟的这个界限,定义一个棒的,然后呢,我保存一个,大家看这是个Y,对吧,我要观察当前已经来的所有数据里边它的最大时间戳是什么?我把它保存下来啊,一开始我给了一个长整形的最小值,然后后边重写这两个方法,这两个方法怎么重写呢?一个方法提取时间戳,诶,那我就是最后返回的,当然是数据里边的那个时间戳了,对吧?提取时间戳嘛,另外我还得每来一个数据来一个时间戳,我就要判断一下,更新一下当前的那个最大值,这就相当于我保存了一个状态一样,对不对?我把这个状态要不停的去更新。
03:29
然后怎么样周期性,大家注意啊,这两个方法什么时候调用呢?下面这个方法当然是每来一条数据的时候,这个底层就给我们调用一下这个提取时间戳的这个方法,而上面这个方法呢,周期性生成,所以它就是200毫秒生成一次,呃,调用一次对吧?然后来生成一个water,大家看看它要返回一个water mark数据练习,然后插入到当前的数据流里面,所以我们这里边生成规则,你看就直接用当前的最大时间戳减去你指定的这个延迟时间就完事了啊,所以很简单的一个自定义实现的话,你就用这种方式啊,当然这种方式大家如果直接就是你用在代码里面直接去跑的话,肯定有bug,为什么呢?有很多边界情况对吧,边边角角那种情况我们都没考虑,你看那个类给我们实现里边看起来逻辑还很复杂呢,对吧?呃,核心逻辑提取出来其实就这么多,所以如果我们想要去自定义实现这个automa的生成机制啊,提取机制的话。
04:30
大家可以去参考一下这部分代码啊,那另外与之对应的还有一个就是不是周期性生成的,非周期性,我们是相当于断点式的,间断式的生成的那种方式,对吧?就是那种叫做puu eighty waters,大家还记得吧?啊,Sign with puu eighty watermarks,那这种情况实现的时候它是什么呢?哎,大家注意这里边要重写的还是实现这个接口嘛,要重写的方法略有不同,首先底层我们说还是就是那个asign time s那个底层接口,对吧?它必须得有一个提取时间戳的这个方法,这个要重写啊,那这里面还是一样,我们从数据里边把时间戳提出来,这里大家看没有乘1000,没有乘1000,意思是我本默认里边那个就是毫秒了,对吧,那这这就不用乘1000了啊,然后呢,前边我还有一个方法,这个不叫做get auto water mark啦,因为它不是周期性调用,它叫什么呢?它叫check and get next water mark,所以这个check它它check谁呢?
05:29
它的调用就是在来了一个数据,然后做了时间戳提取之后,接下来就调用上面这个方法,所以大家看现在我就不是周期性调用了,对吧?间断式的去调用,调用的时候是靠什么去触发呢?靠数据去触发,就是每来一个数据之后,我判断一下要不要去生成一个water,诶,那这个时候里边的逻辑我就可以比方说怎么样呢?哎,我就直接判断一下if当前的这一个啊,这个如果说是341的话,这个数据来了我才去生成,如果不是341的数据我就不生成啊,那生成的基这个就相当于也比较少,比较稀疏了,对吧,不会出现那种大量堆积的那种情况啊,当然这个实际应用好像很少这么干,对吧,你就直接指定某一个传感器的数据来了,我才生成沃玛的推推进时间,正常来讲肯定是所有数据都应该能推进时间啊,那这里边我给的还是你定义,哎,我就用当前提出来的当前这个当前的这个时间戳,然后去。
06:30
一个减一个延迟时间对吧,这就相当于我根本没有算它那个最大值了,有可能就会出现注意啊,大家看这个,这就有可能会出现这个时间倒流的情况,对吧?如果乱序数据来的话,你提取出来这个时间戳比之前小了,这就出现时间倒流了啊,所以这个显然是考虑的不够周全的啊。呃,如果实际应用,一般我们用的还是这个周期性生成它这种方式,我们说在处理这种数据非常密集的大数据处理场景会比较常见,比较高效一点,这就是关于不同的watermark生成的机制。
我来说两句