00:00
来我们还是,呃,前面我们是做了一个简单的API测试啊,那接下来我们举一个具体的需求的例子,大家看看在这个类似于项目生产实际里边是怎么样去用这个process方式放这个大招去解决我们的需求的,那接下来我们提出一个需求是我要还是监监控传感器的温度值,我们现在需求呢,不是说像之前我们做了一个操作,是呃,检测连续的两次温度值变化,如果超过一定限度的话就报警,我现在呢,现在不是了,我现在是检测这个温度值,如果在十秒钟之内。十秒的一个区间范围内,温度是连续上升的话,我就输出一个报警。那大家可以思考一下,现在我的这个需求到底怎么来实现?用之前的那个状态编程的思路可以实现吗?哎,大家想既然是温度连续上升嘛啊,那简单我还是是不是还是要有状态啊,还应该保存之前的那个温度,我现在去做对比才能知道它是不是连续上升嘛啊,所以那还是我把之前的那个温度存下来,存成状态,这个好像没问题对吧,那我接下来是能判断温度上升了。
01:11
但是还要有一个十秒钟的限制,十秒钟的话,这个怎么我我怎么样去判断它这个十秒钟的这个特点呢。啊,那有同学可能想到了,那既然跟时间相关,那要不我开窗口吧,我开一个十秒钟的滚动窗口可以可不可以。这个好像不对,对吧,因为大家想如果我开这个滚动窗口的话,之前我们说过,即使点是不是一开始就确定了,即使是你给偏移量,那也是一开始就确定了,对吧,那我是不是根本假如说啊,我把这个窗口就画成一个就类似于这样一个框的这个状态啊。那我现在是不是根本不知道这个数据到底是什么时候来,什么时候去做判断啊,那大家想是不是有可能比方说啊,我这里边统计这个这个窗口啊,当前这个窗口里边,比方说在这儿来了一个数据,然后接下来它是上升对吧?跟比之前啊,当当前这个是上升了,然后接下来来了一个数据,它下降了,然后后边又来了一个数据呢,是上升。
02:15
然后后边一直都是上升,诶大家想在当前这个窗口里边,这叫连续十秒温度上升吗?这个不叫对吧,然后接下来下面又一个十秒钟的窗口。对,大家可以想到这是滚动的对吧,然后接下来又是温度连续上升上升。诶,但是大家想到我在后边这里边下降了。这个叫连续上升吗?这个也不叫连续上升对不对,但是大家想中间这里边,这是不是温度连续上升有可能都超过十秒了呀,这就是我们说的,我并不知道当前的这个开始的这个时间点到底是哪里,所以在这里如果说我要去直接开滚动窗口去做这个判断的话,那是不是这个就不太合适啊。
03:03
哎,那有同学说,那我是不是开一个滑动窗口呢。开一个滑动窗口可以做这件事吧,这看起来好像是这个窗口滑动了一截吗?但还是有这个问题,你到底滑动布长给多少呢?除非你给的足够小,就是我们说的最小单位不是一毫秒嘛,你一毫秒划一次,那那是基本上所有的数据都能都能涵盖进去了,对吧?但是那个效率是不是就太低了呀?所以这里边我们其实最好的方式不是一开始就把窗口。它到底长什么样就都限制死,而是是不是应该根据数据来啊,是不是来一个数据在他假如说这个数据比之前是上升了,那接下来是不是我在他后边开一个十秒钟的窗口啊,所以大家想想这样的开窗方式,这应该是怎么样开。有同学说啊,那那这个难道这不确定开窗起始时间,这个难道是会话窗口吗?这也不是会话窗口,因为会话窗口是是要一段时间不来数据对吧,而不是说一段时间来的是下降的还是上升的,他判断的是不来数据。
04:10
那所以我们现在最好的方式,其实就是从这个第一个啊,温度开始上升的这个数据开始,是不是注册一个定时器啊。注册一个十秒钟之后要触发的定时器,然后我收集这十秒钟之内所有的数据放在一个呃状态里边,那接下来到定时器触发的时候,这是不是就相当于一个窗口收集齐了所有的数据啊,然后我就可以判断它到底这个是不是连续上升了,对吧。所以这就是一个非常简单的一个思路啊,利用定时器是不是就可以实现一个有界流的截取啊,就相当于我们自己手动开了一个窗口一样。啊啊,那当然这里边我们想的这个思路是做了一个有机流截取,我是可以把这个所有来的数据都保存下来,那大家想我在具体实现的过程当中,也可以做一个增量许可,大家想我我需要把所有数据都保存下来,最后再去判断是否上升吗?其实不需要,因为我是要判断它连续上升才报警,那假如说我这里边中间出现下降的话,那怎么办?
05:19
那是不是我直接就相当于从头开始把定时器删了就完事了呀?从头开始记不就完事了吗?那如果要是连续上升的话,我也不用把所有数据都存下来,大家想我只存一个什么就可以。是不是我只存一个状态就表示当前这个温度是在上升就可以了呀?哎,但是只存一个状态还不行,大家可能想到对如如果要是下一个温度来了,是不是我还得知道上一个温度是多少,然后做一个判断才能知道它是否连续上升啊。哎,所以接下来大家就想到了,我现在是不是只要还是跟之前我那个温度跳变一样,我是不是保存上一个温度值就够了呀,你不用把这个窗口内所有温度都保存下来,我就保存上一个温度值,然后如果说我判断比之前的温度值大,那是不是现在是温度连续上升啊,那注册的那个定时器这个窗口是不是就就不要关,到时间的时候我要输出这个窗口的结果,那假如说遇到了下降,是不是我就应该清空状态,相当于删掉窗口对吧?窗口就直接关了,而且是不是要把那个定时器也要删掉啊,窗口相当于是不是不输出了,一切东西都清空对吧?重新开始记就可以了。
06:29
好,那所以接下来我们要实现的这个过程思路已经有了,接下来就是一个状态编程,还用到了一个定时器,用定时器相当于实现了一个自定义的窗口操作,那大家想一下,既然用到定时器,是不是必须得用到process function啊,对吧?啊,那状态编程当然它也可以实现嘛,所以当前就是直接放大招,用一个process function来实现这个功能。接下来我们写一下第二个测试代码。这就是process test2啊,我们把这个叫做一个应用的事例,对吧?Application case,我们现在要检测的是温度连续上升啊,那前面的过程基本上都一样,代码的整体流程这个不变对吧?我直接copy前面的这个K的process function测试的这个流程啊。
07:24
主方法直接copy下来,然后大家看,呃,这个环境已经有了,然后从socket文本流里面读取数据,把它转换成一个sensor reading类型,接下来是不是我还是要根据,哎,大家想一下,现在我要要基于这个K做分组吗?还是要分组对吧,哎,这这里边你不分组的话,我们后边会导致这个得到的这个判断啊,你温度连续上升,那就有可能不同的传感器混在一起了嘛,对吧,所以我们还是要各自判断判断各自的,然后接下来当然就是要拗一个新的自定义的kid process方式了,我把这个叫做呃,温度连续上升对吧,呃,这个temp,呃,Continues啊,Comes,然后increase,对吧。
08:11
呃,Inquiry,然后warning吧,我就都简写了啊,然后另外这里面我也可以传一个参数,就是传递到底是控制多长时间范围内的那个温度上升对吧?比方说我们现在十秒钟我就传一个十过来,然后后边直接做一个这个打印输出对吧?啊,这个就是大家自然能想到的一个程序的实现框架,然后接下来就是实现自定义处理函数,呃,检测。一段时间内的温度连续上升,输出报警啊,那这里就又涉及到一个问题,就是说我们这里面的这个public static啊,我先写出来class前面我直接copy一下啊。
09:02
温度连续上升报警啊,那么他要去extend一个key的process方式还是这样对吧?哎,那这里边KIO问题又来了,当前的这个k iok是什么?哎,对,还是当前的那个ID,我们用的是那个元组对吧?呃,那还是一个temple类型,那输入当然是ssor reading了,输出是什么呢?这就看自己自己定义了,对吧?比方说我就输出一个报警信息来一个string是不是就够了,我就输出,诶当前哪个三四,呃,34ID啊,对应的那个传感器,然后连续十秒内这个温度上升,我输出一个报警就完事,S string把它定义出来。呃,然后在里边大家会发现这还抱着错的啊,因为我们这里边是不是得有一个属性,有一个参数啊,所以这里边呃,定义私有属性,这个就是当前统计的,呃。
10:00
检检测的这个呃时时间间间隔区域对吧,时间间隔,所以这里边我统计一个private integer给一个这个interval。时间间隔啊,然后接下来我实现一下当前的这个构造器,把这个写完上面就不报错了,接下来我们是要是不是该定义状态了,对吧。我们现在的状态其实主要就是要来保存,还是跟之前那个温度跳变检测一样,是不是只要保存上一个温度值就可以了,保存上一次的温度值,另外还得有一个,因为大家想到我们后边是不是还要在如果出现温度下降的时候,是不是还要检测,检测到温度下降,是不是要删除定时器啊,删除定时器能直接删吗?就我能获取到当当时注册的那个定时器时间戳是是什么吗?获取不到,所以大家回忆起来我们之前是怎么做的,对,是不是还要保存到一个状态里啊,这个状态是不是也应该是一个k set,跟当前的K有关的对吧?哎,所以另外我们还要有一个定时器时间戳啊,就是我们要定义两个状态了啊,Private value state。
11:24
首先啊,上一次的温度值,这是一个double类型,Last state。另外还有这个长整型的时间戳value long,当前是这个time timer。TS,对吧,State,我们把这个定义出来,呃,然后大家知道我必须要在这个生命周期open里边去获取当前的状态,控制距柄,所以在这里边我去定义last time state,还是这个流程,再复习一下get run time contact get state啊,New,一个value statescript里边,这里边应该是last,上一次的这个last temp给一个名称,后边是double类型的class,对吧,直接给一个这个啊,当然了,这里边大家想到我是不是这个上一次的这个温度值,我可以给一个初始值啊。
12:26
对吧,因因为这样的话,我就可以直接去判断去做做这个调整了嘛,你一开始的话,这个如果直接判断的话,它应该是nu对不对,如果是nu的话,我们应该是直接把当前的那个温度值保存成就是当前的这个,呃,就是对,就是初始的这个温度值,然后注册定时器是不是就可以了啊,那那这里边我们也可以不把它当成到比方说我给一个初始值,大家想我给一个double类型的最小值行不行,对吧?哎,这就是我当时来了之后,肯定是不是比之前也是上升,上升的话我就注册定时器,对吧?啊,所以这里边我可以给一个double的mean value啊,也可以做这样的一个判断。
13:07
那另外就是timer ts state,我把这个get runtime contacts,然后get同样的啊value state script对吧,里边这里边给一个timer ts名称,最后再来一个长整型的class,当然这个就不需要有这个初始值了,对吧?定时器的话,这个一开始没有嘛,然后后边是不是必须要实现的是一个process element方法呀,那么在这个process element方法里边,我们应该是首先是不是应该取出状态,取出状态方便我们使用啊,所以double类型的last time上一个温度值从last time里边。State里边点value拿出来,另外长整型的这个时间戳timer ts也是从当前的这个状态里边啊,不是clear啊,Value对吧,从状态里边拿出来,然后接下来是不是就要判断了,如果温度上升是不是就要注册十秒后的定时器是不是开始等待啊,相当于我们有一个时间窗口要要开始等了,对不对,但这里大家要注意啊,现在你既然初始值这里边是这个。
14:30
命value啊,我这里边是不用去做那个是否为nu的判断了,但是假如说第一个来的数据的话,跟我当前上一个温度值比较,是不是一定是上升的呀?哎,那大家会想到这里边一定上升的话,我注册定时器这个合理对吧?第一个数据来了之后直接注册这个是合理的,但是另外还有一种场景是,如果说我之前已经注册过定时器了,然后你现在温度又上升的话,大家想一下。我们之前不是说这个温度连续上升嘛,第一个数据这个上升,这注册了一个定时器,十秒钟的定时器开始统计这个窗口了,对吧,那后边这个又上升的时候,你在这儿又要注册一个十秒钟之后的定时器吗?
15:12
这应该不注册了,对不对,不要重复统计对吧?哎,我就按照这个一个十秒钟范围内去统计就完事了,等到这个统计结束之后,后面再再开始统计对吧,我就用这种方式啊,那所以这里面是不是要啊,并且怎么样。并且是不是没有定时器的时候啊,对吧,没有定时器的时候我才去注册,所以这里我的判断条件是if。If value点判断这个时间戳对吧?该呃判断这个温度值啊,不是时间戳温度值如果要是大于last time上一个温度值的话,并且timer ts是不是等于nu的话,这是不是没有当前没有这个注册定时器啊,那接下来就注册对吧?啊,那这里边我们首先呃,就是计算出定时器时间戳。
16:12
长整形对吧?TS啊啊,那大家想到这个是不是就是我用当前的这个current processing time,我现在注册都是都是按照这个处理时间来的,对吧,我当前的处理时间语义嘛,用当前的处理时间先获取到,然后是不是我直接加上指定的这个interval乘以1000,就是我要注册的那个毫秒数啊,所以接下来ctx timer timer service。去注册一个,注册的是。你要处理时间就一定注册处理时间的这个定时器对不对?我这里边来一个TS放进来,注意注册完了之后是不是还要更新状态啊,要不然的话,下次来了之后,你这个timer ts是不是还是now?呃,所以这里边我不是改这个timer ts,而是要更新当前的这个time ts state啊,把这里边的这个TS存进去,这就完事了,这是注册。那另外还有一种情况,那我现在就直接等着就完事了吗?接下来大家想,这是不是接下来注册了之后,下一个来了之后,如果还是上升还是上升,但是已经有了定时器,是不是啥都不干啊,等着就行了,对吧?哎,那后面大家不要忘记有一步操作是不是得更新状态啊,更新温度状态,所以我要把那个last temp state update成当前value.get temperature对吧?这一步操作是必须要有的啊那。
17:37
另外还有一个需要做考虑的,你刚才说的是那个如果上升的话不用做了,那如果下降呢?哎,对大家想下降的话,是不是要删除定时器啊啊,所以这里面我们其实是如果温度下降,那么删除定时器啊,当然这里边有呃两种选项,就是说你也可以直接就把这个温度也全部清空,最后再重新开始。
18:06
但大家想没必要对吧,我是不是总保持上一个当前的这个最新温度值,作为这个上一个温度值就完事了啊,所以这里边我不需要清空这个啊,啊那所以这里边else if,我是不是要判断,如果value.get temperature,如果要是下降小于last tempmp,并且这里边注意啊,并且是不是必须要有定时器你才能删啊,没有的话你直接delete一个nu,那是不是它会报错啊对吧?空指针啊,所以这里边我要并且timer ts不为空的时候,我就删除定时器,哎,那这里边ctx timer service现在是delete对吧?Delete还是processing time timer这里边传的是状态里边取出来的那个timer ts对吧?哎,那有同学说,哎,那你这个跟上面这个是一样的吗?
19:00
不一样,这个TS这是我们注册的时候用的是这个对吧,而这个TS他们TS我这是什么,是不是从状态里边拿出来的呀,每来一个数据都重新拿一遍,对不对?诶那所以这是更新之后的那个状态啊,啊,这是当前这个,那取完了之后,大家要注意是不是已经没有定时器了,是不是应该这个timer ts又应该又又应该制成空啊,所以那个状态是不是要清空,所以timer ts调一个clear,这就是这样的一个处理的流程。当然还没做完,后边是不是?呃,这是这个process element,后边真正定时期触发的时候,是不是应该调到这个on timer里边来啊,所以这里边大家看就是定时器触发,其实这个我不用做任何判断了,大家想是不是只要触发定时器一定是连续上升没有被清空啊,那是不是直接报警就完事了对吧?因为只要是那个下降的那种情况,我们都已经删掉了,它闹钟就不会响了,所以现在只要触发我们就输出报警信息,那咱们输出是不是out.collect对吧?那这里边既然是字符串嘛,那直接写就完事了啊,比方说我这里边写传感器,哎,那我加上当前的那个34ID,诶大家想34ID怎么拿?
20:20
哎,现在是不是有上下文啊,对吧,这里边我是不是可以get current key,但是注意当前的这个current current key应该是个什么类型,这是个这是个temple类型,对吧?所以接下来是不是我还得去诶。接下来去get field0对吧,直接把它这个当前的这个第一个字段拿出来就完事了,然后后边我再给一个啊,就是温度值连续啊,就是多少秒,我们的那个interval对吧?秒啊,我就直接写吧,上升,这是不是就是我们完整的一个流程啊啊当然如果大家需要最后做那个收尾操作的话,可以在close里边,呃,再去写一下那个clear对吧,因为我们有一个状态是不是一直没有清空啊,对吧?啊,就是我们这之前那个last temp啊,上一个那个温度值一直没有清空,另外还有一点,这里这个定时器触发之后光输出没完。
21:24
还得清空状态,因为大家想现在这个状态是不是我们那个timer里边还有啊。大家想定时器触发跟这个timer的这个状态是一回事吗?这个会自动清空吗?不会对吧,这是我们自己定义的一个value state,它是不是就就跟我们这个温度值一样,他只知道这是一个长整型的状态,对吧,你不手动清,他并不知道在这个定时器触发的时候就要清空,所以我们这里边要做一个清空操作。啊,所以完整流程做下来之后,其实是这样的一个过程。好,这就是一个完整实现啊,我们实现了这个连续十秒钟之内温度连续上升的话,我们做一个输出报警啊,接下来我们可以还是测一下。
22:09
在代码里边,在这个测试的环境里边,大家看一下这个效果到底是怎么样的啊。好,那首先接下来我给一个。给一个这个341,给一个温度值,然后现在我们是那个视线时间,所以前面那个时间戳我不给无所谓对吧?啊这个无所谓啊,我来一个36,来一个37,诶大家看现在是不是隔了几秒钟之后,确实是十秒钟之之后,它直接输出了341温度值连续十秒上升对吧?呃,输出了这样一个报警信息,诶大家看这里边它还在输出这样的一个这个呃,就是连续十秒上升的这样的一个一个信息,这这说的是我刚才是不是37数慢了呀,对吧?是前面两个这个温度值都已经就是第一次统计的那个十秒窗口都已经结束了,然后我又输入了一个37,这是不是又注册了一个定时器啊。
23:05
然后大家会注意发现我这里面是不是相当于只有一条数据的话,它是不是默认也也认为这是连续连连续上升啊,因为我们一开始初始的那个温度值是最小的那个double类型的那个呃命对吧,所以他会认为第一次来了之后是一个上升,后边我们也没做判断,只要没数据来,还是认为要要去输出这个结果的啊,当然你也可以做一个逻辑判断,对吧,你要求必须是有第二条数据来了之后,我才去注册定时器,这个是不是也可以啊啊,这样的话就至少保证有两个啊,我们这里边是没有做这个操作的啊,那后面我们还可以多做一些其他的测试,比方说我这里还可以有346。大家看这个346,我快速的给,呃,给一个15,再给一个16,大家想这个会报警吗。347341再来一个三十八三十九对吧。大家说接下来报警是,诶大家看这个346这里边是不是就做了这样的一个报警输出啊,这个报警输出大家注意,这是这是谁报警了呢?这是15.4和15这个报警了吗?这是谁注册的定时器报警了,大家注意,这是15.4,我们注册了一个定时器,然后15来了之后,是不是直接把它删了呀,然后我们这里边报警其实是什么?是不是16它判断比之前是上升了,然后他又注册了一个定时器啊,然后过十秒钟之后是不是他输出了呀?啊所以其实这样的一个过程啊,啊所以大家下来之后可以再把这个过程再好好的测一测啊,啊整个是确实实现了我们这个功能,而且是按照不同的这个啊,刚才这个可能大家看的还不是很明显,因为这里边这个341本来就比那个346的那个温度值要大,对吧,那比方说我这里边341连续上升,然后346。
25:00
这里边比方说15,然后14。然后13大家说这个346会报警了,341是不是还报警了,这个346没影响到它对吧,各自统计各自的,然后346大家看是不是就不报警啊,因为你这是下降啊,一直在下降,一直不上升的话,那当然就不报警了。啊,所以这其实就是,呃,就是我们这样的一个整体处理流程啊,利用process function实现了这样一种特殊的需求。
我来说两句