00:00
接下来我们在基于前面讲到的事件时间语义的窗口操作在进一步给大家做一个讲解,因为前面我们讲过窗口的API里边是有一些可选API的,特别是它可以跟walmark这个机制结合起来之后,可以去处理乱序数据,对吧?哎,所以接下来。大家会想到我在time window下边,假如说我这里乱序数据,然后本身我这儿是不是有个两秒的延迟啊,啊,刚才我们在测试的时候,尽管没有乱序数据,但是大家也发现了,假如说我这个有乱序的话,你想这个227来了之后才会关225的窗口,那是不是227之后我再来226225都没问题啊,呃,都可以正正确处理对吧?啊,这个都是没问题的,所以呃,这里边就是大家能想到watermark是能处理一些事情的,但是呢,呃,大家会想到实际处理的时候,有可能这个watermark hold不住啊,有些那个乱序数据它可能乱,乱的程度很大,对吧。啊,那比方说我这里面直接给大家举一个呃,稍微大一点的例子啊,那就是我直接lo lateness,我再定一个一分钟允许迟到数据的一个延迟,对吧,我直接来一个啊time.MINUTES1,那大家就知道接下来的状态应该是什么。
01:16
每15秒一个窗口,然后在接下来的60秒一分钟之内,是不是这个窗口都不会关啊,它是到了15秒的时候,直接就会输出一个结果,然后啊,接下来就是迟到数据,一分钟之内啊,只要来了是不是就更新一次,只要来了就更新一次啊啊,它就是这样的一个处理操作啊那。另外我们还有一个兜底的方法,假如一分钟之后这个窗口真的关了呢,再来迟到数据怎么办呢?啊,如果我们不做处理,那就只能丢了,不想让它丢的话,扔到测殊枢纽里面去,对吧?啊,那在这里我们还是上边去定义一个测书流标签对吧,Output output tag tag,呃,然后当前我们肯定就是sensor reading啊,跟当前的流里边的数据原封不动啊,放在这。
02:05
然后我把这个late放在这里,对吧,这是我们之前说的定义这个output tag的这个过程啊,这个方法,然后我在这儿直接传一个output tag进来,后边注意是不是就需要基于这一条处理完成的流去get set output啊对吧,把这个拿出来output tag。然后后边我可以做一个打印,输出当前这个数据,叫做late,接下来我们再来做一个测试,大家看一看这个效果怎么样。好,现在这个代码已经运行起来,呃,为了方便大家看清楚的话,这个我就不再输入那个六七十的数据了,因为大家知道分组嘛,呃,各自考虑各自就完了,我就只输入这个341测试就可以了,341首先来一个199的数据啊,这当然没有任何的反应了,对吧?啊,那我们直接往后推移吧,其实大家知道这个接下来就是。
03:03
是不是他一定要到210的时候才会关闭一个窗口啊,195~210,这个我们都已经知道了啊,我直接来一个210吧。这里边还没关,因为时间戳是到210了,但是事实上我们的water mark才到了。啊,两秒钟之后才到了208对吧?呃,两二零八,所以现在还没有窗口要关,那所以如果要关当前的窗口的话,是不是必须要有一个对212这里边啊,可以关窗口对吧?啊,当然就是,呃,比方说我先来一个这个211。31对吧?啊,这个时候还还没关系,然后假如说这里边我再来一个209,我给一个稍微小一点啊,之前大家看一下大概这个最小是多少,34啊,这个这个还34.7 210这个数是不是不应该在我们的这个窗口内啊,对吧,我给一个34.9,大家看比这个210这个要大,但是比前面的都小对吧?好,我给一个这个,看看这个209能不能收进去啊这个数据,然后接下来来一个212,诶大家看这里边输出了212的时候,前面窗口关闭,我这里边209的这个乱序数据收进去了吗?
04:17
哎,肯定收进去了,因为这里边输出的是不是就是它啊,二零九三十四点九,这是当前窗口是不是触发了第一次计算输出一个近似正确的结果,对吧?哎,这是我们当前这个,呃,Watermark啊,已经hold住的这个两秒钟之内的延迟数据。然后接下来大家会想到继续往后走,如果还有这个后续的数据进来的话,那是不是,呃,所有的数据就相当于都是来一个,呃,来一个就处理一个,这都会放到接下来的这个数据都会放到210~225的那个窗口里面去,对吧?呃,这个都是大家能够想到的,呃,那随着时间的推移往后,前面我们这个205~210已经输出过一次了,那假如再来之前的数据还有效吗?
05:07
哎,比方说我再来一个,比方说206的数据,呃,我这个随便给一个啊,我给一个小01:2314.2对吧,大家看是不是这里边还可以实时更新啊。这个更新的是什么时候的数据,这还是上一个窗口,205到呃,195~210,对不对,诶所以大家看前面那个窗口,尽管已经输出了一次,但是还没关,还在输出,还在更新。而且这里边就是你每次来一个数据,它都会更新一次,大家看这里边假如说假如说啊,我直接来一个36.3,大家看这个是不是相当于数据。我把这个稍微改一下啊,比方说202对吧,三三十六,那来看这个数据是不是没改之前的最小值啊,那会不会有输出呢。还会有输出,因为大家想每来一个数据是不是都会跟之前聚合一次啊,因为当前我们的逻辑里边,它它只是求最小值啊,每一次都求一次最小值,它并不管你要不要更新对吧?啊,所以这里你看还是输出了之前最小值二零六三十四点二。
06:17
每来一次都更新一次,每来一次更新一次,那什么时候它就不再更新了呢?对,大家知道是不是要等到一分钟之后,一分钟,那是不是得等到我们这个是210嘛,一分钟之后就是270对不对,哎,那所以接下来。我首先来一个270的时间,我随便给一个数啊。大家看这里边要输出一个结果,为什么又又又有一个输出结果呢?对,因为是不是第二个窗口也要关了对吧,210~225的那个窗口也要关了嘛,然后它的最小值是这个211的31.0啊这个大家能想到,哎,这个是自然能想到的,但是大家想前面第一个窗口现在现在关了没呢。
07:04
D个窗口没关对不对,因为270这个数据是来了现在的wal mark只到了268,那我们那个延迟一分钟到底是以谁为准的一分钟呢?是不是还是事件,事件还是watermark呀,对吧,所以大家看这watermark对什么都有影响对吧,只要是时间都有影响,那所以我们还是测试一下啊,比方说我再来一个202对吧。还是把这个测,比方说我来个203吧,203,呃,这个31.9,大家看这个很小对吧,我看一下能不能更新,大家看现在是不是又更新了,还在更新,那什么时候就不能更新了呢?对,自然大家想到到272的时候。272,我随便给一个34,然后呃,这里当然因为它没有关任何窗口嘛,对吧,呃,这这个我就没有任何的输出了啊,接下来我再来一个这个。
08:00
203。这个我更小一点,30.6。大家看是不是他直接把这个数据输出到哪了,直接late了对不对,直接放到测试主流了啊,这就是关于我们这个迟道数据啊,啊,到底应该怎么去处理啊,我们所说的这个三重保证,它就是这样一层一层把它做这个处理的,当然对于这个类的数据,大家看你也可以说就直接做流处理,我们这直接这个print打印输出了,那你想我这里拿到这个类的数据之后,我直接map行不行啊。对吧,直接map或者是直接那个think写入到呃,MY写入到red是不是都行啊啊,这个是完全是可以的啊啊所以你也甚至可以在这儿就直接跟之前的那个聚合结果,大家想之前这个聚合结果常规来看的话,它是不是也应该写入到某一个外部的存储系系系统里面去啊,所以我甚至可以在后边得到这个测试主流数据之后,我直接一个外部方法,是不是连接外部数据库,读之前的那个结果,然后我更新那个结果啊啊,甚至可以直接做这个啊啊所以大家就是后续的这个操作,这就是由自己业务逻辑去定义了啊,或者你可以就直接放在这儿写入到外部系统,最后是不是做批处理啊啊都可以啊,这就是关于迟道数据的一个测试。
我来说两句