00:00
他们现在已经了解了事件时间的概念,也知道了water bug怎么引入,那接下来我们还是光把它引入,其实我们看不到什么具体的作用啊,那具体来讲这个东西要怎么用呢?还是需要在window里边进行操作,对吧?我们还是要开窗,然后看看这个事件时间到底是怎么去控,就是当天water mark啊,到底是怎么控制这个事件时间进展的啊,那接下来我们还是类似的实现一个这样的需求啊那。我们就基于呃,之前上一节课给大家讲到的这个even time window的这个代码啊,大家看前面我们做的这个操作,其实就是转换成sensor reading的这个po类型,另外还做了一个呃基本的操作就是是不是把这个数据相当于已经呃设置分配了时间戳和water mark呀,对吧,已经分配时间戳和water mark。所以接下来我这里设置的事件时间语义啊,那我们接下来的这个数据流里边其实就包含着water mark,也就是有这个时间的进展标志了,所以接下来我们可以开窗了,对吧?那接下来我们做一个基于事件,基于事件时间。
01:21
的。开窗聚合啊,那接下来我们还是啊,呃,前面我们得到的是这个data stream,接下来要做开窗,是不是首先做分组KBY啊,那我们先想一个具体的需求吧,当前的这个s reading数据啊,我们可以比方说还是15秒开一个窗口,接下来统计,比方说我们就统计这15秒之内的温度的最小值,大家觉得这个应该没没什么问题吧,好,那接下来我们就做这个事情啊。统计。15秒内温。
02:01
度的最小值。KY非常简单,KYID对吧?接下来开窗,开窗是时间窗口嘛,Time window里边,我这里边是不是直接传一个参数就可以了啊,这里面给一个time.SECOND15秒滚动窗口,15秒一个,然后接下来做一个是不是直接可以选取它的最小值啊,这个非常简单,我就用最简单的一种方式命或者命败啊,里边需要指定当前的temperature对吧。哦,这就是我们最简单的这种实现啊,直接就可以得到这样的一个,比方说,我管它叫main camp。Stream最小温度值的这个流啊,当然如果大家感兴趣的话,也可以把这个改成呃,Data stream对吧。啊,或者这里边不改,我们直接就用这个single output stream operator是一样的啊好,先把这个都写出来,接下来在下边我们就把它做一个打印输出,这是当前的mean camp。
03:12
整体流程其实跟前面我们开窗做测试的时候差不多是吧,而且这里面我们只取最小值的话,也没有用到那个增量,就是一般化的啊增量句和那个reduce function或者aggregate function,也没有用到全窗口函数,所以这里边非常简单,直接搞定了。好,那接下来我们来测试一下,大家看看这个它的效果是什么样的啊。呃,那既然是我们前面是socket文本流作为数据源,接下来还是需要去起一个NC。起起起,七,还是把这个骑起来。接下来我们看一下,呃,这里的代码没有问题对吧,Logo host777直接运行。呃,我们可能需要去一条一条的把对应的这个数据要输入。
04:02
好,这边启动起来之后,接下来我们看一眼啊。首先第一条数据先进来,这里没有任何的输出,肯定没有对吧,因为大家想是15秒的那个滚动窗口嘛,哎,所以我得等着啊,得得等这15秒之后才有数据,我继续往后说346。然后347。SENS40。大家会发现啊,就是在这个,呃,就是一步一步往后推移的过程当中,我好像已经过了很多很长时间了是吧?呃,感觉这十几秒肯定过去了,但这里面没有任何输出,为什么没有任何输出啊,哎,对,大家注意,现在的时间是我在这里边等15秒就能等到这个窗口关闭吗?不是你等15秒,这是这集其时间,这应该属于处理时间对吧?处理时间语义我们现在的语义已经变成了事件时间,那你如果想让他推进到15秒之后的话,得怎么样?对,你得这里的时间戳是不是得推移啊对吧?你这里边如果说只是还是按照这个去去给的话,那大家想一下,现在我的实现时间应该是多少啊?
05:19
事件时间进展到多少了?大家看到现在是不是最新的时间戳是205啊,那我现在试验时间就是205吗。我的时间进展到205了吗?其实不是,因为前面大家还记得我在分配这个,呃,因为我是用这个乱序时间的处理,对吧,当时分配时间出奥的时候是不是给了一个两秒的。W延迟啊,所以我现在205的数据来了之后,Watermark是多少,现在的W就代表事件时间对吧,是203对吧?那现在是203,也就是说203之前如果有窗口要关的话,它就该关了,对吧?啊那但是大家看没有任何输出,那说明203之前没有窗口要关嘛,啊这个很好理解啊,好,那我们继续往后看。
06:13
继续来的话,诶,那大家想一下第一个窗口应该什么时候关呢,关键是。啊,有同学觉得是206的时候就该关了吗?来,来一个206的数据就该关了是吗?啊,那我们试一试吧,这个稍微的谨慎一点啊,我们一步一步往后边一个一条数据一条数据的往后推移这个时间啊,我这里改一下给一个206对吧,206我随便给一个数36.3。大家看这里边没有输出还是没有输出对吧?啊,然后我再来一个207啊207稍微改一下吧,36.5对吧。还是没有输出,那到底到什么时候我这里才会有第一个输出呢?
07:01
来,我继续给一个209。还是没有对吧。啊,那这个我们谨慎一点啊,到这里越来越靠后了,210对吧,210,呃,我随便给一个,呃,34.7还是没有对吧,211。36点。还是没有。二幺二三十七点一诶大家看。到212的时候,突然一下子输出了很多数据。啊,所以大家看一下这里面输出的是几条数据啊。输出的。其实就是四条数据对吧,呃,Mean temp嘛,输出四条,为什么是四条。因为我们是先K分组之后,然后开窗取最小值,那是不是每一个三四都会取它当前15秒之内的最小,最小的那个温度值啊啊,所以一六七十都会取出来,当然了,六七十这三个传感器他们只有一个数据,那肯定就是他自己了。
08:11
那341的话,大家看取到的最小值是哪个。32.8209时候的这个数据对吧。好,那问题来了,当前它的这个统计的窗口到底是从多少到多少呢?这个统计的窗口。大家回忆一下,我们是输入了一个。212这个数据,然后就触发了一堆窗口计算结果的输出,那所以我们就要推测一下了,212这条数据来了之后,给我们的系统带带来了什么样的变化呢?首先212这条数据它应该,呃,就是属于哪个窗口,它先丢到那个对应的窗口里面去之后,是不是我们每隔100毫秒要生成一个water吗?Water是不是就该变了,现在的water是多少?
09:02
是210,它见两秒钟,所以210的water mark,也就是事件时间到这个点的时候,是不是就触发了一个窗口的统计输出啊,哎,那所以大家说当前这个窗口应该是多少到多少。是不是就应该是210结束啊,那往前推15秒就应该是195~210对吧,那就这样的一个窗口。啊,那所以接下来我们还可以继续往后,继续往后做一个测试啊呃,比方说我继续往后,大家说下一个窗口应该什么时候关。我来一个二幺三三十三肯定不会关对吧?啊,那下一个窗口大家想上一个是195~210滚动窗口嘛。是不是就是头连尾,尾连头啊,那上一个是2195~210,那下一个应该是多少。是不是应该是就是210开始啊,210~225嘛,15秒嘛啊,所以那我这里边大家能推测出来我应该输入什么样时间,戳数据的时候,这里就可以得到这个结果吗?对大家自然想到首先我先测一下啊,假如说我给一个224啊,大家看我给一个比方说32对吧。
10:22
呃,这个32.1这个数比较小,看着然后再给一个225,我给一个更小的31.6,没有任何的反应,对吧?啊,我继续给226。大家说会有反应吗?21.2还是没有反应对吧,那继续给227。33.6,大家看现在是不是多了一条数据?341224这个时间点32.1,这是当前最小的,为什么是224呢?大家看,明明这个225226比它更小啊。
11:01
呃,因为现在这个窗口是210~225,是不是不包括225226啊对吧?呃,225那个左前闭后开嘛啊,那个225是不包含在里面的啊,所以当然之前210~225之间的数据就这么几条,大家看就这么几条对吧。是不是最小的就是32.1,这里的224啊啊,所以大家看这里边输出的结果就是就是这个样子啊,这就是关于这个视线时间啊,触发操作之后大家看到的这个效果。啊,所以在这个过程当中,大家一定要搞清楚,我当前的这个数据来了之后,它的时间戳是多少,然后它引发的water mark的更新变化是多少,那最后窗口的关闭,窗口的计算输出是不是都是由water mark来控制的呀?哎,这里面主要是有这样的一个过程。嗯。
我来说两句