00:00
我们刚刚给大家讲的就是process function,接下来就实现一个具体的需求,比方说我们在sensor啊,这个传感器的这个场景里边,大家可能会想到,其实很多情况下我们是要检测一些呃温度的,这些特殊的情况是要去做报警提示的,对吧?啊,那比方说我们这里边想一个场景,就是说假如这个温度在一秒钟或者两秒钟之内连续上升,那我这个时候就觉得有可能出现异常了,我就要做一个报警,比方说现在我们就实现这样一个需求,我先把这个先创建出来啊,我们还是放在这个API下面。一边写,大家就一边想自己应该怎么实现这个这个需求呢?我要检测两秒钟之内连续温度上升process function。Test。
01:00
大家应该知道还是一样的这个操作流程对吧,我就直接抄了,先定义这个环境对吧,然后后边是做这个data stream的数据源引入,然后做转换操作,最后输出,我就直接copy过来了啊啊,我先完整的先copy过来,然后然后我们删吧。这些看看是不是都要啊,都要。注意一下,把这个要改一下啊。好,然后这里面其实有一些我们就直接删掉好了,像这个不要了啊呃,从文件读我们也也不要了吧,直接我们就拿socket测就好了,Data stream。好,那前面的这个统计最小温度我们就不管了,直接把它删掉。好,大家看我这里边,其实这个都已经实现了,对不对,大家会想到我现在一秒钟之内温度连续上升,你开个窗口或者说用个什么东西好像都不太好使,对不对,那所以这些东西,那大家如果想到什么之前讲过的API都不好使的时候,Process function就是大招。
02:14
就是它就是最底层的API,就是什么都能拿得到,什么都能干,然后一切需求都可以用它来实现啊,所以这个时候我们就直接放大招。呃,当然了,我这里就直接把这个我们可以定义一个另外一个变量,对吧,比方说我们叫一个什么。对,可以啊。STEM,呃,我们就直接在之前的基础上KBY之后,然后定义一个process function对吧,那么这个process方式我们要那个连续上升温度报警,那我们叫一个temperature,呃,上升increase alert吧,就叫一个这样的一个报警。
03:06
然后接下来我们是不是就是要把这个要做一个处理了,对吧,一个key process function,跟前面我们简单的那个实现是一样的啊class。呃,Temp increase alert,它要去实现一个K的process function啊,还是一样啊,啊,那大家这会就得想一想了,这里边它的类型是什么?首先key是string对吧?然后后边对that reading,这是输入的数据,后面是out,输出什么呢?啊,那我们可能想输出是不是应该是一个报警信息啊,报警信息的话,那我直接输出那个string得了。String报警对吧,一个字符串报警的字符串啊,然后接下来大家就会想到了,在这个过程当中,我们怎么去实现呢。
04:04
啊,首先是这个里边一定要实现的方法,Process element,这个先放这对吧,呃,先我们还不知道该怎么实现,但是肯定要放这儿的,那大家自然会想到,在这个过程当中,其实我们是应该怎么样啊,它是来一个元素,是不是就到这里边来执行一遍啊。我们要判断它温度连续上升,连续上升的话,那大家想一想,这这是相当于得干什么呀。是不是相当于我得判断跟上一次上一条数据那个温度要比较啊,来跟他比,如果要比它大,这是不是就代表上升了,如果要一直是比上一个大,这是不是就是连续上升了,哎,所以这就是我们能想到的一个基本的状况,然后另外就是这个怎么样去报警呢?他还提到就是要这个一秒钟之内或者两秒钟之内,对吧,连续上升就报警,那我可以怎么去做,我这个一秒钟怎么去判断呢?是不是可以调用这里边的时间服务去指定一个。
05:12
定时器啊,这个定时器就我当前的这个数据来了之后,然后我定一个定时器,一秒之后触发,触发的时候我判断一下在这个过程当中,是不是所有的元素都是连续上升的状态,对不对?哎,如果要是这过过程当中它出现下降了,那是不是我就可以直接把它给删掉了呀,定时器删掉不要让它执行就好了,对不对,如果要是它正常触发这个定时器了,那是不是说明就一直在上升啊,哎,那所以到时候我输出一个报警信息就完事了,这就是我们整体的一个思路对吧?但是大家会想到在这个过程当中,我得我得存个什么东西啊。我我其他的都已经知道了,当前的这个温度知道了对吧?然后我要输出的信息,那自己去定义就可以了,然后我需要呃注册定时器,那直接调这个时间服务注册就可以了,这个逻辑都是都能够理清了,但是我们还需要知道什么,还需要知道上一次的温度,诶那这个就比较麻烦了,大家看这个process element是什么呢?流处理它是来一个数据进来一次,他能拿到上一次的数据吗?
06:23
不能对吧,那上一次的数据怎么办?怎么获取,能不能拿到之前的数据,当然是可以的,怎么样去拿到之前的数据呢?其实就是把它保存成当前的状态,是不是就可以了?所以这里啊,我们就先定义一个状态啊,这里其实在我们讲这个protect方式的时候,大家看到啊,往往它就要跟状态编程要结合在一起了,对吧?这里尽管我们没有开始给大家详细讲状态相关的东西,但是这里边已经是涉及到状态编程的内容了,好,那所以呃,我们定义一个状态。
07:08
用来保存啊,保存什么呢?就是上一个数据的温度值啊,这是一个状态对吧?所以大家看一下这个在process function里边,这个状态怎么定义,先先给大家看一下实现啊,具体我们到后面再再讲啊,当然这里面定义我用一个这个lazy的方式,大家知道lazy吧啊对吧,懒加懒懒加载对吧?啊这样的一个方式就是就是一开始定义的时候还不要呃,直接把它就就相当于执行,而是等到调用的时候再去执行对吧?啊所以这里边我们看啊,定义一个状态,我这个状态叫last温度temperature temp对吧,它是一个什么类型呢?是一个value state类型啊,这就是flink里边,大家看flink API里边的对吧,在common state里边的定义的,这就是一个状态的类型,对吧?
08:07
定义一个value state,当然本身它是个value state,那实际上底层它到底数据是什么类型呢?还应该,呃,对,那它这个类型应该是double温度嘛,温度值是一个double啊,大家看它这个到底怎么定义啊,它定义的方式是要get wrongtime context。因为大家知道这个你既然是定义状态嘛,对,这个是不是要跟我们当前的那个上下文相关啊,对吧?呃,这这个要让flink上下文把我们整个这个状态要管理起来的,所以说这个要从状态里面去获取它的那个句柄,所以这里边要get state,大家看一下,哎,这样去定义,然后接下来里边要传什么呢。里边要传一个,诶大家看到那个了,Value state script要传一个描述器对吧?哎,所以这里边你用一个描述器,Value state script script,诶大家看它是一个double类型的,对吧,那在这里边它又要传什么东西呢。
09:13
啊,我们点进去看一眼啊,这里边要传的是一个name,就是你这个状态叫什么名啊,你得明确对吧,另外还得有一个type plus啊,这两个是必须的啊,后面大家想这个default value就不一定对不对,对吧,它肯定有那个呃,构造函数是不用传这个的啊啊,我这里就不着啊这个对吧,String和呃,一个内容和一个那个Type Class就可以了,所以这里边我给一个名字叫做就叫last time后边的那个呃,它的类型那怎么去呢?要class of。打对吧,啊,大家看这样就可以把这个状态定义出来,一长串定义一个状态这么麻烦对吧?啊,大家熟悉的话,其实就看到所有状态都这么定义啊,就大家熟悉知道这个套路了也就好了,接下来。
10:09
呃,大家想一想,除了这个状态之外,我们还要别的状态吗?这里边其实还涉及到另外一个状态啊,当然就是大家如果要是一开始还没想到的话,我们可以先呃先先先不管啊,那大家会想到我这里边处理的时候怎么处理呢。那其实很简单,就是是不是应该每来一个数据的时候,应该先取出上一次的那个数据啊,从状态里边取对不对啊,好,我们就来那个先取出上一个温度值啊,那么大家看一下这个东西怎么取啊,就是previous time,它怎么取呢?从状态里边取它的调用。
11:00
它有个方法,直接点value拿出来就可以了,啊,这就是我们当前的这个值对吧,大家注意这是个value state,那其实就是一个值类型对不对,所以它就是一个值而已,大家当成一个变量把它直接用就可以了,然后接下来呢?啊,接下来大家会想到我们是不是上一个值取出来了,然后我们的这个。状态是不是就应该更新啊,是不是当前最新的这个数据就应该写成呃,Last time了,下一次来的时候就得用这个了,对吧?哎,所以这里面我们更新这个温度值。呃,这个操作就是last temp怎么去更新呢?大家看到有一个update操作对不对啊,这里边直接传一个double类型就可以,当然这个double类型是不是要从传入的数据这个value里面去取啊value。呃,点temperature temperature啊,直接把这个传进去就OK了啊,这就我们先完成了这个温度的更新,然后接下来做什么操作呢。
12:15
大家会想到我们不是说要连续温度连续上升的话,那就应该要报警吗?对吧?呃,那好,我们先写就是温度连续,诶就是连续上升,什么叫连续上升呢?那其实就是说是不是现在比之前的那个大啊,对吧,如果温度上升的话。上升啊。呃,另外大家会想还应该干什么,就是如果温度上升,而且是不是我们没有设置过定时器的话,我们就可以直接应该干什么了。设置一个定时器,等一秒钟之后触发对不对,那就应该做这个操作啊,那这里面大家还不知道那个就是想不想考虑这个定时器的事情的话,那我们就先这么写吧,比方说我先就直接判断温度上升则。
13:20
呃,注册定时器对吧。呃,那我们判断的是不是就应该是value.temperature这是当前的这个对吧?如果要是大于上一个温度的话,那我们就注册定时器,对不对啊,怎怎么注册定时器,大家还记得吗?直接有个是不是直接Ctx.time service对吧,然后注册我们要注册一个啊,当然这里边就是看我们按什么来触发了啊,我也可以直接注册一个processing timer对吧,因为这个就简单一点,就可以不用那个等那个water rock了,我们就放那等几秒它就应该出发对不对,可以注册一个processing time timer,诶这里边要传一个浪,这个浪给什么呢?给一秒没。
14:16
给一秒就行吗?这里大家要注意啊。不能直接给一秒,为什么呢?因为这里边它的这个timer里边直接给的这个就是一个时间戳,而不是某个延迟。也就是说你给什么时间戳都可以,它就当成一个时间戳,一个整形数给过来了,你要给一的话,给一秒给1000的话。永远到不了。那就相当于是什么呢?那就相当于是1970年1月1号0.10分那个一秒对吧,就是那个时间,那那我们这里就永远到不了了,对吧?哎,所以就是这里大家要注意啊,那我们这里可能就得注意,那得是什么呢?那时候得是当前时间再加上一秒啊,所以那我们是不是还得先获取当前时间才行啊,我设置一个time time step,呃,Time step r ts。
15:14
呃,就是timer的那个time step啊,那大家会想到它应该是什么啊,应该是不是就是当前的时间,当前时间怎么去获取?对,大家记得在那个time service里边是不是有一个可以get,哎,不,不是get啊,直接就是什么current,大家还记得吗?Current presenting time对不对,对吧?这就是你要是处理时间就presenting time,如果要是那个time的话,是不是就watermark啊,对吧,直接拿它就可以了,所以我们就先把它获取到是不是,然后再加上1000就完事啊,这是不是就是我们要设置的那个定时器的时间戳啊,好,我们把这个传进来就OK啊,这就是我们的定时器。
16:03
好呃,那大家就会想到,那如果要它小于呢?Else的话,如果要小的话,那怎么办?如果要是小于的话,我们是不是得删除定时器啊,诶,那删除定时器就麻烦了,大家就会想到,哎,我如果删除定时器,我当时注册的时候,那是要带一个时间戳去注册的呀,删除的时候我直接delete。他知道我删除的哪个吗?我我如果这个任务多了之后,是不是有可能注册了好几个定时器啊。那flink你如果要是直接去删除,大家看到也没有,就是删除or的一个操作对不对,呃呃,而且那个一般也不会用,你不能说我这儿有有什么需求把别人的都删掉了,对吧,那是不是你应该是当时定义的哪个定时器,我现在就应该删哪个定时器啊,诶那所以大家就会想到我那个删除的时候,里边也得传一个时间戳进去。
17:02
那这个时间戳是我当前的时间戳吗?不对,如果说我现在温度下降的话,而且已经设置了那个定时器的话,那是不是相当于我应该是在之前某个时间点设置的定时器啊,那是当时的时间戳加的一对不对。那我现在去删除,你按当现在的时间那就不行了。所以大家会发现是不是我还得保存一个东西啊,对吧,既然是当时的那个东西,那是不是又得保存,得保存什么,对当时设置这个这个定时器的时间说是不是得存下来,哎,所以前面我们再多定义一个啊。定义一个状态用来保存,哎,这就是定时器的时间醇,哎,那大家知道这个过程也一样,对吧?啊,这个我们就叫做current current当前的定时器current timer吧。
18:11
然后同样这也是一个value state,对吧,这里边的类型它是什么啊,它是个long对吧,时间戳嘛,Get,诶,错了啊,Get wrongtime contacts,然后我们去get state,这里面是不是可以传一个哎,Value state de script啊。同样给一个名字叫current timer,然后这里的class of就是love,对不对?诶把它定义出来,那大家就会想到前边我们这里边去注册定时器的时候,其实也得有讲究了。比方说什么呢?你是不是不要说我之前已经注册了定时器,已经在等着一秒钟之内在判断它连续上升的,对吧?你在这个过程当中,你来了数据之后,你又去注册一个定时器,那就没必要了,对不对,我现在是不是还在一秒钟等待的过程当中了,你只要是连续上升,你你不删它就完了,是不是等到一秒钟我我等它触发就完了,所以就不要再注册了,所以这个时候其实是温度上升,且。
19:24
没有射过。定时器,那么就注册一个定时器对不对?哎,所以这里边我要求的是它大于这个,而且要什么。而且这对是不是我们这个之前没有定义过,没有定义过,那是不是这个current timer里边就应该是零啊,呃,这里边不是没有值啊,大家注意啊,本身它它这个状态,呃,大家注意这里边我们还没把它拿出来呢,对不对,这个还在那个状态里边没拿出来呢,所以我这里再定义一个current time time current timer的TS对吧,Time step从current timer里边。
20:10
是不是也是直接value把它拿出来啊,先拿出来再说,这里大家要注意一下啊,这个点value拿出来是不是肯定应该有值啊,因为我们这里边要的是一个long类型,你要不然的话,那变成空的,这个value是不是直接就报报空指针了,对吧?啊,所以大家注意啊,本身那个current timer里本身它肯定有值,然后里边如果要是没有设置过的话,默认值就是零,对,所以我们要判断的就是什么。是不是current time timer的这个TS要等于零的话,是不是就去注册一个呀?诶这就是我们这个逻辑就跑顺了啊,所以大家看这个其实process function结合状态编程的时候,就是里边有很多这个逻辑,你得绕来绕去搞清楚对吧?好,那在这里边我们这里边是不是还得多一步,你注册定时器之后,对是不是得存到那个,呃,我们那个current timer里边对不对,状态里边得存进去,哎,所以这里我们要current timer.update把我们这个timer ts是不是要存进来啊,哎,所以这样的话,这就是完整的一个流程了,好。
21:27
那接下来else的话,哎呀,前面既然是一个组合的一个条件,那这个else情况就多了,对不对,那我们直接else if吧,我们关心的是什么,什么时候报警呢。其实我们关心的就一个是,如果它这样来了之后连续上升,对吧,注册定时器这是一个事儿,另外一个是不是就是我们是不是如果要下降的话,是不是应该直接把它删掉啊,对,这个应该把它删掉啊,那else什么时候删呢?就是呃。前一个,呃,其实这个就是大和小,这个无所谓啊,就是它如果要是value.temperature前一个比它大的话,现在是不是下降了,而且大家会想到另外还需要考虑的一个就是假如说我们当前就是第一条数据来了的话。
22:21
第一条数据按道理是你不做操作也可以,但是如果我们为了就是考虑。多一点的话,那第一条数据来的时候,是不是也应该假如有定时器,应该把定时器清空啊,就是假如我们防预备,就是你之前那个定时器状态有残留啊,所以说如果出现这种情况的话,我们还得把它这个把它给删掉啊,到这边要把它删掉好,这个temple是不是,如果它是等于零的话,对吧,我们就把它啊,当然这个是一个double,那就是0.0呗。直接删掉ctx。
23:00
大家还记得用什么方法删对time service里边是不是delete啊,Delete哪一个对,注册的是什么,这里就delete什么对吧?那delete这里面传什么值呢?是不是得传我们提出来的这个玩意儿啊,对吧?把这个东西串进来好,所以这样把它删掉就OK了,然后这里大家注意就是我们可以做一个什么事情呢?就是呃,既然是已经把这个定时器都删掉了吗?那其实我们是不是应该把对应的状态清空啊,这个大家就是养成好习惯,要不然的话,是不是我们状态就有可能增长,一直增长,到时候就有可能撑爆内存啊,对吧?所以这里边大家看它有一个方法叫clear啊,这就是清空状态的一个方法啊,所以大家看啊,我们这里边删除定时器。
24:01
并清空状态。如果温度下降或是。第一条数据。删除这个并清空,好,这就是我们要做的事情,对吧?好,那那我们已经做完了吗?是不是所有事情都做完了。哎,对,嗯,大家会想,你光是定义了一个闹钟,定义了一个定时器,那定时器触发的时候,你到底要干什么事儿啊,你肯定是因为在那个时间点有事要干对不对,所以你想了之后,你得赶紧去干事啊,那我们这里边是不是还没有定义要干什么事啊,要定义干的事儿在哪里定义?啊,这里大家要注意了,对,是不是得到回调里边去定义啊,啊回调是不是怎么写on timer对吧?哎,这里我们可以直接大家看。
25:03
直接实现它里边给我们给定的这个on timer方法啊,大家可以看到这里边on timer传进来是什么呢?一个time STEM,这是不是就是当前的那个时间啊,对吧?到什么时间才能触发到这个timer,那肯定就是当时我们设定的那个时间是这个time step啊,然后还有什么呢?还有一个ctx上下文,另外还有什么,哎,还有一个out,是不是在on timever里边也可以输出东西啊,诶非常好,我们是不是就是想到这个时候就输出报警信息了,哎,所以只要它触发,大家就会想到,是不是就相当于我在一秒钟之内没有出现下降的情况,把它清掉,对吧,那是不是就应该报警了啊,所以直接报警。输出报警信息,那大家看一下这个输出怎么输出啊,是不是要用out。对吧,那么out怎么样去输出呢?Out,它本身是一个collect类型,对不对?哎,直接用它的这个collect方法就收集,对不对,把这个对应的数据收集起来,收集到这个al里边,它就自就是flink给我们处理的时候,它就输出到流里边去了,好那么这里边我们传什么东西呢?啊,我们定义的就是一个字符串对吧?那所以直接报报个警就完了,比方说这是SENS430几呢。
26:27
这个大家知道吗?这是不是得找到他的那个key啊,哎,那key他的那个ID我我们怎么能知道呢,这会儿这个连数据都没了,你在之前还知道数据,哎,对,大家知道上下文里边是不是有一个get current key啊,诶大家看这个上下文在这个on timer里边也是也是有get的key方法的啊,所以我们可以直接拿到它的key,然后接下来。我们就这一个。呃,三四几,诶这个就不用这个SEN4几了,是吧,我们直接用他们的key。
27:07
然后去输出一个什么,呃,温度连续上升对吧。好,所以这就是我们要输出的内容啊,当然了,就是到了这里,大家如果要是考虑更多的情况,我是不是应该把那个状态都清空啊,Current timer是不是应该清空了啊,当然了,就是如果是上一个温度你不愿意清空的话,你可以留着对不对,因为你是不是从下一个数据开始,是接着上一个数据考虑是不是还继续连续上升,对吧,那这个也是可以的啊,所以这个我那个状态就不行了,大家根据需要来,这就是我们这个代码。大家觉得这个方式复杂吗?呃,这个就是涉及到底层的一些操作,又结合了这个状态的一些处理,一些编程的习惯,可能跟大家之前的这个方式都不一样,还涉及到毁掉对吧?啊,这个过程就确实是,呃,我们测一下吧,看看看看效果吧。
28:10
跑起来。好,大家看一下我们这个琪琪来了,然后现在我们发数据吧。诶,大家会想到。我这里有一个35对吧。第一条数据发过去之后没用对不对,第二条数据啊,当然这里面大家会知道我这个TIME3是不是改不改无所谓啊,因为我们用这里面那个定时器是processing time对吧?对给个30,呃三十三十六对连续上升。诶,大家看到这个好像没有触发这个操作是不是对,因为为什么呢?因为我刚才是不是耽误时间有点太长了,对吧,一秒钟之内,它是不是定时器已经触发了,但是没有检测到数据对不对,对吧,所以这个过程当中相当于是就是呃。
29:09
这样一个状态啊,呃,这样吧,我把这个时间可以调大一点,对不对对,连续比方说我们这个那个这个我们给个1万,那就是十秒钟对不对,我们用十秒钟来试一下。好。35。再来一个36。三七。诶,这个这个我们好像设置的有点问题是吧,他好像一直没有。但大家看到这个过程当中好像一直没有触发这个定时器啊,检查一下代码。但大家会发现,其实我们逻辑应该还是没什么大问题的,但是我们是不是这里边只是process完了,是不是没有print啊。
30:05
大家看是不是这个我们是不是只做了data stream的print,我们这个process的stream是不是没有print呀?啊,那那当然就看不到了,对吧,即使他触发了我们什么也看不到啊,所以把它print一下,这个叫process对吧。好,现在再跑一下啊。接下来我们还是发一下这个数据试一下。没事,我们就等等几秒吧,应该大家不着急吧。好,36。大家看第二条数据来了对吧,第二条数据来了之后,是不是理论上,大家想理论上过十秒钟是不是应该就得输出,诶大家看是不是已经输出了。
31:02
对吧,因为第一条来的时候我们要求是清空对吧,第二条来的时候是不是就跟之前的比对啊,只要上升是不是过十秒钟就应该输出报警信息,所以这个就符合我们的预期对不对?哎,所以这个就是刚才的这种情况。
我来说两句