00:00
后我们再来讲一讲在代码里边实际生产项目当中,Watermark到底应该怎么去设定呢?啊,那一般情况大家知道在实际生产项目当中一般处理的是升序数据还是乱序数据啊。首先我们可以考虑一下啊,一般都是乱取数据对吧?啊,所以首先那个ending time STEM那种分配方式那就不要考虑了啊,那当然也有可能是出现就是我们提前做过ETL对吧?呃,一开始就做过排序,然后呃,灌到这个卡夫卡里边啊,我们这个再再去做消费这种场景下的话,那直接用升序也是可以的,具体还是要看数据了,那如果说是这个乱序数据的话,那就涉及到一个我们是不是要设置一个watermark延迟啊。那那这个延迟时间我到底给多少呢?啊,大家要注意,这个其实就相当于我得我就得做一个考量了,那延迟的时间如果要是设的太长太久的话,那是不是导致大家注意啊,Watermark的延迟,这是我们当前整个流处理里边时间的延迟,对吧?这表就拨慢了,那是不是后面只要所有涉及到时间的操作都会滞后啊,哎,所以这里边就会涉及到,如果你设的太久的话,那我们是不是就相当于后边这个时效性就变得很差了,延迟就太高了,对吧?啊,那如果说watermark你设的太小的话。
01:25
又会有另外一个问题,你快是快了对吧,但是有可能会啊,就是有一些乱序的数据没有搞定,那就会导致输出了,得到了错误的结果,对吧?啊,所以这里面一般情况我们是首先用这个watermark的延迟时间要做一个权衡。就是我要找到一个平衡点,就是你也不要太大,导致我们最后的那个实时性特别的差,也不要太小,导致最后大量的数据都hold不住,对吧,都是乱乱序都是错误的,那所以一般情况我们在呃项目里边会怎么做呢?诶这就是我们说的啊,你可能需要首先对这个相对应的这个业务领域有一些了解,你得大概知道我们当前这个项目收集到的那个数据啊,它的乱序程度大概是一个什么样的水准,对吧?啊就是首先有这样的一个了解,或者说你如果要是对这个不了解的话,你也可以把那个数据先拉出来,大大概的做一个测试看一看,对吧?啊就是看一看大概是一个什么样的情况,甚至还可以怎么样呢,你去,诶对于这个数据去做一些分析。
02:31
用一些比方说机器学习算法啊,高级的用法对吧,用一些机器学习算法建一个模型,去判断出当前我们数据里边,它的这个延迟时间的一个分布状态,那大家想一般的分布状态应该是什么?又来了,是不是又是类似于一个正态分布啊,啊,当然这里面这个不,呃,我这儿是写成这个数轴,这是零轴了啊,应该这个延迟时间的话,应该是一个正数对吧,比方说哎,我我这里边大部分都是以这个50毫秒作为一个作为一个平均的延迟时间,乱呃,这个乱序程度啊,乱序程度大概是这个50毫秒,然后大家可能就会想到,如果要是比50毫秒大的数据和这个比50毫秒小的数据。
03:17
那么它都会集中在一定的范围内,对不对,哎,所以在这种情况下,那我们自然就可以还是按照之前的那个规则嘛,341个码对吧?啊,比方说呃,在这个呃有有可能啊,比方说在这个80毫秒。大部分的这个乱序程度以50毫秒为为中心啊,大部分都在20~80毫秒之间,那大家想我是不是这个water mark直接设一个80毫秒应该就可以hold住绝大多数的情况啊,对吧?诶它这个以下的这个所有的情况就都可以搞定了,那假如说这是一个3C格玛的话,那之前那那剩下的这个尾巴啊,就是前面的这一部分数据漏网之鱼,那应该就是很小一部分,对不对,可能百分之零点几的数据hold不住,假如我们的场景你对它的要求非常高,就是要绝对正确,最后的数据一定不能丢,那怎么办呢?
04:11
啊,那没关系啊,大部分数据我已经hold住了,现在我是不是就可以以几十毫秒的延迟快速的输出了一个近似正确的结果啊,那接下来我是不是就可以再设置一个窗口的。处理迟到机制,Allow the lateness啊,比方说大部分的数据,比方说啊,这个这个尾巴,这就看你要要到大概处理到什么程度了,比方说大部分的这个延迟数据呢,在一秒钟之内也就都到齐了,那我是不是接下来那个窗口就allow lateness1秒就可以了,再等一秒对吧?来了这个漏网之鱼,我去做一个更新就完事了。那如果说你对这个要求还特别特别严格,就即使那个非常极端的啊,可能这个呃,几万分之一,几百万分之一的这个概率才会出现的那种情况,有极端的漏网之鱼,一秒之后乱序才会到,那怎么办?哎,测殊入流嘛,对吧?哎,我就直接扔到测殊流里面去,然后你最后再做一个批处理合并不就完了吗。
05:10
啊,所以大家看在实际的这个生产环境里边,还是要根据我们具体的需求去确定最后的这个原则的,一般情况完美的这个匹配,那就是三重保证,我们都用起来对吧?啊,如果要是最简单的这个,比方说几十毫秒的一个乱序的话,那直接全局设一个water mark,把这个表调慢就完事了。当然前面我们讲到还沃设定的时候,还有两种方式,就是一种是周期性生成,另外一种是这个这个间断性生成,对吧,非周期性生成,那这两种情况它又有什么区别呢?哎,大家会想到间断性生成有一个什么好处啊,大家会想到这个间断性生成的话,是不是数据每来一条之后,是不是就可以后边就就插入这个watermark呀,对吧,那这样的话我们更新watermark就会特别的快,对吧,就会特别的实时,但是它有一个问题。
06:06
就是我们当前数据量特别大的时候,是不是相当于这个,我们说这个watermark,它相当于也是一个特殊的数据啊,那你这个是不是相当于数据就翻倍了呀,而且数据量特别大的时候,有可能会出现什么情况呢?是不是这个同一时间戳的那个数据有好多啊,那所以你会发现我来一个就后面加一个,来一个就发加一个,结果发现这个这个时间说是不是都一样,这就有点浪费对不对啊,那与之对应的周期性生成waterbook,大家想一下它的这个特点是什么呢?对,它的特点是每隔一段时间才去更新一次,每一段时间去更新一次,哎,那它的好处是,那就不用生成的那么频繁对吧?然后如果数据量特别大的时候,也不会出现说是这个呃,就是呃,每一个后面生成一个结果还都一样,但是它也有一个问题,就一个是时效性没那么强,对吧?然后另外还有一个就是如果说我这里边一段时间都没有数据来的话,它是不是还会去生成啊,那就是数据如果比较稀疏的情况下啊,这种这种状况状况,它就相当于有点浪费了,那所以大家会想一下,呃,那这两种应用场景是不是就是如果要数据稠密的时候。
07:24
那大家想数据稠密的时候,是不是我就不担心这个watermark推进不了,那是不是我担心的是这个处理不过来啊,对吧,那是不是就应该用周期性生成啊,然后如果要是数据稀疏的时候,那这个时候你就不要浪费了,对吧?来一个处理一个不就完了吗?啊,那就用这个非周期性就完事了。所以大家发现我们更关心的更长处理的应该是什么场景呢?当然是数据稠密了,对吧?我们大数据处理嘛,当然要数据稠密的这种环境下,我去考虑到底该怎么做,而且数据假如说你出现一段时间数据稀疏了,那数据稀疏是不是就代表我当前这个任务都闲着没事干啊,那你来一个watermark也无所谓对不对,那比起之前你这个数据稠密的时候,你还给我加一倍的这个watermark,这比起这个就代价小多了,所以大家会发现。
08:13
在代码里边默认的这个邦地的out,乱序数据处理以及升序处理,它都是什么?都是一个基于周期性的处理,对吧?啊,都是做了这样的一个操作啊,那另外还有一个问题,就是大家可能想到周期性去生成mark,那这个周期到底是多少呢。啊,这个周期啊,是在这里边看到的啊,大家看,因为set这个时间语义的时候,这里其实就有大家点进去看一眼。这个源码啊。大家看设置时间语义的时候,这里是不是直接就有一个判断啊,判断当前的。时间语义时间特性,如果是processing time的话,大家看它设置了一个set all to water mark in t。
09:00
这是在干什么?这是不是就是在设置自动生成watermark的时间间隔,也就是周期啊,如果是处理时间,它直接设这个周期是零,其实大家知道零的话,是不是就表示当前根本就不生成watermark呀,因为你处理时间嘛,跟watermark就没关系,而如果要是别的,那别的是不是就相当于是事件时间和摄入时间interesting time也也也生效,对吧?那设置的这个周期是多少?默认200毫秒对吧,就在这儿啊,当然通过这一个调用大家也看到了啊,我是不是在代码里边,你想改的话,也可以去做一个配置啊,对吧?所以这里边env env点啊,我是不是可以直接get config,然后去set获取到当前的这个这配置项之后去set当前的这个watermark的自动生成的那个时间间隔,对吧?那这里边我给一个比方说,我如果觉得200毫秒生成一次啊,这个太慢了,对吧?啊,我我我这个当前的这个处理能力足够啊,因为你如果大家想如果射的太快的话,是不是也不太好啊。
10:07
设太快是不是相当于对于我们这个数据处理来讲,这个就代价太高了,对吧?但是你如果设太慢的话,是不是相当于你这个延迟就就会高啊,因为你摩ma隔这么多几百毫秒才更新一次嘛,啊那所以比方说我设一个100毫秒对吧?诶那大家想接下来我就变成了一个100毫秒更新一次water mark,选取当前的最大的时间戳,减掉那个呃,设置的延迟时间,然后生成一个这样的对应的这个water mark,对吧?代表当前的事件时间,这就是代码里边的一些配置啊。
我来说两句