00:00
来,我们再来讲一下watermark的传递啊,这个为什么还要讲传递呢?呃,有同学可能想到,那你前面给给我们说这个watermark就相当于是插入到数据流里边来的一个一个数据嘛,那这个如果要是在这个数据流里边做操作,那怎么办呢?那非常简单,我这有一个任务对吧?啊,有一个map啊,那非常简单,它就是什么呢?就是我来一个数的时候,我做的是当前数的这个处理,对吧?呃,去做这个转换,那来一个automark的时候呢,我就要判断,诶有没有该关的窗口,该该关闭了对吧?它其实就是判断这个,所以说就相当于数据一样,一个一个来,然后我的任务就一个一个接收,那接收完了之后怎么办呢?接收完了之后当然是朝下游传递啊,就像数据一样嘛,对吧,数据要向下游传递,Watermark也也要向下游传递,那大家可能想到这个太简单了,对吧,那你这个就按照顺序嘛,它之前插入在哪,你接下来还是顺着朝下传不就完了吗?好如。
01:00
如果这里边我们就是一条顺序的这个对吧,就是如果并行度是一,就是按照一个顺序的过程一直往下传的话,这个就太简单了,但是不尽如人意啊,我们现在考虑的是分布式系统对吧?分布式系统里边这个waterma又怎么去处理呢?哎,这个就很麻烦了,这个water我们再去做处理的时候,你会看到大家会想到啊,会出现什么情况呢?就是我上游有可能有很多个并行的子任务,然后呢,下游也有很多个并行的子任务,那这就涉及到一个问题,我数据来了之后,当前这个数据处理完了,那那我可以知道接下来我到底是假如说我做了KBY对吧?呃,你是按照哈西扣去做重分区,或者说我当前是,呃做了一个比方说只是一个并行路的调整,我做一个rebll轮询去往下游发送,对吧?那这个都是可以,可以直接去定义好的数据的话,这个就确定了。那这个当前watermark怎么办呢?Watermark你说它是假如说我做了一个k buy之后,Watermark也k buy吗?Watermark的key是什么呢?不知道啊,那或者说这个water mark有人说可能想到轮询方式去发送,对吧?那water发送的时候难道是说往下游发送的时候是,诶你你当前来了第一个来了一个四的发到上面,然后下面就没有进行到四吗?然后是下面来了一七的,这个直接发到下游,下游就跳到七了吗?因为我们说这个waterma是代表时间的呀,代表当前它的这个当前的这个系统时间对吧?事件时间,那那你如果这么去跳变显然是不对的,所以这里边首先我们说一个标准,就是上游朝下游传递watermark的时候怎么传递呢?因为表示时间嘛,直接广播出来对吧,就是你这里面如果要有一个四,那就全是四下游全是四发送出来啊,为什么可以直接广播出来呢?大家想一想。
02:58
它表示的含义是什么?它表达的是四之前的,就是当前四,四这个时间戳之前的数,我当前这个任务全处理完了,对吧?啊,那当然你想我当前这个任务再往下游传递数据的时候,是不是这个处理完成之后,这个watermark之后就不应该再来四以前的数据了呀,正常来讲啊,就如果说我们设置合理的话,哎,那所以接下来我的这个下游的任务,当然也可以认为我的时间就进展到四就好了,因为之前的数据都处理完了呀,你后面再来的数据不会再来这个四以前的数了,所以我就直接把这个射程,呃,这个定成四就可以了,对吧?哎,这是上游城朝下游传递,直接给这个广播出来就可以,但是这里又有一个问题,就是说那我直接这个下游任务啊,我接收到上游任务的这个四了,那当前我的时间就已经是四了吗。
03:58
哎,我们说这个auto mark就像一个时钟一样,对吧,就是它当前这个事件时间的指示嘛,这就是当前的这个clock时钟,我的钟表就真的也已经到了四秒了吗?其实不是的,因为大家想到上游有并行任务啊,那下面的两个并行任务还有可能给我发数据啊,这个四秒表示的是上当前的这个并行任务一不在有四之前的数据给我发送了,但是下面这两个任务还有可能会发送,对不对?哎,那所以这个怎么办呢?这个的一个处理方法就是我在下游任务里边设置给上游的每一个并行子任务设置一个分区watermark。
04:41
也就是说各自保存他们的那个water放在这儿,你的这个四来了之后呢,只是自己的那个时钟分区的时钟变成四了,那下边的时钟呢,还没来,对吧?我要等到下边的对应的这个分区来了water,我才调这里的时钟,那问题就又来了,我自己的这个时钟到底以谁的为主呢?按谁的来呢?
05:05
好,其实大家就想到了,那当然要以最慢的那个时钟来,对不对啊,因为最慢的那个时钟,假如说下面有个三的话,那是不是我必须要下面还有一个一个五对吧?那我必须要以三来,为什么呢?以三,我当前的这个时钟如果定成三的话,那表示三之前的数都到齐了,所有的任务是不是就相当于都不会再给我发三之前的数来了?哎,你看看是不是因为上面这个四之前的都不会来了,对吧?三之前都不会来,五之前的都不会来了,那当然三之前的都不会来了,所以我定义最小的这个分区时钟作为自己的时钟,这是可行的啊。所以这就是watermark传递的一个规则,就是如果上下游有多个并行子任务的话,上游朝下游传递是广播,那么下游接收上游的这个watermark的话,会保持一个分区,Water自己的时钟呢,是以他们的最小的那个为准。
06:04
啊,所以接下来大家看一下这个具体的例子啊,那当前的这个例子,我只考虑就是当前这一步操作啊,只考虑它的上下游,因为它并行的那个任务的话都一样,对吧,跟它没关系嘛,只有它的上下游对它这个waterlog传递有有影响啊,那这个大家看这个下游有三个箭头对吧,所以它下游有三个并行子任务,上游四个箭头,上游有四个并行子任务,那接下来呢,哎,我们看到。给每一个分区保持一个分区water,对吧?哎,当前的这个四个分区分别是2436,最小的分区watermark是二,所以自己当前的时钟是二。然后那大家就会想到这个二就广播到下游了,对吧,它下游的任务都认为,哦,当前这个分区始终是二啊,这样就知道了,然后接下来来了一个四,看第一个分区,这里边更新了一个,来了一个四,这里面我们没有考虑考虑数据啊,就只看更新四之后,他先要更新的是分区watermark,然后接下来呢,比较最小值,现在最小值是不是变成三了,诶三比之前的二增大了,所以我自己的时钟就前进了,就变成三了,然后我就把这个三再广播出去,发送到下游啊,就表示我三以前的数都搞定了,都已经处理完了,我的时间是三了啊,你们接下来从我这儿不会接收到三以前三三以前的数了啊,这就是这个含义,然后接下来呢,哎,如果说再来又来了一个七七,把这个第二个分区的water map改了之后,诶,你发现这个最小值没没变对吧?哎,所以当前它的时钟不变,如果不变的话,它就也不长。
07:43
朝下游去广播了,就不变了,对吧?呃,之前的一个还是三还是三,然后接下来第三个分区,这个六来了之后,这个时候最小的值变成了四,那么当前的时钟变成四,再广播出去,它的时时钟就是这样的一点一点推移的,这就是water mark在这个并行处理的过程当中传递的一个规则。
我来说两句