00:00
我们现在已经知道怎么样把一条流分成两条流了,哎,那自然我们就想到有分就有合嘛,啊,那怎么样能把两条流甚至多条流合成一条流呢?哎,那与之对应的啊,有一个操操作叫做连接两条流的操作叫做connect。呃,然后它呢,也是有一个配对的操作出现啊,它对应的那个操作叫connect,后面我们再解释啊,首先我们看这个connect connect是干了一件什么事呢?哎,它其实非常简单,就是这里有两个data stream,我现在做一个connect操作,直接把它合在一起得到的呢,大家看就是一个connected streams。就又得到了一个新的数据类型,对吧?啊,又得到了一个这样的,就是连接起来之后的这个两条流这样的一个结果啊,这看起来已经像是一条流了,但是其实大家知道这里边是不是还是两条流啊,啊,这只是连接在一起而已,所以后边如果我想要针对这样一个流在做处理做计算的话,那是不是应该再做一个转换操作啊。
01:05
所以大家看到啊,基于这个collected streams后边必须要再做一个,比方说做一个map或者flat map操作做一个转换,就可以真正意义上把这两条流合到一起。而这转换的这个过程,我要实现的这个操作就叫Co map,或者扣flat map。好,所以这其实就是这样的一个定义啊,啊,那对于这个连接两条流之后,它有一个特点就是。大家看到啊,这两条流一开始连接之后,是不是只是放在了一个connected streams里,它其实互相独立,各不影响,对吧?所以呢,之前我的这两条流可以数据类型完全不一样,然后它连接之后呢,放在这儿,数据类型也是可以完全不一样,不发生任何的变化。哎,那到什么时候才发生变化呢?就是后边对你既然要真正合成一条流嘛,那是不是接下来我就一定要做一个这个map或者Fla map操作的时候,就会把它的这个数据类型合成一个啊,那最终就得到了这样的一个同样的一个strip,而在做这个map Fla map操作的时候,大家想到因为他俩本身的数据类型都不一样。
02:21
那是不是定义的这个map操作得分别定义啊,哎,所以这个,呃,就是所谓的这个connect Co map啊,它的这个操作模式有点像一国两制。啊,就是你看我们前面这里边相当于只是就是形式上先统一在一起,然后接下来是不是你的数据类型可以不一样,我针对你不同的数据类型各做各的,最后我们变成了一个一个有一个国家对吧?诶,所以这跟这个一国两制是很像的啊里边的数据类型是可以不变的,好,那所以呃,这里边的这个数据转换的过程呢,又是这样,就是先基于一个data stream呢,调connect的方法,得到的是一个connected streams,然后再去调一个map或者flat map方法。
03:09
这里面得到得到的就是一个新的data stream了,好,所以接下来我们再把这个做一个测试啊,看看代码里面到底怎么写这个啊,这个我就基于前面来直接做操作了,因为刚好我是不是已经分流有了多条流啊啊,那所以接下来我们直接在这里边做这个河流啊。河流,呃,我现在用的是这个connect,也就是连接两条流的方式进行一个河流,那这里边我可可能会涉及到一个一个想法,就是说我到底要合合并谁呢?啊,比方说我要把这个高温流和低温流合在一起,那大家想这个你没必要合并啊,我我数据类型本来就一样,对吧?啊,就是数据类型本来一样,你之前那个拣选的时候,直接拣选这个二不就完事了吗?所以我现在呢,呃。接下来我要做的操作啊,我先将高温流高温流啊这个转换成呃,元组类型,元组类型。
04:13
就是而且我这个要求干脆这样啊,转换成一个二元组类型啊,我就是要当前的传感器的ID和它的那个温度值,然后接下来我要干什么呢?啊,要做一个雨这个低温流。呃,连接合并之后。之后啊输出啊,就是类似于相当于检测到这个状态之后输出一个状态信息对吧?那高温流我可以输出一个报警对吧?啊,那低温流的话可以输出它是正常的啊,所以这个整体来讲还是比较容易想到的一个操作,那首先我们要做一个是不是要做一个map转换啊,啊所以接下来我是一个呃,High time stream啊先做一个map,那这个map就非常简单,呃,那有同学想到我现在不是要做那个呃转换成二元组吗?那这个二元组大家就要注意了,你如果用拉姆达表达式的话,是不是会涉及到那个类型擦除,你后面还得专门再去指定类型啊,那何必那么麻烦呢?我干脆直接就来一个map function。
05:23
得了对吧?呃,这里大家要注意,因为我现在并没有定义它返回类型是什么,所以这里边它是一个object对吧,那就是我并不知道你要得到什么东西嘛,所以我在这呢,把它定义出来,我要一个temple。二元组这样的类型对吧?而且里边我要定义出它对应的那个泛型来,我现在要的是s ID string类型,还有温度值double类型,好把这个先引入,注意是Java的那个元组类型啊啊,然后接下来这个map function里边是不是必须要去实现一个map方法啊啊,那这个map方法就非常简单了,我就直接去new一个,呃,Temple two对吧,然后这里边给的这个。
06:10
我是不是直接用当前的这个value.get ID拿出来就完事了啊,另外还有这个value.temperature是不是也是拿出来就搞定了啊,这个非常简单。我把它单独定义一下,这个呢,呃,我改一个名字吧,比方说我这个叫高温嘛,要报警对吧,叫一个报警流,比方说叫warning stream啊,当然这里面大家如果不想用这个,呃,就是stream operator的话,我还是把它写成这个data stream啊,看的稍微舒服一点,它的类型是一个二元组,对吧。好,现在问题接下来就是要做这个真正的连接操作了啊,那连接操作就是基于这个warning stream直接是不是可以调一个connect方法呀,对,Connect方法里边传进来的就是low stream。那接下来啊,大家看到它的这个得到类型应该是什么呢。
07:03
这就是我们所说的connected streams对吧,大家看它的类型就叫做connected streams,然后这个ED streams啊,就是connect得到的这个类型,大家看它的泛型是有是有两个的INPUT1INPUT2,好,那问一下大家现在INPUT1是谁?哎,这个很明显二元组类型对吧,是不是谁调就是INPUT10INPUT1是谁啊啊这个很明显啊,呃,我们在这个。我把这个先关掉啊,在这个代码里边,大家看的也也很明显,就是调用的时候,本身当前的data stream类型是不是T,然后传进来的data stream类型是R,那大家看它的泛型是不是TR啊,所以就是谁调谁是一对吧,被调的那个参数它的类型是二,好有了这个之后。接下来这个connected streams啊,给大家看一眼它里边能调的方法。
08:02
啊,大家看啊,除了这一堆什么get各种各样,就是获取相关的信息的这些方法之外啊,能做的转换操作,哎,大家看有k back,就是当前我这个连接在一起之后,还可以去去分组对吧?啊大家看这K败也是是不是要指定两个啊,就是因为我是两条流嘛,两一国两制嘛,啊所以这个K也是可以指定两个,除了k map之外,大家看上面是不是有flat map呀,Flat map里边传的是一个Co flat map function对吧?啊,跟我们之前就不一样了,之前你是不是只传一个flat map function就可以了,现在传的是Co flat map function式,因为你现在是connected streams对吧,调用的这个不一样啊,然后除了Fla Fla map之外,还有map map要传的就是一个Co map function对吧?啊,所以主要就是它俩,另外还有底层的API有一个process。那这里边要传的就是一个抠process方式,之前我们不是说那个process方式API是底层API吗?啊,这个我们放到后面再讲啊,所以最常见的就是map或者flat map里边实现一个抠map function或者扣Fla map function。
09:13
所以呃,那是不是就可以去做一个操作了,Connected streams是不是直接可以点map里边需要去啊,接下来我是不是要new一个Co map function啊,大家看这里边,呃,定义的这个啊,输入INPUT12元组类型,对这个INPUT23READING类型,因为那个低温流我们没做过转换嘛,最后还有一个object object这个类型,这就要看我们最终到底要得到的数据类型是什么了,对吧,就要把它做一个统一了,比方说这个,呃也简单,我最终就干脆啊,呃,就是输出一个,呃,大家可以想到啊,我我可以比方说啊,有同学可能想,那那假如说我这里面要实现这个,呃,这个Co map function这个接口的话,里边到底要实现哪些方法呢?我们先看一眼啊,大家看里边是两个。
10:10
什么叫抠map方式呢?就是啊,连接起来的流里边有两个map一国两制对不对,它就叫MAP1MAP2,你看这个MAP1它的参数是什么?是不是就是第一条流里面的数据啊,MAP2是不是就是处理的是第二条流里的数据啊,啊,所以你看一国两制互不干扰对吧?各自处理各自的,最终是不是都要得到一个相同的object类型啊啊,当然这里边我也可以定义,就是说,呃,比方说啊,我现在要求的是,呃,我最终这个MAP1啊,它不是那个高温报警流吗?我最终要返回一个三元组,比方说你有一个TEMPLE3啊,然后接下来里边的这个类型呢?呃,我要的是当前的。大家看我现在如果取这个Y流的话,是不是就是F0F1啊,二元组里面的那个字段啊,本身那个属性叫的就是F0F1 F0就是当前的ID,然后F1是不是就是当前的温度值啊,我先把这两个放到放在这儿,然后另外我再来一个报警信息对吧?比方说我这个叫呃,高温报警,或者说我这是一个当前的状态对吧?高温报警状态,哎,那那这个是一个返回了一个三元组,那比方说下面这个呢,下面这个低温状态,我就不关心它温度到底是多少了,我就只关心它状态是正常就完事了,对吧?哎,所以那我这里边可以直接拗一个二元组t two啊,然后里边我取这个value.get ID,然后输出一个当前,它是normal的。
11:48
大家看这样是不是就可以啊,这里大家要注意啊,我外边给的是object,这是不是我当前这个二元组和三元组的一个公共负类啊,对吧?哎,那假如说我这里边直接要给一个呃,这个二元组这个类型,这可以吗?这这肯定报错对不对啊,但当然后边这里边。
12:10
我先把这个先copy一下啊,我们把这个删掉,然后假如说我这里边给一个二元组类型,我要给一个string类型,String string类型,然后里边实现的这个东西,大家看是不是返回,你就必须得是这样一个二元组类型啊,一国两制,最后必须要还是回归到这个一个国家上,对吧,最后还是同一条流,那你的类型不能变嘛。那如果说你出现像刚才我们这个类型不一样的这种这种场景的话,那是不是就必须要取它的一个公共负类啊,对吧?你这里边给一个object啊,这里是完全可以的啊,所以我们这里边就做了这样的一个操作,那如果要是你写那个scale代码的话,是不是也可以啊,大家知道scale里边所有的数据类型其实都有一个公共负类,是any对吧?啊,就是引用类型的话,Any reference啊,Any ref,或者是那个值类型的话,Any value any v l对吧?啊,这个其实最终都是any类型,跟我们这个Java这里边是差不多的啊,所以转换之后得到的这就是最终的一个啊,比方说我这个叫result stream对吧?哎,这就是我们得到这个结果了。
13:23
你也可以把上面这个改成data stream。我们可以把这个result stream做一个打印输出,看一下结果。运行一下吧,看看效果怎么样。好,我们看一下这个运行结果,哎,大家看是不是符合啊,当然前面我们还有那个分流之后的那个输出啊,我们只看后面就可以了,后面大家看是不是如果是高温的话,就输出了这样一个三元组啊,然后做了这样一个输出,对吧?呃,Hightime Bo,如果是低温的话,是不是直接就是normal啊,哎,所以这就是我们想要的这个结果啊,河流之后做的这样一个操作,一个结果。
我来说两句