00:00
上节课给大家已经讲过了状态管理的一些理论方面的内容,那接下来呢,再给大家把这个状态编程在代码里边详细的再做一个实现啊,那我们还是继续用这个之前process function test的这一部分代码啊,在这个基础上继续啊,来做一些其他的一些转换,那现在我们在写一个什么样的,呃,状态编程的一个需求呢?实现一个什么样的需求呢?呃,比方说这样啊,我们的现在这个需求,上一次我们这个状态编程主要是检测什么呢?检测一段时间内温度连续上升,对吧?那现在我们来一个简单一点的需求,什么需求呢?检测两次温度变化,如果超过一定范围的话,超过一定的限度的话,我就要报警。就是说传感器传回来的温度不能跳变太大,呃,这个是不是也是比较常见的一种需求啊,啊,你不能就是正常这个温度平稳的上升或者平稳的下降,我觉得还可以接受,但是你如果要是突然一下子跳变十度,那我觉得就不可接受了,对不对啊,就是可能是出现了什么故障,所以我要检测这种异常情况,那大家想一想,这个需求我怎么来实现呢?
01:22
啊,首先我们先把这个主程序先写出来吧,那前边应该还是类似对吧,那前面还是这个,呃,创建环境,我还用这个even time作为当前的时间语义,我的流从这个soet text里边去读取这个文本流,接下来把它转成s reading,指定时间戳生成这个water mark。然后接下来我可以在。单独定义一个,比方说processed stream2还是低于基于data stream,对吧?那大家想一想,我现在如果要是检测一个传感器,温度要差值不能超过一定的范围,不能超过一定的限度,这个是不是也得做陪啊,啊,也是在检测某一个传感器对吧?哎,所以还是k by ID对吧?然后那大家想到我是不是还是直接可以直接一个点process搞定啊,对吧?因为我们说这个process是最底层的API,它相当于是大招,所有的需求都能用它搞定,这里面我定义一个叫做呃,Temperature,前面是那个上升报警啊,我这个叫做改变吧,就叫change报警,对吧,因为这个就没准儿了,不一定是上升还是下降,Change alert。
02:46
定义一个这样自定义的一个process function,接下来是不是得把这个做一个实现好,我们在下边做一个实现。Temp change alert,那当前是不是我们还是要去怎么样对kid process function,那同样这里是不是还要传入KIO,那当前的key是不是还是string啊,那现在的I input sensor reading对吧?呃,那我我们这里边的这个输出应该是什么呢?诶,这里边输出我们定义一下,我们要输出什么呢?可能要输出的是我,我就不说输出一个报警信息了,我就直接比方说输出什么输出。
03:37
这一次的温度是什么,上一次的温度是什么,是不是就可以看清楚到底这个这个温度跳变是一个什么样子了,对吧?啊,所以这里边我这样啊啊,那大家会想到是不是还得有这个ID啊。对吧,输出是不是也得有哪一个传感器,这个温度跳变了,所以ID是一个string,那另外对两两次的温度是不是都是double啊对,所以把这个定义出来,另外大家会想到这里边我其实还可以传参数,传什么参数呢?
04:12
对,本身我要定义的那一个超过多少之后就报警,这个是不是可以是呃,对不应该写死在里边对吧,应该是可以在外面去传的一个参数,比方说我要求这个十度,超过十度就报警,可不可以,哎,那所以我可以直接传一个十十点零,因为这个温度我们是double类型嘛,对吧,传一个10.0。然后这里边我就需要给一个,呃,比方说这个我叫它的阈值啊,阈值叫stress hold,对吧,这是一个double类型啊,把这个实现之后,前面就不报警了,对不对啊,所以接下来我们其实是要实现它里边的process element,好,整个的这个主体流程已经写好了,接下来大家想一想这个需要要什么样的东西呢?
05:12
上一次的温度,只要拿到上一次的温度是不是就可以啊,对吧,所以这里边我是不是需要得定义一个状态啊,对,所以定义。定义一个状态变量。呃,保存上次的温度值。操。呃,那所以这里边大家会想到我是不是可以直接lazy玩对吧。Last temp state,我可以定义一个这样的一个东西,这个它的类型应该是什么样的呢?对单一的值value state,那里边的具体类型double,那这接下来是不是要用上下文运行是上下文对吧?然后是不是get state啊,啊大家看这里边就是你不同的状态类型,是不是要用不同的这个get啊,获取这个状态句柄啊,如果是value state的话,大家看是不是就是get state,那你假如是list state的话,那应该什么?
06:30
大家看是不是就可以get list state啊,啊,在这里就可以有区别啊,所以这里边我直接get state,这里边要去new一个value state script,对吧?当然这里边需要去定义名字last time。后边给上它的类型定义double对吧?好,这就是我们定义好要保存的上次拿出来的那个那个值对不对?好,那接下来在这个具体处理的过程当中,Process element里边我们应该干什么事情呢?对,我们应该先拿到获取上次。
07:16
的温度值。我们定义一个last temp啊对,那个有了state,所以我们直接叫last time就可以了,这个怎么获取state,直接点value就拿出来了,对吧。接下来怎么办呢,对吧,上用。当前的温度值和上次的是不是要做一个计算,计算一个差值啊,对吧,求差。
08:02
如果大于阈值,那么就报警对不对,输出报警信息啊,这就是我们要做的事情,其实非常简单,首先先定义一个差值,呃,那这个东西是不是要用当前的value.temperature要减去上次的last time,诶,那咱也想到你这个没准啊,有正有负对不对?那你到时候如果这个是负的话,那那它不是永远都会小于那个阈值吗?啊,那所以这里边是不是要做一个绝对值啊,点ABS对吧?然后接下来如果对,如果这个diff如果要大于我们定义好的那个rehold的话,那么就输出报警信息,我们的报警信息可以用out.CLA直接来输出,对不对?这里边我们要要的是一个三元组,那idd是谁呀?是不是就是Y6.id啊对吧?他俩ID都一样吧,P之后的,然后上一次,呃,这个就你输出,先输出这次的也可以,上一次的也可以,对吧,我上一次的温度和当前的。
09:25
Temperature温度,这样是不是就正常输出了啊,那大家不要忘记这个,如果做完一次处理之后,是不是应该把这个状态应该要更新啊,对吧?呃,最近一次的那个温度就更新掉了,Last time state要update成为。value.temperature对吧?所以大家看其实其实就是这样,是不是非常简单啊?呃,如果这里边已经处理完之后,呃,当然这里边我可以直接把这个STREAM2直接输出对吧?啊,这里这里边大家如果一运行的话,就可以看到这样的一个过程了,呃,这里我要给大家讲的是,大家看我们刚才是用这个process方式做了一个处理,这里边可以用这个状态编程是吧?那大家可能就会有疑问,只有process方式里边能用状态吗?
10:22
其实不是。其实不是,比如说这里边大家看我们这里面其实做的这个这个操作是不是非常简单啊,那我其实就想到我们说process方式是一个大招嘛,对吧,啥都能用,啥都能干,我们说里边它能够获取到它的上下文,能够获取到我们事件和时间,对吧,基本的所有的那个底层信息都能够拿到,还能注册定时器,那这里边我们也没用到定时器啊,是不是好像稍微有点浪费啊。对吧?哎,那所以大家会想到比方说这里边,我我觉得这其实就是一个简单的转换而已嘛,对吧,相当于是做了一个简单的判断,然后把它这个转换输出一下,那大家就会想到,那我一个转换是不是直接一个map或者flat map就能搞定了。
11:15
诶,所以这里边我们试一试另外一种实现方式啊,我把它注掉。我直接用flat map能搞定吗?诶大家看到这里边可以Fla map还可以Fla map with state对吧?哎,所以这里边我先给大家用flat map做一下啊,那Fla map是不是也得自己去new一个,哎,相当于是这个map,呃,那个flat map方式对不对?所以这里边我还是定义啊,Temp change alert。10.0对吧,先把它定义好,然后这里我们先把它定义成这个二吧。然后接下来我们真正的去。
12:04
在这里边再把它定义出来。Increase诶这个不应该是increase是吧。我们前面打错了,应该是change是吧,我不该改这个改错了啊。在下面呢。把这个改了。好,我们把它定义出来,哎,这里面有一个double对吧,还是把它改成hold。呃,然后我们这里定义出来之后,上面应该,哎,这里边还还有问题对吧?那大家会看到这里边我们应该给它要实现一个什么东西呢?这里边是不是得实现一个flat map function啊,对吧?但是大家会想到当前我们是不是要状态不要状态,我们说过在flink里边我们可以实现有状态的编程,也可以实现无状态的流失处理,对吧?那map Fla map这样的算子本身应该是有状态的还是无状态的?哎,本身应该是无状态的,那所以大家就会想到,那我这个是不是就不能这么操作了呀?对,大家会想到如果说我想要有状态的话,怎么样就可以有状态了?对,当时讲过负函数版本的,哎,那种函数类是不是就都可以去操作状态啊,啊,而且还有那个生命周期对不对,哎,所以这里边我们可以把它直接来一个rich。
13:40
版本reach flat map方式,诶大家看到这里是不是它也需要传入两个参数,两个类型参数,对吧?一个是in,一个是out,这里边in是sensor reading out,我们还是写成这样,String double double。
14:03
好,这个写完之后,大家检查一下,前面不报错了对不对,哎,这个看起来是正常了,好,那接下来大家看是不是我们要做的操作,就是实现它里边的flat map这个函数啊,哎,大家想一想,在这个里边我们要做什么操作?那是不是正常来看的话,还是你定义一个这样的状态,我们把上次的那个温度值拿出来,然后是不是跟当前的那个温度值去求差值啊,然后如果大于阈值的话,直接输出啊,然后更新这个状态,是不是过程一模一样啊,所以我甚至可以直接把这个抄过来就好了。诶,大家看,那这里边是不是就缺了这样大家看这个value还是一样的value对吧,这个out是不是还是一样的out啊,这些都不用改,主要要改的是什么,是不是就是我把这个last time state得定义出来就可以了啊,当然这里边其实我直接在外边去用这种lazy定义的方式也可以,或者我还可以用另外一种定义方式,大家还记得本身是不是这个reach方式里边有生命周期啊,有生命周期的话,对,我是不是可以在open哦,它这个放在下面了啊,我们把它放在上面去。
15:26
放在上面看着舒服一点。在这个open的时候,是不是就可以直接把我们这一个状态声明出来啊,哎,但是这里大家要注意,我是不是应该在外面先去定义啊,对吧,先定义这一个private。啊,这就得定义成一个bar了,对吧,这是可以改变的一个temple state。它的类型是一个value state里边是double,那当然这里边是不是应该是先赋一个空值啊,然后啊,就是这里边还不能从上下文里边拿到东西,对吧?哎,这个我们什么时候才能从上下文里边拿到东西呢?是不open的时候啊,对吧,在生命周期里边才能拿到,所以这里边初始化的时候生命state。
16:27
变量好,所以这里边我们给这个last time state去做一个赋值,那是不是也是一样啊,大家看get wrong time cons对吧,Get state啊,这里边是不是就完全一样了呀,你有一个state value state script,然后里边last。然后里边class of double,对,完全一样对吧,所以大家看其实是不是同样的一个一个事情,假如说用不到更加复杂的一些东西,定时器呀,对吧,操作这个时间呀,Water mark呀,是不是其实也不需要用到process function这样的一个大招啊,我直接这里边定义一个Fla map function是不是也可以搞定,诶直接把它在这里搞定就可以了,好,所以这里我可以把它这样写完之后,这里边有一个process stream2,对吧?好,那我们来测试一下吧,看看这个效果怎么样。
17:39
哦,这个应该已经提起来了,所以我们直接把它运行输入数据应该就可以对吧。好,我们提起来了之后,在这个里边输入数据去做一下测试。
18:02
好,大家看到这里边他直接为什么输出了一个,诶,这这里边我没有改他的那个名称是吧。我们在这里边输出的哦,这个叫做processed data对吧?啊,这里边是这样,然后我们的那个这里边应该就是直接是报警信息了,对不对。所以大家看这里边它直接报警了,为什么会直接报警呢?出一条直接报警了,哎,大家一看其实已经知道了,是不是,因为初始值是零啊,初始值是零,那你来了一个35.8,当然直接就报警了,对不对?哎,所以这个其实是正常的啊,当然大家如果想避免这个bug的话,也可以改一些其他的东西啊,那比方说我再给一个32。那大家看现在是不是就只有input data没有输出的这个报警信息啊,呃,那如果这里边,呃,大家会想到我如果这里边给一个25的话,它会报警吗?25:35.8不是已经小了十以下了吗?诶,哎,我们是不是只保存最近一次的那个温度啊,只考虑最近一次对吧,并不考虑很遥远的那些数据的变化,所以说哎,这里边25输入并不会报警,那如果我再输一个,哎,我如果再输一个35.2会不会报警,这这个时候就报警了,因为25跟35.2显然是大于十了,对吧?啊,当然就是说我们看看如果要输一个很小的数是不是也会报警啊,所以这个行为是完全符合我们的预期的。
19:41
好,这是这一部分。
我来说两句