00:00
设置这个even time时间特性的话,是得在前面做什么来着,大家还记得吗?Env,它可以去大家看set stream time character对吧?啊,这里边我们要传入一个time characteristic,这里注意用这个flink streaming API,然后even type对吧?大家看是不是三个选项都有啊,Interesting time还有processing time都有,我们这里先指定我要的是even time。这就完事了吗?哎,这还没完事,大家对记得我们是不是还得在那个data stream上指定时间戳和watermark啊,对吧?这边我们必须得去指定啊,那这里边怎么指定呢?大家还记得那个方法吗?是不是啊,Sign time stamps and water marks啊,这是一个方法,另外还有一个简单的方法是什么?是不是升序数据的话可以直接assign asending time stamps,这个特别简单,这个就是默认我们传进来的是升序数据,是不是就不延迟对吧?没有任何延迟,就是到点就发车对吧?准时发车谁都不等,呃,那这个大家看里边要传一个什么东西呢?
01:18
里边要传的就是对抽取你到底哪个字段是我们的时间戳对吧?我们这里面要用的是不是就是每一个元素里边它的时间戳就是time stamp呀,啊,当然我们知道它是那个,它是秒,我们最后要的那个时间戳是毫秒对吧?所以要乘以1000啊这就是一个简单的一种实现啊啊当然了,大家简单实现显然不能满足大家的需求是不是对吧?啊,大家肯定是需求很很多的啊,我们来看一下更。更一般的场景要处理乱序数据的话,怎么去处理,是不是就是ign time sims and water marks对吧?啊,那当然之前我们是给大家讲过可以去new一个,比方说MYAS对吧,是不是可以直接去那个自定义啊,我直接这么自定义的话在下边。
02:18
呃,My aigner它要实现的是不是有有两种方式,一种叫做大家还记得吗?叫做a sign with,大家看这两个对吧。Periodical automarks,或者是puctu eighty的automarks对吧?啊,我在这里敲出来给大家看一眼啊,这里面它有泛型,呃,我们的数据类型都是sensor reading对吧?呃,那大家看这里边我实现的话必须得复写什么方法,是不是就这两个,那家一看就看出来了,自己可以去敲一敲啊,那一个就是你到底用什么去抽取这个时间戳对吧?那那这个如果我们简单的一种方式,是不是直接从element里边把那个拿出来是不是就可以啊啊,我们想乘1000的话,你看具体情况该乘1000,乘1000对不对啊,这就是最简单的一个提取,那这个watermark,当时我们是按照当前最大的那个时间戳去,呃,给一个这个watermark对不对?呃,那那大家会想到,那我这里边随便去生成可不可以。
03:23
其实也是可以的,对不对,就是大小,对,只不过就是这其实就是你完全可以去自定义嘛,我直接用一个watermark,比方说我每次给的water就是一是不是也可以啊啊,当然这个你你不能这么干对吧,实际你说明这个watermark是不是每次生成的都一样啊,水位不涨了对不对啊,但是大家看其实就这这么简单,就是你定义一个规则,你按照什么把它去生成就完事了啊,当然我们之前的那个方式是定义了,大家还记得定义了一个棒的那个最大的延迟,对吧?呃,有有个什么6万的那个那个。就是60秒一分钟对不对,另外还设置了一个叫max,什么time STEM对不对,呃,那个那个东西我给它设置成long.me value对吧?然后那大家会想到在这里我是不是就得多一步操作了,你得存下来我们当前的那个最最大的那个时间戳到底是啥对吧?呃,它它就是用什么呢?呃,当然就是当前的。
04:29
时间戳和对这个max ts里边最大的那个对不对啊,当然了,大家如果考虑到它那个1000的那种情况的话,那是不是这个这个东西还得乘1000啊,对吧,这个就麻烦一点,那你干脆干脆我们调个吧,对吧,Max ts.max然后我们这个就。Element,对time stamp乘以1000,大家看是不是就是这样做的,那这里边我们就简单了,这里边是不是就直接用max的TS减去那个棒子就完事了啊,大家看我就把这个快速的又给大家敲了一遍啊啊,那这就是我们这个周期性生成的这个这个呃,方法啊,那当然了,我们当时还给大家讲了,就是可以有一个什么呢?我可以get con之后大家看我可以设一个什么东西啊。
05:26
是不是可以设all to water mark in tval啊,当时我们跟大家说过,这个默认是200毫秒对不对啊,如果这里边你觉得不爽的话,200毫秒太长了,我要100毫秒是不是可以直接给一个浪类型就可以啊,对吧?当然它这里可以自动,呃那个数据类型转换,你要是确定要给他一个long类型也可以,对吧?啊,这个就是我们对这个周期性生成auto mark的这个时间的设置啊,那它那个200毫秒,其实大家能点进去看到它的那个设置的地方啊,呃,这个这个我记得是在这里,我们可以看一眼啊。
06:05
诶,大家看看到了对吧,他会判断什么,判断当前的那个时间语义是啥对吧?假如是processing time的话,哦,那这个直接就是零,大家想零是代表什么。没有water对不对,对吧,跟water就没关系了啊,那如果说不是的话,就直接把它设成200毫秒,这是这个就是源码里面大家能看到的这个设置啊啊,就是涉涉及到的一些东西,给大家简单的说一说就可以了,呃,然后我们这里呢,就就不这么去做了,当然大家也可以去去考虑,就是说呃,看一眼这个MAS当然就是这一部分。注掉啊MYAER,如果说你要用一个那个叫什么aigner with,哎呦,它不显示了啊,很糟糕,With叫什么pun to eighty的water marks对吧,如果是这个的话,你也是你直接就能看到它里边要实现的是不是就是这两个方法啊,哎,当时我们也给大家说了这个还是是什么时间戳二你就给他什么对吧?啊你你要什么东西就就就给他什么,比方说乘以1000,那这里边的话是不是你就可以指定说诶我在什么情况下对吧,就是假如说我是341的情况下,我就去生成一个auto mark,甚至还可以是什么,我就是每来一个是就是直接每来一个这个数据,我就生成一个worldmark是不是也可以啊,哎,大家想这是不是就实现了,来一个数据生成一个mark,来一个数据生成一个worldmark对吧?哎,所以这个是完全可以的啊,那比方说我这里边。
07:47
直接new一个water mark,这里边给什么呢?我直接大家看到这是不是有它提取出来的那个时间戳对吧,它好处就在这里啊,直接就可以把这个直接传出去,我是不是直接就生成了,相当于就是来一个生成一个,来一个生成一个对吧?哎,这个大家就是,呃,稍稍微的把这个看一看就知道了啊,我们这里主要给大家还是实现那个吧,最经典的那个方式叫什么来着,Bounded大家记得吗?一长串对吧?Bounded out of orderness,哎,好,好在你记得前几个词它就出来了,对吧,Out of of orderness timetime,诶,大家看这个好处是我一敲它什么都出来了,对吧。
08:33
它是不是里边只要实现一个一个extract time STEM就可以了,而且我们说它其实底层是不是也就是一个周期性生成的一个这个方法啊S啊,那大家会想到你周期性生成不是还得有一个get a current water mark的一个方法吗?他在哪里去get current water mark呢?这个我们知道,就还是把这个element那个时间戳提出来给他就完事,对吧?乘以1000给他就完事,那这里边的那个那个water RA怎么办呢?诶大家注意,这里还报错呢,不,不要忘记这里啊,大家看这是不是还报错呢?对,他是不是还得传一个time啊。
09:14
这个time是什么?对,就是我们的那个延迟时间,大家想是不是就是我们这里的这个棒作为一个参数传进去啊,所以大家自己都能实现的一个这样的一个一个方法,对不对?呃,所以在这里我们就把这个比方说我延迟延迟一秒钟,延迟一秒的话,它传的是个time,那我们要time,呃,哎呀,没有seconds嘛啊second seconds在下面好一秒钟,这样是不是就实现了一个处理乱续事事件乱序数据,然后延迟一秒钟去上涨水位的这样的一个操作,对不对?哎,这就是我们的这个事件时间啊好,下面基本上都不用不用变对不对,对吧,下面操作都不用变,好我们现在来看一看这个效果怎么样吧。
10:03
跑起来好,现在把它跑一下跑起来了,那我们还是来做测试啊,呃,这个我们要放到这儿来做测试了,三一啊,大家其实会想到在这个过程当中,是不是我我只都给三一的数据就够了呀,因为他做K了对吧?啊,大家想测试别的,如果想WINDOW2什么的话,那可以自己去测试这个啊,大家看先来了一个sensor reading对不对,然后没有聚合结果输出。呃,大家可能会想到你里边不是设了那个呃十秒钟吗?那那那这个是不是应该去去等十秒钟它就会输出啊。大家想一想,会不会这样?我我们好像已经等了挺久了,是不是好像已经等了挺久了,呃,那那如果大家要是觉得这个这个不舒服的话,你甚至可以把那个改的更小一点,对不对啊,看看这个效果到底是什么样,我们现在等了半天,它其实根本没有输出对吧,没有。
11:07
对,所以然后接下来我们继续啊,那大家说这个为什么它触发不了这个窗口关闭呢?当前我们的时间是什么?是我们这里等的十秒钟就是十秒钟吗?我们现在是even的time,那得是什么呀?对,是不是得事件时间掌上十秒钟之后才能够触发这个关闭啊,窗口关闭啊,那大家想一下我们现在的这个这个时间事件时间是多少。是不是就是从他这个time stamp里面抽出来的呀,那他的time stamp现在是啊,就是反正这么一串,然后199对不对,我也不知道啊,反正就是这么多199,大家知道这是秒对不对。那大家会想我们还设置了延迟,所以现在的我们的这个water mark。那个时钟一,他们的时钟应该是多少?
12:02
加上是再加一秒还是减一秒。是不是应该减一秒啊,对吧,我们水位的上涨,那个它要延迟,是不是199的这个数据来了,我不能认为199之前的数据都来了,对不对,我只认为198之前的数据都来了,这是不是就是watermark的含义啊,延迟的含义对不对?哎,所以大家想到是这样啊,啊,那我不要那么太着急了,我还是一条一条数据输出吧,那大家会想到我再给个201,诶大家看201E来了之后,他就直接输出了一条数据。对吧,呃,那那大家会想到这条数据,这相当于是谁的数据啊。这是这个窗口,是不是就应该是我们前边这个199开始的那个窗口啊,大家想想是不是这样。
13:03
因为之前我们的时间只进行到了198,那是不是199这条数据还没有被收进窗口里面去呢?哎,现在如果要是哎已经到了这个,呃,2201的话,那是不是我们的水位已经上涨到200了,哎,所以现在它是不是前面那个就可以收进那个窗口去了啊好,那我们继续往后去去走啊。这是202对吧,205,大家会想下一个输出应该是在什么的时候输出,好,当然ID不一样对吧,大家看到ID不一样,这个ID不一样,对我们的water mark,大家说这个上涨有没有影响。Water会不会上涨,刚才因为大家看这个,这个就是201来的时候他是346,但是其实是341的那个输出了,对不对,说明是不是我们这里边的346的这个时间戳影响到了341那边的那个watermark啊。
14:09
大家想一想,为什么会这样呢?为什么会这样?大家看一下我们这里边的这个代码,我们分配这个时间戳是在哪里分配的。时间戳和water mark是不是在data stream11做好之后,马上就分配了这个所有的这个时间和water mark啊,是不是在KBY之前,所以之前相当于我们这里边是大家可以,呃,简单的这么理解啊,当然我们这里边并行度都是一,那应该都在一个呃,一个slot里边啊,就是大家会想到正常来讲的话,那我们这里是不是之前都是一个任务。然后后边我们k buy之后,其实是想把它们分到不同的分区去,对吧,分到不同分区的话,那是不是前面的这个water mark应该大,大家还记得我们当时说多个输入多个输出那种情况吗?你现在相当于对于前面那个S,前面S而言是多个输出对不对,是不是这个watermark要广播出去啊,所以是不是我346的那个数据来了之后,341的water watermark也会涨,哎,所以大家会看到啊,这其实就跟我们之前讲的那个water mark的传递就关联起来了,这是这样的一个状态啊好,那大家推测一下,就是什么时候这个下一个窗口会关闭呢?
15:36
大家看现在是这个零六了,对吧?啊,那那那我继续输出吧,比方说零七大家猜多少的时候。零七还没有零八。31。还没有对吧,209。
16:00
30。还没有,大家觉得这这得到什么时候才能关啊210。大家看还没有对吧,这个这个我们的当时那个滚动窗口给的大小是多少,看一眼哦,是十秒钟对吧?十秒钟的话,那大家会想到这是不是必须得在上一个那个窗口关闭的时间十秒之后,才会触发这样的一个操作啊,对吧,才会触发下一个呃,关闭的这个事件啊,那大家看一眼这个诶是到哪个的时候关了。大家看是不是一下子输出了这么多条数据啊,KBY之后每一个呃,这个KSTEM上的每一个K是不是都输出它之前那个窗口里面的最小值了,那这里边呃,341,我们输出的是30对吧?当然这就是它最近的这个呃最小值,那大家想一下当前的这个窗口应该是多少到多少呢?
17:09
211来的时候。大家想一想,211来的时候。是不是水位线是210啊,所以是不是210的那个窗口关闭了,那210的那个窗口十秒的话,那应该是200~210对不对?那上一个窗口关闭的时候,大家还记得是什么时候关闭的吗?是不是201来的时候,199那条数据输出了呀。所以201来的时候,是不是水位就刚好涨到200,那是不是190~1200的数据输出了,是不是就输出了这个199啊,现在是200~210的数据输出了,对不对,所以就输出了中间的这四天啊,所以大家看其实就是这样的一个过程啊呃,那那当然了,这个我们只是给大家先测了一下这个这个滚动窗口,再给大家测一下滑动窗口好不好,好,我们再来简单的测一下滑动窗口啊。
18:17
呃,大家会想到滑动窗口怎么改,那这个很简单,那比方说我们这个统计15秒内的最小温度,然后是不是它的意义就是什么,就是隔几秒钟要滑动一次,对不对,隔几秒钟输出一次,大家想是不是滑动的距离就代表它输出的那个频率啊,对吧?因为窗口五秒钟就要关闭一个,隔五秒输出一次。所以这里边这前面距离改成15,后边是不是要给一个继续给一个time second给一个五,这是不是就是一个划窗了啊,这非常简单啊,好,这边已经关了,我们把它运行一下。
19:03
快速的给大家测一下啊。大大大家觉得这个能能想到他的行为吗?我现在干脆就不用再分区了,分区的那个情况大家搞清楚了吧,就就是不同K大家搞清楚了对吧,那现在我就不管了啊,我就都用这个341了,比方说我先来一个,哎哟这个很烦啊,都复制一下吧,好。先来一条341199对吧,呃,然后接下来我来一个,就是我一条一条给吧,我来一个200吧。哎,来了一个200对不对,然后接下来。我们就一个一个给对吧,因为大家猜第一个窗口会什么时候输出。诶,大家看201的时候直接输出了。
20:01
对吧,201的时候直接输出了我们当前的这个这个状态啊,所以当前的状态是什么呢?大家看是三四一三十五点八零,诶,但大家想这个窗口关闭到底关闭的是什么窗口呢。201来的时候,我们水位线延迟一秒,现在的水位涨到多少,是不是应该是200啊,那200这条数据包含进来了没有,大家看没有包含进来,诶这个大家要注意一下啊,之前可能没有给大家强调这句是吧,时间窗口它的那个开始和结束到底包含不包含开始和结束呢?那两个点呢?注意啊,对,左臂右开包含开始不包含结束对吧?啊,就是想到了开头,没想到结尾对不对啊,所以大家看啊,这里边我们如果触发200那个结束的窗口关闭的时候,200那条数据它其实没收进去,它应该属于下一个窗口了,对不对啊,所以这样是这样的啊呃,那那大家会想到接下来下一个窗口应该什么时候输出。
21:16
3202对吧,2034。呃,大家觉得是204是吗?二二零四二十八,我现在是一个,大家看我是一个递减的一个状态,对不对,我就是要看看他到底能不能把那条数据收进去,对吧?但是205能不能能不能触发它。大家猜一下。出发了吗?没有对吧,那206能不能出发?大家想得15秒是吧,那得到呃,得到二呃二幺几才能触发是吗?好,我们看一眼啊,哎,206触发了他的关注,对,大家想一下啊,不尝是五,所以我们前面。
22:05
201来的时候,水位是不是涨到200啊,然后关闭的窗口是不是应该是大家想一下它是从多少到200,是185~200对不对,是不是这样,哎,所以那大家会想到啊,185~200这个窗口关了。滑动步长是五,下一个窗口是什么?是不是就应该是190~205啊,那是不是水位涨到205就应该关这个窗口啊,哎,所以大家再看一下。206来的时候是不是水位涨到了205,所以他是不是直接就输出了,当然这里面输出的是28 28是谁的那条数据是不是它的数据只收到204为止啊,对205没收进来,这是不是跟我们刚才介绍的完全一样,那大家能想到下一条数据是什么时候输出吗?
23:08
二再往后一滑动一个窗口的那个距离,对不对,是不是应该是水位涨到210,那我们的数据是不是得输入211才能输出啊,哎,所以大家看这里边我给一个先给一个210,我们看一眼。没反应对不对,再给一个二幺幺二十。大家看是不是直接输出了一个22啊,这个22是不是代表就是195~210之间的最小的对吧,是这个206的22,所以大家这下就搞清楚他的行为了,对不对。
我来说两句