00:00
现在我们已经知道在process function里边可以获取到当前的时间,可以注册定时器啊,定定义这个未来某个时间点要发生的事情,这些都是可以做操作的。那另外我们说process function式呢,还有一个比较有用的特点是特殊数流啊,就是之前我们讲到分流操作的时候,Speed select已经被弃用了,那用什么呢?就是用process function它里边的这个测殊输流来实现分流的效果啊,那这里边我们还是单独的再去定义另外一个吧。定义一个object,然后我们这个叫做测试输流的一个测试啊,Side output test。Output。然后这里面的本身的定义前面都差不多啊,所以我就还是把上面的内容直接copy过来。哎,我们直接可以copy到这个还是啊,Copy到这个data streamam这里记得把影视转换引入,然后接下来,诶,我们就一开始就不要忘记,就是会有一个这个print,对吧,我先把这个写出来啊,这里边是side output test,然后前边我们这里边就定义一个,比方说我想要去做一个分流操作,大家还记得我们之前那个,呃。
01:22
根据30度划分高低温对吧?那现在我就相当于是直接做一个分流,那这个分流要用这个侧输出流做分流,那是一个什么效果呢?呃,所以就相当于我的这个分流操作,并不是看起来好像啊,并不是一个平等的一分为二的一个过程,而是什么样的呢?是主流里边保留了一部分数据,这是主流,然后呢,另外一部分呢,放到了侧数数流,就相当于我这里边做了一个分支,对吧?主流里面有一条,然后呢,在侧向再分开一条,其实也是一分为二,但是好像不是那么平等的样子啊,在代码上它会有所体现,那比方说我们现在主流认为我定义这个高温流是主流吧,啊,如果高温流是主流的话,那就得怎么做呢?就相当于是呃,我定义一个high,就等于stream直接去做操作,这里边你既然是要直接基于这个温度值,直直接去做这。
02:23
这个呃区分对吧,30度以上的高温流,30度以下低温流,那大家大家知道这就跟这个KY就真的没关系了,对吧?啊,你这不需要再做那个分组了,直接所有数据来了之后,直接做操作就完事,所以我可以直接调这个process,那里边你要用到这个测试出流的话,那就得实现的就是一个普通的基于data stream的process function了,这里边我们new有一个,呃,这个叫做split分流对吧?呃,Split temp啊这样的一个操作啊,Processor。好,我把这个传进来,哎,那这里边我可以传一个当前的那个标准的温度对吧?区分温度,比方说我们当前是三十点零三十度放在这儿,然后后边,哎,那我记得把这个直接print打印出来,这个硫是氦,哎,那有同学可能就说了,那我那个低温流到底到哪去取呢?哎,低温流很简单,那就是要基于当前的high time stream去get side output对吧?然后这里面可能我要自己去定义一个output tag啊,Output tag,然后这个类型是什么呢?哎,我就还是当前的这个s reading本身的这个数据类型,把它定义出来就完事了,或者说我想要做一个数据的调整也是可以的啊,比方说我想把它转换成一个元组类型对吧?或者转换成一个其他类型都可以啊,那这里边我们就既然是分流嘛啊,这里边就就不做这个调整了啊,啊或者还是给大家改一下吧,因为之前我们做Li的时候,那个数据类型是不能不能改的,而现在。
03:59
Process方式,因为是大招嘛,什么都能干对吧,数据类型也也可以不确定,可以直接把它类似于做map转换类于做filter对吧,啥都能做啊,所以接下来这里边我直接定义成一个三元组好了,尽管没什么区别啊,长整形啊,然后这个double对吧,三元组把它提取出来,然后里边需要有一个这个标签的名称,比方说这个我叫漏对吧啊,这样的话就可以把它拿出来了,拿出来之后呢,再去print打印输出,这个叫漏对吧?啊,这就是这个完整的处理的过程啊,先把这个整体的架构出来,接下来就是实现自定义的process function function,呃,进行分流对吧,利用测输出流进行分流,那么这里边我们把这个定义出来,呃,当时我们叫的这个叫做split是吧,Split temp processor,然后这里面的这个温度。
04:59
这是一个的。
05:03
一个阈值,然后接下来它需要去实现的就是一个最普通的process方,然后把这个引入啊,然后里边它的类型呢,之前我们那个是KIO对吧,这里边没有K了,因为不做K嘛,所以直接就是IO啊,就像一个map一样啊,所以这里边给的就是ss reading传进来,然后输出什么数据类型呢?诶注意这里边不是我们的元组类型,为什么呢?这里面定义的它是主流的数据类型。啊,所以说氦他们这里边,如果我们还是这个sens reading的话,那我就还是以这个作为作为数据类型直接输出对吧?啊,这这跟我们这个测输出流是两回事,这是主流的输出类型,然后接下来这个里边必须要实现一个process element的方法啊,这个必须要重写,这还是一样的,然后在这个方法里边,当前我们只只按照这个stress后D去判断温度值就可以,那不需要有状态对吧,而且你这个大家知道,你在这个就是process方式里边,你也不能定义k state嘛,对吧,你只能是定义别的那个operator state啊,那我们这里边就不做这个,呃,实现了也不需要嘛,这里边我们就直接去去判断就完事儿了。好,那接接下来我们的这个判断过程就是if当前的value,如果说啊,如果说要大于传进来的这个hold的话,那我们做一个操作这里面啊,呃,这个我在这儿还是框框起来给大家。
06:35
写一下啊,协一条注释就是如value.temperature啊,如果当前数据当前温度值,温度值大于30啊,那么输出到主流对吧?哎,那这里边怎么输出到主流呢?大家还记得用那个collector对吧?要用这个啊,所以是out调它的collect的方法,然后里边我们传一个sensor reading啊,那不用传senor reading了,本来就是当前value就是s reading对吧,直接输出就完事了啊,所以大家看你如果只看这儿的话,这是不是相当于一个filter一样啊,对吧?哎,这这就是我的filter的条件嘛,只不过filter的话你必须是原样输出,而我们这里边还能做这个数据格式的转换啊,所以说这个process方式就是一个,呃,底层的调用啊,什么都能做。
07:35
啊,那另外如果要是说比30度小啊,这里面就是小于等于了啊,我们就不不专门考虑等于的情况了,如果啊不超过30度,那么输出。到测数数流。呃,这里边就是这个侧输出流,大家看一看这种输出方式到底怎么去定义,那就得用上下文ctx outt本身这是主流的输出方式,对吧?哎,我们说收集起来写到这个输出的缓冲区里边去,那这里边测出内容怎么办呢?用上下文调用它的output方法,那么outputput方法里边大家看到有两个参数,一个是output tag,哎,那这个taag呢,必须跟前面定义的这个类型和名称一模一样啊,所以这里边我可以把它这个copy过来,然后另外还有一个参数,就是当前的数据嘛,你当前的这个三元组到底要什么对吧?哎,那我这里边value.id然后value点长整形的那个时间戳,最后还有一个value.temperature对吧?啊,数据结构做了一个转换输出三元组,所以这个其实还是整体来讲是非常简单的啊,就用这个测输出流做了一个分流的操作,好我们接下来还是给大家。
08:58
这个效果。
09:00
好,那我们来看一下当前的效果,我们已经运行起来了,那接下来还是啊,这里面啊这个。已经复制了别的内容,我们还是输出一下之前的这个341,这个35 35.8,哎,大家看这显然是一个氦,对吧,显然是一个大于的一个值,那如果我来一个15的话,显然是一个漏啊,这个输出的效果跟我们之前的那个分流几乎是一模一样,而且还更强大了,它数据类型都可以做转换,你可以做各种各样不同的具体的自定义的一些操作,这就是process function进行分流操作,利用测输出流进行分流操作的过程。
我来说两句