00:00
我们已经知道了,可以把一条流分成两条流,那当然与之对应的有分就有合,当然可以把两条流再合成一条流,对吧,甚至多条流合成一条流,那这里边有两种不同的河流操作,一种叫做connect,另外一种叫做UN,所以接下来我们来给大家讲这个河流啊,啊,那大家可能会会觉得有点奇怪,就是这两个区别又在哪里呢?哎,不要着急,我们一个一个来看啊,首先看connect connect的特点是什么呢?它的特点可以说就是刚好跟这个split可以说是对应上了,大家看它的这个河流是什么操作呢?相当于就是说。做了这个connect之后,我得到的是把两个data stream得合成了一个叫做connected streams。那就得到这样的一个数据结构,然后它其实就是什么呢?形式上把两条流放到了一条流里边。但是本质上你看它还是各自是各自对不对啊,这个他们的数据类型甚至都可以不一样啊,所以说这个整个的这个过程有点像什么呢?啊,就是有点像一国两制的这种状态一样,对吧?哎,我们本身当前的这个状态呢,大家都是同一种制度啊,都都是同一个国家,都已经是放到了同样的一条流里边,真正大家是一个整体了,一条流了,但是呢,里边具体的操作,具体的自己的特质,你可以保持独立,相对独立啊,就是这里边你可以按照自己的数据类型啊去去做处理,去做定义,呃,大家看到这个各自的数据和形式可以不发生任何的变化。
01:40
啊,那大家就会想到,那这个就可能会有问题了,那假如说接下来你这两两个就是同一个流里边,它这个数据结构都不一样,那我定义它的操作的时候,这玩意儿怎么样去定义呢。其实也比较简单,那就是接下来的这个connected streams,它有自己特殊的一些API可以去做操作,这种它特殊的这个操作是什么呢?哎,就是叫做Co map,或者叫Co flat map。
02:09
它的这个API调用大家注意啊,并不是说呃,得到这个connected streams呢,然后直接调一个叫做Co map方法,不是啊,它其实直接就是基于这个collective streams,然后调map方法,调一个map方法,这个操作跟我们平常的map不一样,为什么呢?因为他知道现在我里边其实是一国两制的状态,对吧?那所以当前的这个Co map相当于干什么呢?是要传两个map function,就是对应的那个操作的函数,然后针对两个我们当前合并的这两个流里边的元素分别去做处理。诶,所以大家看这是真正意义上的一国两制对吧,就是connect先我们先大前提是合成一国,然后接下来再去两制,两制的话就你做一个扣map或者扣Fla,扣flat map这样的操作就可以了,好啊,那具体这个转换的过程当中,做了这个扣map,扣Fla map之后呢,得到的就又是一个统一的data stream了,那这个data stream里边呢,数据类型就他们的数据类型可以。
03:15
转换之后相同也可以不相同,哎,为什么呢?因为不相同的话,他们至少可以都是any value对吧?啊,这可以都是,至少可以都是any嘛,这个肯定是没有问题的,所以最终我们是合成了一个data string。啊,那所以接下来我们还是在代码里边给大家做一个。做一个实现啊,那上面这一个分流我们就不要再打印输出了,接下来是轮到了第4.2,我们当前是合流操作,现在是connect啊,啊那两条流如果要做这个connect操作的话,我们说它一国两制嘛,两条流里边的数据结构可以不一样,数据类型可以不一样,所以接下来我首先是先把这个数据类型做一个调整,对吧?比方说之前这个高温,假如说我高温想要报警。
04:06
啊,那我就单独再定义一个报警的这个流啊,Warning stream,它基于之前的这个高温流去做一个简单的转换,之前我们的高温流低温流这个数据类型都一样嘛,都是senor reading,所以说现在我要把调成不一样的,我就非常简单的把它转化成一个,比方说我转化成一个,呃,二二元组,对吧,这个非常简单,就是我要报警的话,我关心的可能只是关心当前的ID,哪个传感器,以及当前的那个温度值到底是多少,哎,那个时间可能我不关心,我就当前这个温这个。传感器直接报警就就好了,那我转换成一个二元组类型,然后接下来大家知道这个就跟低温流不一样了,对吧?哎,我再把这两条流做一个做一个连接,做做一个合并,那比方说我定义一个connected streams啊,这里边我就直接用warning stream.connect大家看这是基于data stream的一个方法connect,然后呢,去连接另外一条流low temp stream,大家看到这里边你传的就是一个data stream,对吧?但而且这里边类型可以不一样,可以是T2啊,本来我们当前的数据类型是T对吧,当前流里边是T,然后你传一个T2,得到的呢是一个connected streams,它的这个泛型是两个T和T2,这个就分别表示INPUT1 input2,然后在这个connected streams里边呢,大家看到啊,有map方法。
05:38
有flat map方法啊,甚至还可以再去做KB,对吧,基于当前这个已经联合在一起的这个呃,流啊,连连接起来的这个流,再去做分组,再去K,这个也是可以的。那我们最常见的呢,就是map flat map,它这个操作做完了之后,得到的就又是一个data stream了啊,这个就比较简单对吧?啊,然后这里边你可以传什么参呢?大家注意这个map比较特殊,传两个参数对吧?啊,传一个FUNCTION1,传一个FUNCTION2 function1操作INPUT1数据类型做转换,FUNCTION2操作INPUT2数据类型做转换,最后转换出来的。
06:20
都是相同的一个R类型,对吧?哎,这就是我们经过转换之后的map map之后的这个结果啊,那另外还有一种方式,大家知道可以传一个传一个函数类,那那这个类大家看它就叫做为什么我们管这个操作叫做Co map呢?这里边的函数类传的这个名字就叫做Co map方式,你要实现的是一个Co map方式,然后它的类型有三个,我们我们点进去看一下,这就是INPUT1INPUT2OUT对吧,Output。然后里边要实现的方法呢,大家看一个MAP1,一个MAP2,这不是非常非常直白吗?对吧,就是第一条流里边的那个数据类型来做转换,转换成一个out类型,然后第二个呃,那个流里边的数据类型INPUT2啊,再转换成一个out类型,分别用两个函数来实现这个map转换啊,所以这就是我们所谓的一国两制。
07:17
当然扣flat map也是类似的啊,它要实现的就是一个扣flat map function对吧?啊,这个大家一想就能想到,我们看这个flat map啊。你看这里边要实现的就是一个扣flat map方式啊,那同样这里边去就是flat map flat map1 flat map2对吧?啊,然后大家看后面那个,你要做那个数据输出的时候,用到了一个collector对吧,叫做al的一个collector去收集数据,然后输出啊这个大家就是后面用到的时候再再看就可以了啊,我们现在主要是。给大家看一看,接下来怎么样去做这个一国两制的处理呢?哎,我们呃用Co map进行。
08:00
呃,对数据进行分别处理,其实就是这样的一个过程啊,所以这里边我定义一个,呃,这个result stream啊,当然这个result STEM之前已经定义过了,对吧,我们把这个叫做这个Co map result stream。然后它需要在之前的connected stream这个基础上直接去调用一个map方法里边其实就是一个com function对吧,我也可以传两个参数,传两个函数进去分别表示,哎,就是第一个流里边的数据怎么处理,第二个流里的数据怎么处理啊,所以接下来我们这是两个函数啊,我就用这个拉姆达表达式先写吧,这个比较简单啊,啊,那首先第一个流里边我们知道这是那个warning对吧,Warning data,我把这个定义出来,他要怎么做操作呢?我们还是放在下边啊,换行。这个比较简单,比方说我要的就是在当前的这个数据基础上啊,当前的那个ID。
09:05
呃,Warning data_一对吧,就是就是当前它的这个数据,然后warning data_二,就是当前它的那个温度值temperature,然后我再加上一个,比方说一个warning这样一个报警信息,对吧?啊,这个比较简单啊,就是给大家简单化了,做一个示例就好,为什么报错呢?因为你还有第二个参数没搞定呢啊,还要传一个比方说这个我们叫low temp data。然后写一个蓝表达式,那这里边比方说我们还是啊,这个就不用写那么复杂了,我直接就把它ID拿出来,然后去来一个也是文字说明,我说它是正常的对吧,比方说我叫healthy,诶健康当前的这个传感器没毛病,诶这样的话就可以输出了,大家看我得到的这个类型可以不一样,对吧?那你要求不是最后得到的R必须是同样的类型吗?诶没关系,你这里边看的话,一个是二元组,一个是三元组,你如果看一下类型的话,它是不是就相当于是一个product类型呀。
10:05
对吧,啊,就是我们整个元组大家知道都是统一的一个这个product类型嘛,啊,所以这里边它其实就是找它的副类就就OK了,甚至你还可以怎么样,你这里边定义的时候,你把它直接定义成一个data streamam any,这是不是更是大招啊,对吧?你直接定义一个data streamam,你这肯定没毛病啊,所以这个就是大家对于数据类型啊,不要认为它有什么奇怪的这种这种用法啊,那最后我们可以把当前的这个结果做一个打印输出啊,比方说这个我们叫Co map,对吧。得到这样一个结果啊,这这里我们可以看一看啊。好,我们看一下这个执行结果,哎,大家看这就是这个Co LA输出的结果,哎,你看之前的高温流warning对吧,低温流healthy啊,高温流warning,低温流healthy这个一点问题都没有对吧?就按照我们分流出来那个结果分别做转换,这是连接到一条流打印输出的。
11:01
啊,但是有同学可能说你这个有什么意义呢,这没什么意义,你之前已经本来就是一条流分开的嘛,现在又合在一起啊,只是给大家做这个API的调用做一些讲解,对吧?你如果真正的要用这个河流操作,大家想想能用在什么场景呢?呃,你比方说像传感器,那大家想在有一些场景里边,我们要去监测火火情对吧,去去做那个火警的那个报警,那对于这个火情的这个检测,你如果只检测温度的话,这个合理吗?那其实。不见得合理,对不对,哎,你这里边有些地方可能太阳直射啊,或者说有些地方它那个温度突然就是就是特别高了,但它并没有火情啊,并没有着火,你如果要是正常的话,过一过一会儿,它这个温度降低,自然冷却就没事了。但是什么情况下就真的要报警呢?我们与之配合的还有一个烟雾报警器,假如说我检测到温度又高,哎,达到了我们认为很多易燃物的这个燃烧的临界点,然后同时呢,又有这个烟雾,烟雾报警线,那个数值又比较高,这个时候大家想是不是我把它俩做一个collect的联合对吧,连接起来,然后同时检测到当前他们的数据啊,呃,当前的这个数据,如果说有些已已已经是这个温度又高,然后又又已经是这个,呃,烟雾传感器,烟雾浓度也比较高,那是不是这个时候就应该真的做一个报警了啊,所以往往一般是在这种啊,就一些风险控制做报警的时候,这个用的比较多一点。
12:31
啊,那呃,有同学可能会对这个比较有疑惑,就是说对于多流转换的时候,两条流在我们那个slot上,任务执行的时候又是一个怎么样的情景呢?那比方说这里边我们的这个河流啊,我在这里边给大家。直接就看一下这个这个呃,整个转换的这个过程啊,那大家想前边我这里边首先有一个基于这个high time stream,首先大家看啊,两条流的话,首先我这是两条流对不对,两条流那是不是输入它的这个任务肯定是不同的任务啊啊所以比方说这里边我这里边是这个high啊。
13:08
前面我们已经分流过了,下面这个是漏。这是这两条流,这里不是算子啊,大家可以认为这就是氦的那个S算子对吧?或者说这是漏的那个S算子,或者说我们认为这是这个温度传感器的S读取数据源,这是那个烟雾传感器的S读取数据源,那大家想这肯定是两个不同的任务嘛,呃,这是这是完全分开的啊,然后接下来这个氦会怎么样呢?Hi,可能去直接做一个map转换,得到当前的这个warning啊,然后这个low不做转换了,然后接下来他俩干什么事呢?做一个connect对吧?做一个connect的操作,呃,然后接下来我们在这里边。Connect起来之后再做一个Co map。啊,所以大家看到这个所谓的这个呃,Connect啊,Connect的这个操作,它本身呢,也并不是一个严格意义上的,呃,一个一个完整的操作,它需要还是一对操作啊,Connect之后Co map,这才是一个完整的操作。
14:10
所以我们在执行的时候,这个相当于是什么呢?就上边这个大家看跟下边这个河流之前各干各的,并行不悖对吧,这就相当于是两条流,它当然是并行的,当然应该是在不同的slot里边去执行,对吧?啊,那那当然有些我们说这个不同的任务啊,它可以放在同样的一个slot里边去做共享嘛,啊这个当然是也有可能会放在同一个slot里边共享的也是有可能的啊,但是整体来讲,它就是不同的任务,你就把它理解成。这个前后发生的不同的任务就完事了啊,这个其实还是比较好理解的啊,那之后怎么样呢?到了Co map这一步,他俩人就真的合成了一个任务了。啊,那与之对应的,大家可能想我可能在这里边还有并行对吧,那com map可能有好几个这个并行度,哎,那这个就是你来了的数据,有些河流可能合合到这个任务里边了,有些河就可能合到这儿来了,对吧,有些合到这儿来,这就并行去做计算就行了,同样上边也是你这个氦,这个读取数据源的时候,可能有很多个并行任务,这个漏也可能有很多个并行任务,都是各自处理各自的就完事。
15:17
其实处理起来都是一样,对吧,只是在这个拓扑图里边可能会比较复杂一点而已。好,这是关于这个connect河流的操作,所以大家总结一下的话,会发现我们这里边其实这个数据结构是什么呢?前面讲到的这些都是成对出现的,比方说我们前面讲到的这个,呃,简单的聚合,那就是先K对吧,KBY之后得到一个K的stream,然后再做聚合,聚合转换又回到了data stream。又又回到了这个data stream这里边的这个状态,对吧,绕一圈回来,然后如果说呃,后面我们做分流的话,先split split得到一个split stream,然后呢,哎,再做这个select转换,再回到这个data stream,对吧?哎,又是这个绕一圈又回来。
16:06
那同样后边我们这个connect Co map也是乘坐出现,先是两条流去做connected,呃,得到一个connected streams,然后呢,做Co map Co Fla,再得到data stream,啊注意当然不是多条多个data stream了啊,就变成一个data stream了,就是这样的一个转换操作。啊,这就是,呃,最终我们还是data stream作为基准,对吧,绕一圈又回来啊,出走半生归来仍是仍是data stream啊,所以我们就所有的这些操作都叫做data stream API还是基于它的,这是关于这个整个数据类型转换的一个操作啊,然后我们还涉及到一个多流转换,其实是。这个union union的话,大家其实发现它字面上来理解的话,其实就是联合,联合这个操作大家看它没有匹配的另外一个操作了,为什么呢?因为它就是两个data stream直接合并联合,联合成一个data stream。
17:05
就是直接什么都不变对吧,直接就得到一个data stream,所以这个就比较简单一些,那那大家觉得你要这么说的话,Union就更好用啊,啊其实也不是因为它有限制,既然简单限制就比较多,它要求我们的两个数据,呃,这个流啊,Data stream必须是相同的数据类型。那所以前面我们那个一国两制,你什么数据类型都行,对吧,它的那个操作特别的灵活,特别的丰富,而这里边呢,就比较局限,你必须是同样数据类型,然后一九你就合起来了。啊,这个大家在代码里边简单的做一个操作就可以做一个测试啊,比方说这里边4.3啊,就是UNI河流对吧?呃,就是做一个联合,那我们会发现,假如说在这里边我们想要把两个啊,我定义一个这个union stream对吧。
18:01
我想把前面的两个流要合起来,假如我用这个warning stream,要想去直接UN这个low tap stream的话,大家看这里面报错了。为什么报错,哎,就是你这类型不匹配对吧?呃,漏这里边它是s reading,而这个warning这里边是一个二元组。但是你如果要是用前面的这个high to swim去UN它的话,这就没有任何问题。哈,所以呃,这就是这个union的这个方式啊,其实还是非常简单的,然后就就合成一个了啊,那我们可以对比一下这个union和connect啊,啊就是首先union使用比较简单对吧?啊,就是所有的这个data stream,直接1UNION,只要数据类型一样,合在一起就完事了,不用做进一步的转换操作,而connect呢,Connect的这个操作的时候比较灵活,它是两个不同类型的流,也可以连连接,连接起来之后做Co map Co Fla map,然后再得到一个get three,这个比较复杂一点啊,那所以这个一个是比较简单,另外一个是比较灵活,对吧,这就看你具体的这个需要嘛,然后另外还有一个区别,就是大家可能知道union这里边,你看这里边调用啊,Union的参数可以是多个。
19:15
可以传多个零,因为它要求这个数据类型必须一样嘛,那你想这是不是就是你有几个来几个对吧,你这里边再跟一个这个all time stream,这个完全没问题,你直接跟在后边全应用起来都可以啊,多条流都是可以直接操作的,但是connect你可以可以直接connect多条流吗?那肯定不行,因为之前我们看到过这里边的这个connected streams定义的泛型就只有两个,对吧?所以只能是两条流连接,你要想连接第三条流,那只能是扣map操作,转换完之后再去点connect连接第三条。啊,所以他们之间是有这样的一个差别,大家要注意一下。
我来说两句