00:00
哎,我们再给大家讲一下watermark在整个流游处理过程当中的任务间的传递,哎,那有些同学可能会想到任务间传递,这还有什么问题吗?之前我们不是说了它就是相当于一个特殊的特殊的数据吗?那我们数据这个就是呃,直接往往下游一个一个传,那呃,当前摩拉是不是也是朝下游一个一个传啊,按照这个顺序传不就完事了吗?哦,大家看这里边我们只考虑的是这就是一条流,然后就是相当于只有一个分区,按照顺序一个一个排,排整齐了,那当然一个一个传这个没有问题,当前一个分区是不是可以保证它的这个顺序不变啊。啊,那接下来问题就来了,实际使用的过程当中,我们只能有一个一个分区吗?当然不是,每一个任务可能是不是都是并行的呀,它有多个并行的并行子任务,那接下来就有一个问题,我们数据本身在不同的啊前后啊。
01:01
不同的这个任务之间做传递的时候,它是有这个规则的,当时我们讲过可以forward当前分区对吧,也可以KY基于哈西code去做重分区,还可以这个shuffle reb,对吧,Re scale啊,甚至还可以global啊,各种各样的这种有数据传输方式,那接下来的这个当前的water怎么传输呢?哦,这里面大家就值得思考了,对吧?呃,首先我们想到的是watermark可以朝下游传递,那大家想一下,如果说啊,我当前一个任务。它下游有多个这个子任务的话。数据朝下游的传递的话,这是我们前面讲的,你如果是K外的话,按照哈西扣的主分区对吧?啊,属于哪个我就我就分到哪个下游的分区啊,那如果说呃,要是这个只是一个并行路的调整,然后不涉及到这个k buy其他的这些重分区操作的话,那我这里边就是一个re balance轮巡对吧?呃,轮询去做一个分配,诶那这里边接下来问题就是,如果是whatmark呢?
02:06
来了一个,我怎么样朝下游去传递呢?是轮询吗?呃,来了一秒的发到这个,来了二秒的发到下一个吗?还是说其实哈希code啊,What,没有哈希code的对吧?它只有一个时间戳嘛,大家想是怎么样?对,其实这个大家想到我当前的这个时间戳是不是代表的是一个当前的时间啊,那时间是不是应该要同时通知下游任务啊?呃,另外还有一个含义,就是说我当前看到1WATERMARK的时候,不是代表他之前的数据都处理完了吗?那是不是我把这个watermark就应该广播到下游任务,然后告诉下游任务之前的数据都处理完了,接下来我给你发的数据就都是后面的数据了,对不对?哎,那所以下游任务是不是也就可以把自己的时间推进到这个时间点了,那所以当然它是应该要广播到下游任务的,所以首先我们要想到从上游向下游传递,是把watermark广播出去。
03:08
好,那接下来问题就又来了,那上游任务只有一个吗?诶,不是啊,上游任务是不是也可以有有这样的一个并行子任务啊,对吧,我也可以这里边还同时有一个,那按照我们的规则,他是不是也要广播这个watermark呀。那问题就来了,上游和下游的这个watermark是同样的吗?同一时间广播的是同一个watermark吗?那就没准了,因为它是插入到那个数据流里边,数据本身就不一样,那watermark是不是也不一样啊?然后每个任务要并行计算,它处理的速度不一样,当然也不一样啊,所以有可能出现什么情况?我下游一个分区子任务啊,并行的子任务接收到是不是同时接收到的这个上游任务传过来的watermark不一样啊,一个一一个二,那我以谁为准,问题就来了,这就是看上游了,对吧。
04:06
以谁为准呢?我当前的时间应该是一还是二呢?啊,有同学说一,有同学说二,大家想一下,这个还是要回归water map的本质,Water map的本质说的是事件时间进展到这个时间了,之前的数据是不是都到齐了呀?哎,那你想我当前这个任务应该能保证是一之前的数据都到齐了,还是二之前的数据都到齐了,我是不是只能保证一之前都到齐啊,因为这里的这个二它只是当前第二个上游分区,是不是保证二之前的数据都到齐了,那第一个分区是不是还有可能还会给他传啊,哎,那所以我能保证的是不是应该是最小的那个watermark之前的数据都要齐啊,啊所以这就类似于我们说这个木桶原理嘛,对,最短的那个那个木板代表了我们当前这个桶能放的水量,对吧?哎,所以这里面watermark也是一样的,所以大家看一下底层原理是每一个任务它可能都有。
05:09
同时有好几个并行的上游任务在给他发watermark,那同时他可能也有并行的好几个下游任务,他会去广播watermark啊,那大家看当前这个状态就是一个任务啊,它是不是有四个上游并行任务啊,对吧,都在给他发数据发watermark,另外他有下下游是有三个并行任务对吧?啊,那接下来它对于上游的四个任务,大家看每一个任务都会。他专门给分配一个空间去存一个。当前这个分区的watermark,大家看这个就叫做petition watermark对吧,分区水位线分区水位,那所以接下来你看当前的这个分区water rockck是2436,那我当前自己这个任务,当前的water rock是几呢?最小的那个对不对,我就以当前最小的那个watermark分区WATERMARK2作为自己的事件时钟。
06:09
也就是说我当前的时间就是二表示两秒钟,如果有窗口要关闭,如果是零到二秒的窗口的话,是不是该关我就可以关了。为什么可以关,是不是因为所有二之前的数据都到齐了呀,对吧?哎,能保证吗?所以我可以关了,然后接下来是不是就可以把这个二广播出去呀。下游的任务是不是也可以知道哦,我当前这个分区二之前的数据都到齐了啊,所以这就是这样的一个传递的过程,接下来呢,哎,大家看第一个分区water mark变成来了一个新的water mark啊,这个圆圈表示water mark啊,它是来了一个四,那就更新当前第一个分区的water,然后接下来怎么办?是不是比较所有分区watermark再选区选出一个当前的最小值啊,哎,然后我发现,诶,现在最小的是第三个分区的三了,那是不是我当前任务的事件时间就朝前推移了,有两秒变成了三秒,三秒之前的数据都到齐了,如果有三秒的窗口是不是可以关了,接下来我就把三秒watermark啊朝下游广播出去,下游的时间也就推进了,所以大家看就是前这个上游到下游是不是相当于通过这个water mark的传递。
07:25
就可以一层一层的推进我们的事件时间啊,而且大家就会发现了,现在的这个特点是每一个任务它的事件时间是不是不一样啊,可以当前的事件时间是不一样的,因为大家想本来就不一样啊,因为你流出里是不是有先后顺序啊,那你我有一个数据是这个S任务,然后有有一个任务是S啊任务有一个任务是S有或者是后面那个川S中间的某一步操作,它当然实验时间是不一样的吧,对吧?啊,那显然是S那边的时间要要要前一点啊,提前一点,要大一点诶,那后面的任务可能会时间会滞后一点,所以通过这个walmark就可以非常完美的把我们这个当前的时间状态表示出来啊,那后面当然了,如果说再来一个这个第二个分区来一个七,大家看更新分区water mark,然后对比现在的最小值还是三还是三,那是不是不更新啊,大家要注意一下这个这个规则啊,如果不更新water。
08:26
Mark的话,它就不会向下游再发发送water mark,因为大家想是不是当前我的那个时间并没有改变啊,对吧,下游它知道当前的时间是多少,所以说这个我就不再发了,因为你广播water rock是不是相当于也是一个特殊数据,也会耗费我们的资源啊啊,所以这个就不发了啊,然后后边如果说第三个分区又更新了一下,来了一个六,那现在最小的water分区water是四,是不是就又更新,然后又广播出去了啊,这就是watermark在上下游任务之间传递的一个规则。
我来说两句