00:00
那接下来呢,我们就要用到刚才已经掌握的这些知识,怎么样定义状态,然后怎么样去做这个状态的操作管理,然后我们要解决一个具体的实际需求,那现在我们想一个需求啊,就是当前还是这个传感器的例子,我们现在想干什么事呢?我想需要做一个判断啊,就是做什么判断,之前我们是做了一个分流操作,对吧,对于这个温度到底是高还是低,做了一个分流操作,那现在呢,我可能并不是像之前我们是这高温,哎,我直接判断它大于这个30度,我就直接报警了,那现在我要要的呢,场景不是这样的,我要的不是说根据当前的温度值来来判断报警,而是要要怎么样呢?是要求温度不能跳变,就是你当前这个温度值高,高没关系,我只要要求是什么呢?连续来的两个温度值啊,我这边就是没有做那个时间的判断,对吧,我就是考虑连续来的两个温度值。
01:00
它必须温度值在十度之内,你如果连续来了两个温度值,温度一下子跳变了十度以上,我就要做一个报警。哦,那接下来大家就想想,这个需求我到底要怎么来实现呢?诶,大家就会想到了,这个过程其实是涉及到了一个,呃,我在判断的时候不仅仅是跟我当前的数据有关,是不是还跟之前的数据有关啊,啊,那现在我要保存的这个就相当于是我的上一个数据,那个数据我得保存下来,保存成什么呢?当然就是一个状态,所以接下来我们就要用状态编程把这个需求来做一个实现,因为大家想象我们之前这个这个当前的这个需求,其实是可以用这个reduce这种方法直接做一个,呃,类似于直接做一个呃实现的,对吧?我把上一次的那个温度值直接保存下来,就保存成我的那个聚合状态,但是呢,Reduce这种方法有局限,哎,大家想你最后输出的数据类型必须是一样的呀,我们现在要的是什么呢?你要去做差值做做减了之后的这个状态判断,然后呢,最后输出的应该是一个报警信息,对吧,就如果说当前它这个差值大于十的话,我应该输出报警。
02:15
所以接下来我们就用自定义的一个reach function里边去自己定义状态,把这个功能做一个实现好,那接下来我们先把这个需求写出来啊需求呃,对于呃温度传感器传感器呃温度值跳变跳变超过十度。报警,哎,所以接下来就有点像一个真正的啊,就是类似于风控或者说监控类的一个需求了啊,那接下来我们就定义一个result stream,我们现在想要得到的一个其实是应该是一个报警信息对吧?呃,最后应该得到的是一个报警信息,我们这个就直接叫做呃,Boing,或者说叫做alert stream吧,输出一个报警,然后接下来这里边主要就是基于之前的data streamam,然后呢,当然是先要KB了,对吧,你肯定是同一个温度传感器跳变才报警吧,不同的温度传感器那温度值肯定不一样啊,所以接下来我们先要做一个key,诶大家看到现在我就直接因为是sens,呃,Sensor reading样衣类类型,我可以直接传一个函数来选择它当前的K,我可以用下划线对吧,或者我可以用那个,呃,直接用一个这个字符串,哎,这两种方式都可以,就是当前这个语义都很明确。
03:48
看就是把这个ID作为当前的这个分组的K,然后接下来做什么呢?诶,当然就是要定义一个具体的操作了,我们当前的这个操作好像也没什么特别的对吧?呃,就是当前你好像就是就是要输出一个报警嘛,做一个判断,做一个报警吧,所以这个其实你怎么定义都行,只要拿到当前的数据,然后呢,是一个瑞士function,然后再可以定义状态,对吧,然后去做判断输出,输出一个这个结果就可以了啊那这里面我们的这个输出结果,大家发现你要输出这个报警信息的话,相当于这个输出结果转变了,对吧?数据类型都改了,本来3READING你现在要输出的变成了string了,那呃,是不是大家觉得这个就只能是map之类的这种方法了,对吧?啊,那刚才我们已经给大家写了map了,我们写一个flat map啊,那我就调一个flat map flat map大家知道里边我要去自己自定义实现一个map方式,而且我们知道这个我单独定义出来啊,这个叫。
04:48
呃,Type Change对吧,温度值改变报警,而且甚至我还可以怎么样呢?把这个十作为一个参数构造,构造器里边的参数传进去,对吧?啊,就是你这个阈值我直接定义好,定义在这里,可以从外部传进去,然后接下来最后我们就是把这个result state。
05:11
这上面还报错,所以说我们直接写出来啊,做一个打印出,然后不是对吧?Alert做一个打打印输出,然后现在我们要做的当然就是实现这样一个自定义的flat function了,实现自定义注意这里边既然用到状态编程,那不仅仅是flat map function,还得是一个reach flat map function对吧?啊所以是你要自己去定义一个这个东西啊,我们把它写出来啊,这里边这个double我们写一个,这是叫做阈值对吧?那阈值那个单词叫做呃,Hold,把它写出来extend当前要实现的reach flat map方式啊,因为大家知道Fla map跟这个map其实呃没没太大的区别,就是map的这个操作行为用Fla map。
06:12
肯定可以实现对吧,因为Fla map相当于是还有一个打散了,做做做转换之后打散了,然后再再输出多个这样的一个一个方式的,所以我们可以简单的认为就是Fla map可以做的事情更多啊,Map的话就是一对一,Fla map可以打散之后变成一对多,对吧,输出多个,所以这里边我们用一个Fla map里面同样也是有输出,有输出对吧?啊ss reading,然后这个输出报警我们还是包装一下吧,因为这个输出报警的话,我们可能得知道当前的这个ID是什么,对吧?我们包装一个元组ID放在这儿,然后呢,呃,这个报警其实也不用写这个令类型报警信息了,大家想我怎么样就能知道当前这个温度跳变太大了呢?你把前后两次的那个温度值列在这儿不就知道了吗?对吧?所以接下来我的干脆两个double类型都列在这儿,一个三元组作为我的输出类型放在这就完事了。好,那接下来大家知道我必须要实现一个Fla map方法,对吧?那这个先不说。
07:12
大家注意啊,这个Fla map方法跟我们这里边的map就不一样了,Map大家看,其实就是你的输入数据类型在这儿,输出数据类型在这,那而Fla map呢,输出是空类型,那它用什么输出呢?哎,大家注意,这里边就是我们前面提到过的啊,这里边有一个collector collect收集器对吧?啊,后面我们其实经常会见到这种方式啊,很多弗link里边的算子,它其实最终的输出都是靠这个collect啊,这里边大家看,这就是输出的类型嘛,三元组,然后这个东西变量名叫做out,所以我们平常想要输出一个数据的时候呢,就调用out.collect对吧?而且这个好处在于什么呢?就是你每调一次out out,点点collect就可以把一个数据写入到这个输输出缓冲区里边,然后把它发送出去啊,就不像我们这里边,你就只能是最后有一个返回值把它返回对吧,你这个就比较局限,我们这里边就可以在代码里边你随时。
08:12
想要输出的时候al.collect就可以了,好,这是这个本身要实现这个核心的函数啊,呃,方法这个flat map,呃,这就是每一个数据来了之后都会调到这儿来,然后我们呢,在外边先要把状态定义出来,对吧?啊,因为我们这里边啊,首先lazy啊,Lazy定义一个当前的这个状态,我就叫做我要需要定义什么状态,就是保存上一次的温度值嘛,对吧?呃,定义状态保存上一次的温度值啊,所以这里边我们就定义这个叫做last temp state,然后它的类型是什么呢?当然就是一个value了,我就保存一个value state double,对吧,把那个温度存下来不就完事了吗?啊然后接下来我们在定义的时候,当然又是get runtime context get state,对吧,然后里边是new,一个value state script,里边这个名字我就叫。
09:12
叫做last time啊,然后接下来class of double,轻车熟路对吧?刚才我们都已经讲了怎么定义了,在外边把这个状态先定义出来,然后接下来呢?哎,接下来我们就是在这个Fla map具体操作里边,每来一条数据的时候,那大家想一想,诶,我这里边应该怎么样呢?是不是应该先要把当前的这个上一次的值先获取到啊对吧?我先定一个last temp,先把这个值先拿出来,从状态里面拿出来,用这种get方法对吧,把它拿出来。这样啊。获取上次的温度值,好拿出来之后,接下来我们要做的非常简单,就是求差值做比较对吧,跟呃,跟最新的温度值。
10:12
呃,求差值。做比较啊,那最后其实我们其实就是要判断一个当前这个差值diff啊,是否超过了我们传进来的这个阈值thhold啊,那这里面这个差值大家会想到我们没说这个只是说跳变,没说往大了变,往小了变,那是不是还要做一个绝对值啊,对吧?value.temperature当前的这个值减去我们的这个last temp,然后再点AABS对吧?做一个这个绝对值的求取,然后接下来哎,那这个就非常简单了,如果我们判断它大于传进来的thhold的话,那当前是不是就要输出,那输出怎么输出out.collect对吧?用这种方式输出,这里边注意本身里边给一个输出的数据作为参数,那输出的数据又是什么类型呢?三元组类型啊,所以接下来我们再来一个括号,三元组ID,那当前的ID是什么就是什么,对吧,来。
11:16
还ID,然后呢,哎,我们要把就是上一次的温度和当前的温度值两个double类型列在这里,那不就是这样吗?对吧,直接放在这儿输出就完事了啊,那这里边如果要是啊,大家看到啊,如果要是说没有的话,就是小鱼的话,那怎么办呢?我们不做报警,不输出不输出不输出就完了嘛,你看这就有这个好处对吧,想输输输出,想不输出不输出,因为它是unit嘛,你这里边如果要是map的话,还麻烦一点的,你这里面如果要不输出的话,是不是还得有一个空字符对吧?或者有一个特殊的一个默认定义啊,这个就比较麻烦,所以Fla map这个这种操作,al.cla的这种输出操作是更加方便,也更加的灵活的啊,那另外注意还得有一个操作,为什么呢?你现在只是跟上一次做了一个对比,那下一次又来一个温度怎么办呢?你还还跟我们之前保存的那个温度值去做对比吗?啊,所以大家会发现温度值有读取也得有写入,对吧。
12:16
你得不停的更新温度值才可以,所以这里边我们要做一个更新状态,对吧?来这里边做一个last time update,把当前的这个温度值更新进去,下一次比较的时候就按照这一次的温度值作为上上一个温度值来比较了,对吧?每一次都以上一个来作为比较,这样的话就可以实现这样的一个啊处理的一套机制啊好,那接下来我们来给大家把这个打印出来,来测一测,看看这个效果吧。我们这边启动要启动一个NC啊,快速的把这个起起来,接下来看一看效果到底怎么样。大家看到这里面报错了,这个没有影视转换对吧,上面那个把这个引入进来之后,忘记加这个下划线了。
13:08
哦,这里边又报了一个错,这个我们这个rich map其实是已经定义过的,对吧,我这里面加一个一吧,这其实我们只是做测试啊,这个对当前的代码并没有任何的影响,重新运行一下,大家看到现在这边我们的代码已经运行起来了啊,那现在的任务当然就是一条一条数据去输入了,对吧?啊,接下来我们在这个NC这边啊,一条一条数据输入,首先诶大家看这里边,诶我刚才执行错了是吧?执行的是哦,这里边执行的是这个这里啊,我当时改的是这里,其实我们没必要改这边,我们把当前的这个。测试啊,我们当前测试的这个。State这一部分稍微的修改一下就好了。重新运行一下。
14:01
好,现在终于正常起来了,我们来看一看当前的结果怎么样。呃,同样啊,我把这这条数据输入,呃,然后接下来大家看到,诶大家看到直接就输出了一条输一条报警信息,341就报警了,为什么呢?大家想透了这个,因为大家看它输出的这个last time是0.0,对吧?我们当时说过,你如果要是这里边,呃就是没有给出值的话,默认就是就是零值嘛,你当前是个value state,那当然double类型就是0.0,如果是引用类型,那默认就是空啊,所以这里边你看直接判断的话,我们这里边是不是相当于是有一个小bug对吧?你如果要是这里不给零值的话,呃,你可以给一个什么样的判断值呢?呃,大家就是想到我们在这个代码里边,按道理如果要解决这个bug的话,可以有一个什么样的调整呢?呃,那就是我可以再去判断一下,每次来数据的时候,判断一下当前是不是是不是第一个值对吧?哎,就是是不是我的那个默认值,如果是默认值的话,我这里边。
15:07
啊,就是给一个这个啊,当前这个就不要去做报警,我直接把它这个更新这个状态就完事了,那如果说已经不是第一个第一个数据了,我再去做对对比,所以这里边比较好的一种方式是什么呢?是给一个另外的一个状态,一个布尔类型的状态,表示当前是第一个数据,对吧?啊第一次来的时候他是false,呃呃,就是当第一次来的时候他是处,那我们这里边就直接给做一个,呃做就是直接去赋值就完事了,如果他是falses了之后,我们再去做这样的一个调整啊,那有同学说,那我这里边直接判断它的这个初始值是是零,然后去,呃,让他不做这个比较直接输出不就完了吗?啊这个不好不好做,对吧,因为你这里边温度值,它温度也有可能是零了,呃温度本来就是零,那你这个判断它是零的时候,呃直接就不做比较了,这个显然逻辑就不对了嘛啊当然有同学可以说我给那个付出值的时候,我给一个呃,就是它完全不可能取到的一个温度值,对吧,我给一个什么。
16:07
绝对零度负二百二百七十,73还是71对吧,给一个很小的一个温度值,或者说我给那个长整形的最小值啊,这个当然也是可以的,你就按照这个初始值做一个特殊的判断,那这样的话就可以避免这种初始值一上来做一个报警这种场景了啊啊,这个我们先把它先略过啊,这在我们的代码里面确实是没有处理这种场景,所以这样报错是很正常的,对吧?啊,然后这里边前面时间不重要,所以我们这里面就直接不管了啊,我给一个37啊,大家看这个没事儿对吧,没有任何的影响,我给一个49,哎,这个就会输出,因为超过十了,37 49,这是一个超过十的跳变,他报警了。啊,然后接下来49,我再给一个35.8,你看他又报警了,对吧,这是朝下跳变了绝对值,这个一变的话,又又这个变低一下,低的超过了十度,所以就直接输出报警,所以后边做操作,这个就完全符合我们的定义了,啊这个就没有任何问题了,大家可以好好把这个代码梳理梳理,看一看状态编程到底是怎么来用的。
我来说两句