00:00
我们来回顾一下之前做完的这个代码,我们是要实现一个检测,连续两次温度值差值超过一定阈值的时候就报警啊,那我们发现这个很容易,用一个状态就可以保存上一次的值,然后新来一个数据的时候呢?呃,我们说这里的flat map这个方法是每来一条数据都会调用到这里来,所以接下来我们就是,呃,每来一条数据跟当前的状态去做对比,如果超过了那个定义的阈值的话,那我们就输出报警,如果没有的话,哎,那就什么都不做,每次来一个新的数据,我都把它更新状态,对吧?啊,这就是我们已经实现的这一个代码,那在这个实现的过程当中,大家其实会发现,呃,这个代码其实运行起来是有一点问题的,那就是一开始我们没有给出值,所以这里边呢,会有一个初值是零对吧,0.0,然后接下来我们一上来之后,就是跟上一次去做对比嘛,并没有判断这个初。
01:00
值,所以我一上来之后,本来这个lasttimemp应该是零,然后一减,诶我发现那个30多度对吧,一减之后发现超过这个数了,直接就报警了啊,所以这种情况其实是需要去做一个排除的,呃,我们也说了,提供了一个思路,可以怎么样呢?啊,有同学说,哎,那这个很简单,我就在这儿,呃,直接去判断一下,如果要等于零的话,我不要报警就完了嘛。哎,我们说了这种方法不好,因为如果说我们这里边拿它是零这个值作为初始值来判断的话,那假如说我们温度真的出现零值的时候,你怎么办呢?哎,那就跟这个就搞混了嘛,所以这里边有两种方式大家可以去尝试实现,一种就是我给一个绝对不可能拿到的温度值作为初始值,对吧,比方说我们给一个绝对零度以下的这样的一个温度值,或者说我们给一个长整型的,呃,就是我们这里边是double类型对吧,我给一个double类型负的一个一个最小值啊,这个也都是可以的。
02:00
然后接下来我们进来这个代码也要改,对吧,就是上来之后我应该先要去判断一下是否当前的这个last tempmp等于我们的那个初始值,肯定取不到的那个值,那这个这样的话,如果它是第一次来,它当前是那个初始值,那我就什么都不要做,只把它更新就完事了,那如果说不是的话,我再去走这一套流程判断,对吧?啊,该输出输出,这是一种思路,另外还有一种思路呢,呃,就是这里边我不要去给他付初始值,因为在有一些场景下,大家可能会想到这个初初值它不是那么容易负的啊。另外一种方式就是我再去定义一个状态,一个布尔类型的状态,就是我们说的相当于判断它当前是不是第一次,第一次来的数据,对吧?啊,那那接下来就是如果说那个呃,判断是第一次的话,我就什么都不做,直接把这个状态更新就完事了啊,如果不是第一次的话,我再走这个流程啊,整体的思路是差不多的,那在这里呢。
03:00
啊,我是要给大家再去介绍另外一种实现思路,哎,这种方式应该不是说实现思路啊,思路还是一样的,我要给大家介绍的是另外一种实现方式,API的调用不一样,这种方式呢,不需要我们在外边去再定义这样的一个,呃,就是reach方式,自己是实现这个函数类,而是怎么样呢?直接在这里,我们之前这不是有一个这个直接点Fla map吗?哦,大家直接看啊,这里边你看这个flat map的时候,下边还有一个方法,不知道有没有引起大家注意,这个方法叫做flat map with state with state,也就是说这是一个有状态的flat map。诶,大家想一想,这个我们之前说map filter flat flat map这些操作,基本的简单转换操作啊,他们是不是都属于无状态的呀?啊,正常情况下都是来一个数,直接做转换就完了,你像Fla mapb,最多是把它打散,一对多输出就完了,本来是没有状态,现在我们想让这个操作有状态,那么要不就是走下边的这个流程,我用一个rich function自己去定义,要不有没有简单的写法呢?有的,这个就是我们单t streamam API里边给大家提供了一个现成的有状态的Fla map,这个名字就叫做Fla map with state啊,但是呢,这里边大家看到它的实现就只有一种实现方式,里边必须要传一个方式,这里边是一个呃,匿名函数,我们可以传一个匿名函数进来,对吧?那么这个函数到底怎么样去实现呢?它的参数大家要注意啊,这个稍微有点绕,参数是两个,一个是T。
04:47
T一个是S的option类型啊,这个T大家知道了,我们当前data STEM t啊,那就是本身当前的数据类型,输入的数据就是T,那大家知道这个输出的数据呢?诶在这儿呢,对吧,数据数据类型data stream r,哎,这不就是相当于输入是T,输出是R吗?因为map和Fla map都可以转换数据类型嘛,啊那这里边我们就看到了输入的T在这,那这个S又是什么呢?S就是我们要去保存的状态。
05:19
啊,所以这里边S只带的是状态的类型,这个就比我们那个一般的reduce定义更相对来讲就会更灵活一些了,大家看到这个状态可以不一样,类型可以不一样,对吧?不一定非得是T啊,你输出的状态跟态也不一样,它跟这个输入的状态也不一样,然后这就相当于呃,那这里边为什么会有一个option呢?诶大家知道这个option相当于是对于我们当前一个呃引用类型的一个包装,如果说当前这个引用类型是空的话,那么option一个空的这个值得到的是一个呃,大家知道是一个这个nu对吧,就是no这样的一个nu,这样的一个空类型,那如果说它里面本身有值的话,不为空的话,我们得到的就是一个sum这样的一个类型啊,这个我们应该是知道的啊,在scla语法里边,大家应该学习过类似的知识,那所以这里边其实就可以帮着我们用这个option啊,把这个状态包起来。
06:20
大家想想是不是就可以判断这个状态里边到底有没有值啊,如果没有值是不是就相当于是我们初始状态那个一开始的那个状态啊,所以用这种方法相当于可以把我们刚才提到的初值的那个bug直接就可以解决掉,有一个option,一个包装啊,然后我们再看一下它需要返回什么东西啊,返回的数据类型,大家看又是一个这个元组对吧?那这看起来是一个二元组,那返回是返回什么呢?AR,这不是我们的输出吗?哎,这是我们的输出数据类型,但这里边呢,把它又包装到了一个traverseible ones的一个数据类型里边,这个我们知道,就是呃,可以可遍利的这个数据类型嘛,就是比较上层的集合类型,对吧?所有的这个skyla集合类型里边,它的顶层都是orable,再往上的话都是传ver啊,所以这里面传ver ones其实就是只要是集合类型放在这儿都能用。
07:17
然后另外还有一个什么呢?还有一个option s,那为什么你这里面输入有options,输出还有options呢。因为你做完操作之后,不仅仅是只做了一个输出,是不是还应该有状态的改变呀?啊,所以这里面其实就是输入是什么状态,然后状态要经过这一次操作之后,状态改成什么样,还得有一个输出放在这儿。这就是我们当前里边要传的这个参数啊,这个看起来有点有点麻烦,但是我们实现一下吧,啊,实现一下大家可能就会觉得比较清晰了啊,那这里边我可以定义一个,呃,这里面大家知道我我直接要实现的是一个函数嘛。
08:03
好,那我们直接在这里给大家把这个函数来写一下啊呃,这个函数大家会发现其实这个参数我应该要做一个判断的啊,为什么呢?因为你那边有一个option类型啊,所以一开始我就可以判断这个option它到底是none还是一个sum,哎,所以这样的话,我是不是可以直接用scale里边的这种函数,就是模式匹配的方式去做一个参数的一个一个对应的选取啊,对吧?这里边我们直接写成这种模式啊,那case判断一下当前的这个输入参数啊呃,首先我们这里边的这个,呃,应该有一个当前input的这个data,对吧,它本身应该是一个censor reading,这是我们的input进来的这个数据,当前的数据,然后呢,另外还有一个状态数据,那假如说这个状态数据是空的话,那应该是一个nu类型,我们要做什么操作呢?哎,这里边返回的是,注意返回的是一个二元组。
09:04
对吧,返回到二元组里边,前边是我们的输出数据类型,包装成一个啊集合类型对吧,成ones,那这里边集合类型我干脆就直接用一个list来表示吧,那list的你既然是一个当前是空嘛,我应该没有输出,那我这个list怎么办?哎,那很简单,空的嘛,Empty完事。另外后边当前的状态要改,你现在来了一个数据了,那我们当前状态你不是要存上一次的温度值吗?那是不是应该把当前数据里边的温度值提出来,放进更新当前的状态,所以接下来,诶,而且这里我们要注意啊,更新之后还是一个option类型,诶,那这里边既然有值了,我应该写成一个some。然后里边的这个值,那就是当前data.temperature对吧?啊,这就是我们对于这个初始值做的一个操作,初始状态做的一个操作,如果是空的时候直接执行这个啊,然后接下来还有另外一种情况,就是如果已经有了值的话,同样还是又来了一个数据sensor reading,那现在呢?哎,这就不是这个nu了,对吧?呃,现在是一个sum类型了,我得把这个参数也定义出来,这个参数当然就是last temp了啊,这个我们一看就知道就是它接下来它是一个sum double对吧?这是我们保存的这个数据类型啊,然后接下来它的这个执行的过程呢,我还是用一个就是函数题给大家写出来啊,那执行的过程大家想是不是就跟我们这里边的处理逻辑一模一样了,对吧,你接下来还是判断当前的这个值是什么,然后呢?呃,我们获取这个值,然后之后更新状态就不用了,因为它已经包。
10:48
包裹在那个,呃,你输出的时候要输出什么,直接把它放在这儿了嘛,所以接下来我可以直接把这个逻辑copy过来,但这里大家要注意,就当前的这个值,那应该是什么呢?其实就不用再去点value了,对吧,我直接用这个some,这本来是一个option类型吧,我直接点get,把它get出来是不是就完事了啊,所以这个其实就不需要了啊,这里边我们做对比的时候,直接用last time.get把它拿到,然后诶,这里边不是value,当前我们是贝塔点temperature,然后它俩一减得到它俩的啊,绝对值,差值的绝对值,然后这里边没有磁,然是后的,那没办法,只有直接写死了,我们给一个十十点零,然后接下来就不是out点了,我们现在是要输出的是一个二元组,对吧,而这个二元组呢,还稍微有点特殊,呃,就是这里边我输出的这个二元组,前面是一个list,一个集合类。
11:48
类型后边是一个option类型,一个sub对吧?啊,那后边这个状态比较简单,我们先写这个状态吧,状态就还是现在的data塔点temperature,先把它包好,写进去就完事了。那前面这个例子怎么办呢?哎,我们现在的例子大家想这里边是不是就是要把你对应的那个输出数据,我们现在要的那个输出数据是什么啊,是个三元组对吧?把这个三元组包装好再输出就完事了啊,就是这里边如果你超过了这个限度的话,那我们这里边应该是有包好的一个三元组,那前边应该就是当前的ID啊,那应该是data塔点ID,后边就是连续两次的温度值,呃,一个是last time.get另外一个就是当前data.temperature对吧?啊,那这里边还在报错,为什么在报错呢?因为你只处理了一个分支对吧,你这里边还有另外一个分支没搞定呢。
12:48
那如果else怎么办呢?你这个就不像我们那个alt.collect对吧,这里边它返回是空嘛,这个你你随便不就是,如果是else的话,不处理就完了,这里面你还必须得给一个返回值,还得是这样的一个数据类型对吧?二元组,那前面这个list怎么办?不是这个Java的list啊,就是这个SC的list对吧?啊,那那list肯定就是一个空的了,我们当前如果要是差值没有超过十的话,没有输出,所以所以输出是list ten empty,然后另外呢,当前的状态还是要改成最新的data.temperature对吧?这样一写的话,这里边就不再报错了,就完整的实现了这样的一个过程。
13:36
好,我们接下来把这个代码也给大家测一下。大家看到这里边我们直接报错了啊,这个报错其实说的是就是当前如果我们要定义这样的一个Fla map with with state的话,你必须调用的时候,你看这里边我们的调用过程啊,它这里面是有泛型的是吧。就是你必须得把当前的输出R和这个S状态的类型直接显示的定义在外边,因为要不然的话,你你里边这只是一个函数体对吧?要不然的话这个弗link怎么能知道接下来我要处理的这个当前的这个状态类型是什么呢?呃,一开始我再去创建这个任务的时候,我就得把这个先定义好,所以这里边后边是有这个泛型的啊啊,我把这个定义出来,那首先输出的数据类型还是这个三元组了啊,String double啊,另外还有一个类型需要传入当前的,呃,这个S的类型状态的类型,注意这里边就是S没有option了,那所以这里面我们就直接给当前的状态类型不就是double吗?对吧,直接把它列出来就可以了,好,然后我们接下来来运行一下。
14:52
哦,刚才那个加小括号应该应该也没问题对吧。
15:02
好,呃,这个我们已经提起来了,接下来我们在代码里边来做一下测试,接下来还是我们之前的这个数据啊,一行一行输入,大家记得之前我们直接输入第一第一条数据的时候,直接他就这个报警了,对吧,因为默认的初始值是零嘛,然后现在我们给一条35.8没有报警,对吧,没有任何的这个数据输出,然后接下来我还是同样的啊,在这里边给一个数啊,给一个32也不会报警,没有问题,然后接下来我给一个大一点的43。现在报警对吧?啊,就是超过这个差值超过超过十了之后,现在才会报警啊,所以这个是完全符合我们定义的这样的一个状态了啊,所以呃,大家看到就是用这种方式啊,就是直接用跟stream API里边已经给我们提供的带状态的flat map这个操作也是可以实现这个需求的啊,这至于说哪种方式更好,大家更习惯,这个就看大家这个调用的过程了啊,然后这个需要给大家稍微说一下,就是这个flat map with state啊,大家注意啊,它必须是在kid stream里边才能调用啊,另外同样你看到还有什么,之前我们不是说map filter flat map这三个是基本的转换操作,没有状态吗?诶现在与之对应的就有filter with state map with state flat map with state都可以用这种方式定义一个状态,然后去做有状态的操作啊,所以回过头来大家就知道。
16:38
吧啊,为什么我们说flink是一个有状态的流式计算对吧?啊,就即使是这种无状态的任务,我也可以给它单独定义状态啊,这个就是另外的一种做法,那如果说你想要用这种方式的话,必须在k stream里边去调用。其实大家能想到,为什么非得这么去调呢?你在data stream里边如果要去看这个的话,我们这里边不是有map,有Fla map吗?那有这个we state吗?没有对吧?大家看只有map和flat map,并没有这个with state的版本,就是因为这里边我们的这个状态其实应该是什么呀?就是一个kid street kid state监控状态,所以说这里边你必须经过KBY之后才能够用这个状态,你直接基于data stream就没法做了。
17:26
这个是大家需要去注意的一点。
我来说两句