00:00
接下来给大家讲,就是测出出流这样一个东西,其实这个概念大家也不陌生,之前在哪里大家见过这个概念,在应该是在window的那个迟到数据的处理里边,是不是见到过这个测试出流这个概念啊,当时我们没专门去实现啊,但是大家会发现,现在我们讲完之后,大家会发现就是实现起来应该都差不太多,而且那个会更简单啊,这里边大家看一下,这里边我们是要用什么去做一个测数出流呢?那就是process function,它还有这个功能。我们都知道,就是这个大多数的算子啊,它的输出都是单一输出,对吧,都是一条流,你输出来之后数据类型有可能改变,但是基本上都是,呃,你你该是一条流出来之后还是一条流,对不对啊,除非是就像我们之前说那个split split的话,它其实是之后是get了,戳之后又剪出来了,对吧?啊用那个select又剪出来了,除了这种情况,那大多数都是一条流进,那就是一条流出。
01:08
诶,这里边就给大家提供另外一种比较,呃,就是更加灵活的一种方式,那就是测输出流,就是一条流可以把它分成好几条流输出,其实当时在讲到讲到呃,Split stack的时候,曾经提过一句,大家还记得那个lit已经被弃用了吗?大家还记得那个注释里边源码的注释里边写的什么吗?Li我看一眼啊,哎,在这对吧,这漆用了对不对?这个上面是不是说please use side output set对吧?啊,所以大家看他就是他现在推荐的就是让我们用这个底层的这种测输出流的这种方式,对不对啊,所以它这是它的统一推荐啊,就把所有的这种方式都集成到一起来做了,呃,那现在大家就看一下这个测输测输出流,看起来是可以把一条流分成两条,那process function到底是怎么做这件事情的呢?呃,我们就再来看一个例子啊。
02:17
现在这个例子呢,呃,我我就另外再。再新建一个吧。呃,这个就已经不是API的test了,这个就是测出流对吧,Side out output test好,在这个代码里边,我们肯定是大多数都是直接抄这边就OK啊。好,我先把它抄过来。好。好,接下来大家看到啊,我们准备在这个里边实现一个什么功能呢?呃,就是准备测试,就是检测一下我们这个传感器的温度,其实很简单,就是判断一下这个传感器温度,当时我们不是那个split的时候,可以有有害有有漏吗?对吧?大家还记得吧,我们现在就想用这个测输出流的方式,把当时的那个功能再实现一下,比方说我们就想着温度如果低于某个值的话,就输出,哎,就输出到一个一个特殊的漏那个流里面去,对吧?或者说我们认为低于某个时候,我们要做一个低温报警啊,那这个就是一个测试主流报警信息流,对吧?那正常的流就应该在原先的主流里边,对不对啊,高就是高温的,那就可以放在那个主流里边,好,那接下来大家看一下这个过程怎么实现,首先我们这里边还是啊,就是前边是这个S读进来,后边做KBYK。
03:55
之后呃,诶,这个其实大家会想到不用做KY了,对吧,如果要只是按照温度去做这个划分的话啊,那其实都都都不用KKY了啊,所以这里边我们就直接在data stream基础上直接process,那么这个就会呃不太一样啊,我们就把这个叫做那个低温的检测吧,叫呃那个低温,这里边有一个具体的场景,就是说咱们的那个传感器的那个数据呢,这个其实不是摄氏度啊。
04:29
它是什么呢?滑氏度,对,所以华氏度里边大家可能知道,就是它的那个温度普遍比摄氏度要高一点,对吧?啊,就整体来讲的话,一般是高一点,所以华氏度里边的,呃,32度就是摄氏度的零度,所以我们检测一下,就是如果它大于30,就是大于三十二零度以上的算正常。低于32以下的,我们就认为是个低温报警对吧,呃,零度报警。我们就叫。
05:01
呃,冰点报警吧,Freezing对吧。Freezing at,好。这是一个process function,那接下来我们要实现的时候,呃,Freezing它应该去,呃,继承一个什么呢,什么类呢。我们之前都是kid process function,现在可以是K吗?现在是不是直接就应该是process方式啊,因为对我们前面已经没有key by的操作了,所以它直接就得是process方式啊,这里边还得引入一下。然后这里边有泛型,那泛型大家知道应该是泛型应该是什么?呃,大家看一眼就是它这里边是什么,就是不是就少了一个K啊对吧,之前那个k process function是不是应该是KIO,这里边就只有一个IO了,对吧?所以这里边我们输入是sensor reading对吧,输出报警信息,那我们还是给一个string吧,好,然后接下来。
06:09
呃,这里边大家其实是要注意一下啊,这里边我们的这个这里的输出是我们的侧输出的结那个数据类型,还是主输出流的那个数据结构数据类型。注意,这里应该是主的。因为大家会想到这个process function是不是并不知道你是要要用测试出流啊,他是不是以为我们就是正常输出处理完了输出的一个状态,所以这里其实是我们要定义的是主输出流的数据器,那比方说主输流的话,我这里边就不改了,就还是三次reading原封不动输出就完了,对吧?啊,就还把它输出就好了,那接下来在这里边我们就要去做一些事情了,大家会想到呃。
07:00
既然是process方式,一定要去实现一个process element,对吧?那么在process element里边我们又应该怎么做呢?是不是要根据?是不是要根据当前的温度要做判断啊,比方说我们这里边这个这个是。冰点。报警。如果小于32度,32这个是华氏度啊呃,输报警信息到测输出流。侧输出流,嗯呃,输输出流啊好,所以这里面我们判断其实很简单,直接就是value no value.temperature对吧,然后怎么样呢,判断它小于32.0,如果是这样的话干什么,是不是要输出测出出流啊,大家看一下这个测出流怎么输出。
08:17
当然,如果要是说这个比32度大的话,那是什么?我们这里边啊。是不是就应该是输出到主流啊,对吧,主流大家记得怎么输出吗。之前已经做过一下主流是不是应该是out对吧?Out只要收集起来就能够输出到主流了,out.collect然后我们把这个VALUE6直接传进去是不是就OK了,对吧?这是我们的主流,然后接下来,那如果要是测试出流怎么去输出呢?这里大家注意啊,要用那就不能用out了,对吧,要用上下文。
09:00
Ctx上下文里边有一个output方法,大家看到了对吧?这个output方法里边要传的是什么呢?要传一个output tab。哎,所以就是它是以什么来标记这个测输出流,我们不是说要给它盖个戳嘛,对吧,到时候再好把它剪出来,所以它的戳这里就是一个T,一个output tag这样特定的一个定义好的一个数据类型。啊,所以这里边如果我们这里边直接去写的话,那有可能是需要去直接去new一个output tag对吧?啊这里边就是说我我如果要是想在之前就把它定义的话,那可以这个把它lazy的定义出来啊。呃,Lazy一个,比方说我就叫那个alert,报警的那个output。那么它是一个output tag,这个类型就得是我们测输出流的类型了,对吧,String类型,然后呃,你有一个output tag street里边大家看里边它其实就是传一个名字就可以了,对不对,一个戳嘛,所以这里边比方说我们叫freezing,呃。
10:24
Alert,好,这是一个报警信息啊,然后这里边我们要传的是不是就直接把呃前边freezing,诶,Alert是吧,这叫alert alert output,这个ta直接传进去就可以了,然后后边还得有一个是什么呢?就是你得有这个标签,另外是不是还得有,到底输出什么值,你是不是得知道啊,Value得有对吧?呃,这里面我们的值是个string,那就直接写一个就好了,比方说我们叫呃,低温报警,冰点报警啊,Freeing alert。
11:08
呃,For哪一个我们写一下吧,这里边呃,是不是我们那个就是value.id啊,对吧?啊,这个大家应该能想到啊,我们就直接把这个写写写入就可以了,这就是我们这个测输出,测输出流输出报警的一个过程,哎,就就这么简单就把它搞定了,好,那大家会想到同样啊,你到时候如果想能看到的话,是不是还得把它再。再再打印出来啊,哎,所以这里边我们这个data stream就先不打印了啊,这个processed stream打印是不是就就打印出那个测输出流了呢?注意不是。这里边打印出来的是对当时处理之后的主流,那测输出流得怎么去打印呢?得process stream,它有一个方法叫get side output,大家还记得那个window里边后后续的操作是不是也有一个get side output啊,所以它里边是不是也需要传一个output tag,一个标签对不对?哎,所以这里边我们可以就是定义一个那个名字是一样的一个标签就可以了,Output tag这里边也是一个string对吧?呃,那么这个是这个不要写错啊。
12:31
去哪了?好,我们把它传进去,然后再print,是不是就能直接把这个测数出流获取到打印出来了啊,这里边我们叫alert data,好,现在我们可以来输出一下了,对吧,看一下效果。
13:00
呃,接下来我们这个还是用sensor里边的这个数据吧,这个数据其实就无所谓哪个ID了,对不对,好。诶,大家看这是这是一条什么。这是一条正常的数据对不对?呃,所以它输出到的是这个,呃,就是sensor reading对吧,然后是process data对不对,然后346这个是不是,它就是我们的那个标准以下啊,所以它输出到的是哪里,是不是alert data里边啊,哎,所以大家这个看的很明显对不对,就分成了两条流,同样你后面如果要是给这个七的数据,当然它就会进入到alert这个流里面,对吧,你如果要再给这个十的数据的话,显然它就会到这个主主流里面来,对,所以大家可以把这个测试出流,按照我们这种方式再去好好测一测啊,这就是这部分内容,呃,那个process function这一块呢,就还有最后一部分给大家放在这里的,就是这个Co process function,这就给大家简单提一句就好,就是大家可能会想到它是不是就相当于是两个流connect连连接在一起之后,然后是不是可以通过这样的。
14:19
但方式给他去做统一处理啊,它是不是就有点像我们当时的那个Co map那样操作对不对,所以当时Co map我们不是得传两个函数进去吗?大家还记得吗?这这个大家有印象吧,对,所以现在抠process方式,它会是一个什么效果呢?它就要去实现两个方法,一个叫process element1,一个叫process element2,对,就是相当于就是我们说的那个,当时我们不也是传入两个那个方式,一个方式一,一个方式二吗?现在也是对吧,对应的一国两制嘛,哪个到底用哪个处理,这里边明明白白的在里边写就可以了。
15:01
这个就是我们后续在项目里面可能会用到这一部分内容,到时候再给大家详细讲啊,再看这个代码怎么写。
我来说两句