00:00
再来我们再来讲process function里边的另外一个主要的用途,那就是它可以做测数数流,前面我们说了process function式里边的这个测输出流,它其实就是用那个上下文ctx里边有一个out的方法,然后我们可以指定一个测出流标签,然后把对应的那个数据扔进去就可以了,对吧?啊,所以它的调用其实是非常简单的,而现在我们对于这个分流操作。大家也看到源码里边的推荐方式就是用测试出流啊,所以整体来讲,你说这个process function它是很底层,呃,是是很灵活对吧,用法可能也又特别特别多啊,看起来这个逻辑也比较复杂,但是大家必须了解啊,至少这些基本的功能点都应该知道,那接下来我们就用这个process方式再把之前讲过的那个split s select实现过的高温低温流的拆分分流操作再来实现一遍,那接下来我们看一下代码里边怎么写。
01:01
首先还是新建一个测试的类,这个我们叫process test3,呃,这个其实也是一个application case对吧,但是我们主要是用来测这个set output啊,我们叫set output case。前面的这个流程,呃,我们自然能想到都一样啊,直接创建执行环境,然后读取set文本流,转换成S,呃,Reading类型,后面那就是直接process做处理,最后打印输出啊。直接全copy过来,完整copy过来,那中间其实就是这里面要改了,对吧,那大家想一下,现在我们这个分流操作的话,要按照K来做区分吗。我们分流是按照温度分对吧?那K有关系吗?是要每个K单独分成两条流吗?没有这种做法对吧?所以这里边我们其实就不需要去对,不需要去K了,其实我们就是一个process function不用分组,然后我们。
02:08
相当于是这个自定义测输出流,实现分流操作。接下来我们就不需要了。直接就是一部process。那这里边的process当然就需要哎,自己再去重新写一下这个这个过程了啊,大家看这里边直接他报错了,因为我们这里边是一个key process方式对吧?呃,这里当然就不能这么用了啊,这里边我你有一个还是自己做一个重新的定义啊,呃,或者我这里面干脆用另外一种写法,大家想我这里面直接写成匿名类是不是也可以啊。哎,就是我这里边直接在后边一个,呃,这个process function。这不就完事了吗?啊,当然这里面我可以指定当前它的输出类型啊,这里面就还涉及到一个啊,就是你最好呢,要把后边是不是要这个VR要写出来啊,对吧,我把这个叫做这是我当前最后的结果,既然要分流,那大家想就涉及到。
03:13
啊,比方说我们是分成高温流,低温流,那现在是用测输出流做分流,是不是就涉及到主流是谁,测输出流是谁啊,那比方说现在我们定义的流就是高温流对吧?High temp stream这样去定义,那后边high temp stream可以去直接去输出,这个叫做呃,High对吧,High temp。那同样下边这个低温流就应该怎么去做呢。是不是就对,是不是要基于这条流,就像我们之前那个window输出到,呃,延迟到数据输出到测数据流一样,是不是要得到的那个data stream,最后再去调一个get set output方法呀,啊,当然这个方法大家记住是single output stream operator里面的,所以你不能把这定义成data stream对吧?啊,你你要把这个定义成dataam就调不了了,这里边要传的就是我们想要的那个output,那呃,Out output tag对吧?所以在这我是不是在外边可以把那个output tag直接定义出来啊,定义一个。
04:19
Outputput tag。用来呃表示测输出流,也就是低温流对吧。所以你有一个outputb。Out out to the tag,这里边的数据类型大家发现我应该是直接给那个sensor reading就可以,对吧?我这里边的数据类型也不用做做转变了啊,直接放这个原始的数据类型就完事了,里边是不是要给一个当前的ID啊,当前的ID我这个随便叫啊,比方说我就叫做load time,这这没毛病对吧,直接把它定义在这,然后大家注意,一般情况我们推荐大家是后边写一个划括号,这就相当于是。
05:07
这就相当于是一个用了一个匿名内部类的实现,因为在这个autout tag里边,大家看到它这个类里边,本身这个类是带泛型的,但是它的构造方法里边是不是没有泛型定义啊,然后这里边它直接就用这个type的提取器去创建我们当前的这个呃类型信息了,那大家会想到我这里边用当前的这个反射啊,直接拿到它的这个类型信息的时候,是不是就会有这个泛型擦除,就就得不到里边的那个类型了啊,所以呃,就是当前我的当前我定义好的这个类型,你如果要就是一个sense reading的话,这个我们问题不大,因为符合我们定义的那个类型嘛。但是你想你如果要是一个一个一个元组对吧,一个temple的话,那是不是后面这里边就会有问题啊,所以为了方便这个类型解析啊,大家要把这个实现成一个,最好是实现成这样一个,就是匿名内部类的这种方式啊。
06:04
好,接下来在里边get set output的时候,我就可以直接去获取当前的就是low ta tag,然后得到的这个是不是就是我们的低温流啊,对吧啊,那这个我叫low。这就是最后的一个输出,好,那当然前面这里边这个process function呢,还要做一个具体的定义,当前的这个输出应该是什么?是不是还是sensor reading啊,对吧?前面这里边也要改成sensor reading啊,因为高温流本身也是原封不动嘛,我只做分流,那就得到这个是什么,把它直接扔到这个高温流就完事了啊啊不做转换,或者你想转换大家发现是不是也可以啊,因为当前process function的输入输出是完全自定义的,而那个测输出流的类型是不是也是自定义的呀,它这个就比split s select,大家可以发现是不是功能就多多了呀,对吧,你在里边你也可以结合我们当前呃这个上下文里边的一些信息,另外就是说你也可以呃,就是转换成不同的数据类型啊,这个非常的方便,好,那里边关键的就是一个process element里面的方法了,这里边其实就呃,也没什么特别的,大家想这是不是就是一个if else判断啊,对吧。
07:21
如果要是当前的value,当前我的这个数据不是叫做value吗?Get temperature,如果要是大于,诶,我定义的那个30啊,但是这里边就是你如果想按我们之前那样把那个30当成一个阈值传进来的话,就不要用这个呃,匿名类了,那如果说这里边你用了匿名类的话,相当于这个就写死了啊,如果大于30怎么办?对,诶,大家想到这是不是应该是高温流啊,对吧?就是判断温度大于30度是高温流,高温流输出到哪里?哎,对奥collect输出到主流对不对?哎,那那另外如果是小于30度呢?小于30度就是低温流对吧?低温流是输出到测输出流,哎,所以整个这个过程其实就是一个if else的过程,那主流当然是out.collect然后直接把这个value扔出去就完事了,对吧?哎,那另外else是不是就是ctx output里边传进来的是那个low time tag,然后把当前的value放在这是不是就完事了?大家看这个out步的两个参数是不是一个是当前测出流标签,另外一个是要输出的那个数据啊啊,所以非常简单啊,直接这么一调就直接。
08:47
再把它搞定了,这就是完整的一个处理流程,大家看是不是,呃,比我们之前那个sweet select的话,好像也没有复杂太多是吧,就直接自定义啊,实现这样一个process function就可以了。
09:00
那接下来我们来测试一下。现在代码启动起来,还是我们把这个做一个分屏显示,这边开始输入。接下来这个341,我们就从前面开始输入啊,这不是最初的那个数据吗。341。好,来看341,当然这个是high temp对吧,高温流的数据,然后后边这个346。15.4大家看这个其实跟K没关系,对吧,它就来一个判断一个这就是low temp啊,那对应的这个3476.7的话,当然还是低温流low temp,那SENS10的话。38.1,这就是一个高温了,对吧?啊,所以这个就是非常简单啊,你这个如果要是对应的输出各种各样的数据啊,对应的它都是只判断当前温度值,做一个拆分就完事了,实现的效果跟之前的split select是一模一样的。
10:00
这是关于测试入流的一个测试。
我来说两句