00:00
下来我们来用状态编程实现一个基本的需求啊,大家看一下这个需求的描述,呃,接下来我们是要检测传感器的温度值,如果说同一个传感器它的那个温度值呢,连续两次的温度值发生了跳变,超过了一定界限的话,我们要输出一个报警信息。比方说这个界限就是按这里要求的,就是十度。那大家想这个需求我们怎么样来实现?按照我们之前讲过的那些,呃,所有的transform的API,或者说其他的一些用法窗口,包括窗口啊,大家想到能实现这个功能吗?怎么样实现这个功能呢?就我们当时呃,就是给大家讲到,呃,做这个操作的时候,其实是要。诶,大家会想到这个要输出报警,这是不是相当于我要把这个当前的数据做一个检测,然后转换成一个报警信息,这相当于就是一个map对吧,或者Fla map类似于这样的一个操作,对不对?呃,大家直观想的话,可能他用这个,大家说是用map还是用Fla map这个操作。
01:19
转换成一个输出输出的报警信息,诶直观感觉的话就应该是输入数据sensor reading,然后转换成一个string类型,对吧?或者输出一个,呃,我们看情况啊,想怎么定义,比方说我定义这个当前的三色ID,然后呢,呃,你不是两次温度超过超呃温度值超过十度吗?那我就把两次的温度值列出来,对吧?我直接输出这样一个三元组当做一个报警信息也是可以的,哎,那自然我们就想到那一个map就应该搞定了吗?诶,那那这里边大家想这个用mapb搞定的过程当中还需要什么东西吗?你来一个数据直接就能判断出来是否这个跟跟之前的那个温度值比较,是超过十度了吗?哎,所以你要比较它俩的温度差值,是不是必须需要有上一个温度值和现在的这个温度值啊,现在的温度值是当前数据,那上一个温度值怎么办?
02:15
是不是要保存成一个状态啊,啊,所以这里面其实我们就是很简单的就想到了这样一个实现啊,那然后另外还有一个问题是在于这就是前面我们说的要用map还是flat map。就是如果用map的话,它是一定是一对一对不对,它的输出类型是不是,如果我们定了是一个三元组的话,那就必须是三元组,对吧,必须有这样的一个输出啊,那大家就会想到如果说你输出none,当然从这个代码上来讲也是可以可以编译通过的,但是后面执行你这输出了一个nu,后面我们要把那个nu打印是不是就有问题了啊,所以大家会想到在这里边如果要是不输出报警的话,我们是不是就干脆就不要转换,不要输出就完事了。
03:00
啊,那有同学说,那这个看起来又像一个这个filter呀,但是filter能能转换数据类型吗?不能转换对吧?所以我们是不是用flat map比较合适,因为Fla map我们用那个al.collect相当于就是所有的数据类型可以转换,而且我可以控制是一对多还是一对一,还是不输出,是不是都可以啊,哎,所以这个就比较灵活,接下来我们的实现大家看这个实现就是分组之后做一个Fla map。啊,接下来我们实现一个这样的需求啊,就是温度跳变报警的一个需求。那同样还是在这个state下边去new一个class,当前这个是state test3,然后接下来这个我们是一个呃,这个application。Case对吧,应用的一个事例啊,啊就是state,这个应该是k state的一个应用事例,对不对,因为我们是做过了分组之后,然后再去做这个操作的,那显然要用到的是一个k state啊,好把这个创建出来。
04:09
接下来前边的这个操作流程,那应该跟这个都是之前我们的做法都是一样的是吧?啊先把这个呃数据啊环境创建出来,然后呃文本文本流读进来,转换成呃sensor reading啊这样的一个po类型,接下来呢,做对应的这个处理和打印输出,我直接把这个先copy过来。其实中间主要就是这一步转换操作啊,这里面可能会有一些问题对吧,所以这里面我们其实要准备定义一个,定义一个flat map操作检测温度跳变,输出报警,所以从这个代码实现上来讲的话,直接就是。啊,Data stream先做一个KBY对吧?这个还是按照ID要做一个分组,那接下来就是直接flat map里边是不是传一个自定义的Fla map function就可以了啊,你有一个我把这个叫做temp change warning对吧?呃,这样的一个测试啊,然后我们要给一个对应报警的一个温度值作为一个阈值对吧,超过这个限度我们就报警,我们给的是十度嘛,那这个值是不是相当于可以作为一个参数传入啊,呃,这里边我们直接构造方法的一个参数啊,温度值是一个double类型,所以10.0。
05:37
最终得到的这个就是一个result stream啊,或者这个可以叫做一个warning,对吧,反正就是最后我们要报警了,得到这样一个结果,呃,所以接下来我们具体要实现的就是这个自定义的函数类了。好,那么我们实现自定义。呃,函数类public static class。
06:06
哦,接下来大家可能想到我要去实现的是一个flat map function,但是注意接下来是不是我们要用到状态啊,嗯,对,大家想到本来Fla function里边有状态吗?啊,当然你也可以定义算子状态对吧?但是算子状态的话用在这儿是不是不合适啊,我们想要的是针对每个K来做状态,那k set必须要用到那个运行上下文,那是不是必须得是reach map function,对,Reach map function对吧?这里边当然大家看到里边也是,呃,不是map function啊,是flat map function对吧?同样泛型有两个input和output的输入输出类型,当前的input是是不是就是sensor reading啊,哎,那输出的这个样例类,呃,输出的这个类型当然就是我们自己定义的一个类型了,比方说我们想要的那个报警信息,就是就是就是当前的那个34ID,然后因为两次跳变吧,那大家想我把第一次的那个温度值和第二次的温度值直接列在这儿,是不是一目了然啊啊,所以就是一个三元组对吧?呃,把这个写出来。
07:14
TOP3,然后里边呢,当然就是string double double对吧,好把这个做一个。诶,我们这个需要再引入一下啊,引入一下Java的元组类型啊,当然前面这个写错了,不能是implementment,而是要对,既然是负函数嘛,所以要继承啊extend,然后接下来里面大家看到上面这个这个类型还报错呢,因为这里面是不是没有实现它的构造方法呀,对吧?所以这里面我们首先来一个私有属性,就是当前啊温度跳。变的跳变的那个,呃,差值或者叫阈值对吧。
08:04
就超过这个范围的话,我们就要报警了啊,所以这里边我定义一个private double,呃,这个阈值的话,我就叫threhold对吧。好,把这个定义出来。然后呃,接下来我就直接把这个constructor创建出来就完事了,对吧,直接把这个一传,哎,那接下来。哦,大家看到当前是不是我还必须得把这个定义出来啊,当前这个应该得到的类型应该是什么?是不是三元组啊,所以接下来是string double double,对吧?得把这个定义出来,那这样的话上面类型就完全没没问题了,那下边这里还在报错,那是因为对,必须得实现一个Fla map方法,对吧?那么在实现这个Fla map方法之前,我是不是还得再定义状态呀,对吧?啊,所以接下来我们是定义状态,我们的状态保存什么呢?是不是直接保存一个value set,就保存上一次的那个温度值就可以了,对吧?每一次只保存上一个温度值,来了新的之后跟上一个做一个减法,对比它那个差值完事儿啊,所以接下来保存上一次的温度值,好,那这个就是自定义了啊,Private一个value state里边的存的值。是不是只要这个double就。
09:25
可以了,然后我把这个定义成叫上一次的啊,Last temp state,呃,那如果说我们想要获取这个状态句柄的话,是不是还得在open生命周期里边去用那个get状态context上下文里边去获取啊?所以接下来last time step,大家还记得这个过程吧,Get runtime contacts,然后get state里边要new一个对value state script描述器,那描述器里边传的是名称和类型,对吧?哎,当前这个叫last temp。
10:04
那接下来我们这个是double类型的class,然后这里面可能还会涉及到一个,就是我这里面有初值吗。大家可能会想到,如果要是没有初值的话,那后边就比较麻烦了,对吧?呃,后边要做判断的时候,你就得去,呃,就是当前这个啊,到底到底是不是now,可能得得去做对应的一个一个判断啊,那我们先把这个先放在这里啊。先把它写。写出来,接下来我们就是Fla map里边是不是要获取上次的温度值去进行比较啊,对吧,首先获取状态,也就是上一次的温度值,我把它单独的用一个变量保存起来啊,Double类型的一个last time last time state,然后点value拿到对吧,然后接下来。
11:01
啊,大家大家其实会想到是不是就是我要不要去直接拿这个value去做一个减法,是不是还得判断当前这个是不是能啊对吧,如果是none的话,那就那就不要做判断了,对吧?啊所以接下来是如果不为nu,就是状态不为nu。状态不为呢啊,那么就判断两次温度差值。啊,所以接下来其实非常简单的一个操作,就是我直接判断last time,呃,不等于对吧,如果等于的话,是不是就啥都不用干了对吧?呃,不等于none的话,接下来我们要去做一个判断。就是我是不是先得做一个计算,算出它的那个差值diff对吧,用的是当前,呃,这个value.get temperature减去last time啊,那大家想一下,我当前好像没说是一定要。
12:12
比之前的那个大超过十度对吧?哎,所以大家想到小也是有可能的呀,那用绝对值对不对,所以这里边还得做一个这个绝对值的计算啊,调ma的这个ABS这个方法,把这个绝对值算出来。哎,这就是我们的差值嘛,啊,那接下来怎么样,是不是判断这个差值啊,对if这个dif,如果要是大于,比方说大于等于stresshold的话,我就去做一个报警输出,输出怎么办?怎么输出。对,out.collect对吧?这Fla map就有这个好,我是不是不一不需要一定输出一个结果啊,对吧?这里边就直接out.collect就可以啊,这里边要new一个三元组,里边是当前的ID,那是不是Y6.getid啊,上一次的温度值那是last temp,这一次的温度值value.get temperature是不是就是这样啊?好,然后大家注意,这还没完。
13:13
哎,这只是说如果不为弄的话,就判断,如果要是满足这个条件的话,就直接输出,那输出完了之后呢。大家想想,输出完了之后,是不是相当于我当前的这个,呃,所所有的这个判断都已经结束了,是不是接下来如果要是它继续有这个温度跳变,我还要再输出啊,还要继续报警对不对,那是不是状态还得更新啊?另外还有一个问题是,前面你只判断了不为囊的时候,那如果是浪呢?适当你如果什么都不干,那接下来是不是这个状态还是那啊,所以是不是我至少要把对当前的温度值是不是要保存到last time state里去啊啊所以这里边有一步操作就是更新状态对吧,就是把当前的温度值保存到上一次的这个温度值里边去,Update当前value.temperature。
14:10
所以这就是我们这里的一个完整的流程,对吧?啊,那当然如果大家想到这里是流处理,那相当于这个是状态一直不清,对不对,里边时时刻刻总是保持着这样一个状态啊,那大家想到那什么时候清呢?是不是这个应该等到close的时候做一个清理工作啊,我们定义的这个last temp state,做一个clear啊,这样就做完了,这就是一个完整的处理流程。好,那接下来我们来测试一下啊,看看这个效果怎么样啊,我把这个NC还是提起来。运行一下。先把这个起起来,接下来我们要去做一个测试输入啊,这个大家知道,当前这个的话跟我的时间戳没关系了,对吧?我都没有关心那个时间语义嘛,只是按照个数前一个这一个对吧,去做这个判断啊,所以接下来比方说我来一个36.3啊,这当然没有任何的输出了,这第一个嘛,然后接下来比方说啊,37.9,这当然没没反应了,对吧?啊,假如说我来一个48,大家看是不是输出了一个报警信息啊,好啊,那当然我还可以输出别的数据对吧?比方说我来一个三四十六的,这个一下子变成15.4了,他会报警吗?这是346的对吧,那跟341是不是跟他跟一点关系都没有啊,所以如果我要是想让341,比方说啊,我346如果要报警的话,那是不是我应该直接跳变,比方说给一个呃,这个三三十五对吧,大家看是不是346也就报警了。
15:51
啊,那如果341还要继续报警的话。对,大家想比方说啊,我这里边在小于这个的话,我给一个36可以吗?小也可以对吧?诶大家看它上一个是不是还是48啊,它的状态没有被这个346影响对不对,各自处理各自的,所以这就是我们当前这个按照不同的34ID配置一个状态啊k set,然后去检测它连续两次温度值差值超过一定阈值的时候报警啊,这就实现了这样一个功能。
16:24
这是状态编程的一个例子,大家下来之后可以练习一下。
我来说两句