00:00
那接下来我们已经讲完了基本的转换计算啊,也讲完了这个稍微复杂一点的聚合计算啊,那其实我们看一般我们做大数据不就是map reduce,差不多就是这这两种操作嘛,是吧?啊就是要不就是你只跟当前的这个数据有关,做一个简单转换,要不就是跟之前的某些数据和这个状态有关,做一个聚合,做一个统计,那另外还能涉及到什么操作呢?接下来我们要做的这些操作,这又可以归成一大类,大家看这个,呃,第七节到第九节啊。那这三节其实都操作的是多条流,所以有时候我们往往会把它们总结起来叫做多流转换算子,哎,那么这个多流转换算子里边,首先我们看到的是split和selectt,哎,那它俩是一对啊,这是一组操作,那首先我们看split split什么意思?切分嘛,切割啊,我们之前那个按照逗号分割字段啊,啊,把这个字段拆分出来,按照空格分词不都用了它吗?呃,方法名叫split啊,那所以这里边的split也非常简单,就是把一个瘤要切开,切成两条瘤,这里大家要稍微注意一下,用SP方法啊,基于一个data stream调它的Li方法切开之后,它是直接得到了两条流吗?
01:26
当然不能这么简单,因为大家想,你如果要直接得到两条流的话,那难道说我返回的这个数据类型是两个data stream吗?没听说过这样的这个状态是吧?啊,难道是返回一个data stream的原组吗?不可思议,这,这是什么东西啊?所以大家注意啊,调用split之后得到的数据类型是一个。Split stream是一个新的这样的一个数据类型,这有点像我们之前那个k stream一样,对吧?啊,这里边叫做split stream,所以直观上理解的话,这就是一个切分流。那其实他名义上是把一条流拆成了两个,但事实上大家看这个sweet swim是不是还是一条流啊,啊对吧?呃,所以这里边我们这个s space操作到底是干了一件什么事呢。
02:13
它简单来讲,其实就是按照一定的特征把这个数据做一个做一个划分,然后给它相当于盖一个戳,盖一个就是相当于拣选的标志一样,对吧,就我当前还是放在同一个流里边,但是我已经根据它不同的特点盖了不同的戳了。那接下来下一步怎么办?那是不是就是对再根据那个戳做一个拣选,是不是就可以得到对应的不同的流啊,哎,所以大家看就是你做完split之后,一定后边是要跟上一个select,这才是一个完整的分流操作。啊,那所以接下来基于这个speed,呃,Stream啊,调一个s select方法,然后你根据不同的戳去做提取,那是不是就可以得到不同的data stream呀,啊,这就是一个完整的流程啊,啊,那接下来我们来实现一个具体的需求吧,比方说当前我们这个数据流里边温度有高有低,有不同的温度值,那比方说我现在呢,要做一个切分,切分成两条流,就是高温流和低温流。
03:20
啊,那大家想我现在就是有一个阈值啊,以30度为界,大于30度的是高温,低于30度的是低温,那家想是不是相当于我做这个split拆这个比的时候啊,盖戳的时候条件是不是就是判断它的温度值啊,那所以这个其实整体时间还是比较简单的啊,那所以接下来我们在代码里边做一个测试。呃,这个transform我们就多流转换啊,这个我们就直接放在同一个测试文件里面就好了,Transform test4。好,然后接下来我们这个就是多流转换啊呃,Multiply streams。
04:03
Multiply。呃呃,这个就叫multiple吧,啊multiple streams,然后接下来,首先前面的内容跟之前还是大同小异,对吧,这里面还是啊,从文件里边读取数据,先读过来。诶,我把我把这里边要做的这个操作啊,整个流程先定义好啊,那读取文件数据之后,后边是不是还要转换成一个sensor reading这个po类啊,后边我们做那个分组做做这个判断都都容易一点啊,啊当然现现在我们好像不用做分组了,大家想现在要做分组吗?其实不用对吧,你说这个高温低温,我我是不是只针对当前数据做一个判断,然后做一个这个概戳,然后拣选就完事了,是不是跟这个哪个组别没关系啊啊对,所以这个其实还是比较简单的啊。
05:02
后边接下来我们就做一个分流操作,呃,我们现在要的是按照温度值。30度为界。分为两条流,所以接下来其实就是data stream要去做一个split,然后split里面到底是要实现什么方法呢?大家看这里实现的是一个output select,对,一个选择器对吧?啊,就是相当于我们不是说要盖戳嘛,那是不是就是你得定义出来符合什么条件,然后盖什么戳对吧?啊,所以这个东西叫做一个选择器啊,你有一个output select,然后大家看这个output select是不是只有一个泛型啊。就是当前。哎,就是相当于我当前这个输出的这样的一个类型对吧?哎,那所以这里边我的这个类型是不是就还是sensor reading啊,啊就是我当前这个SPSP,大家看啊,得到的这个类型是一个split stream,它的泛型T是不是完全不变,跟我们当前的这个类型是完全一样的啊啊所以都是s reading这个类型不变,但是呢,大家看这里边是需要实现一个select的方法啊,这里边有一个select的方法,这个select方法传进来的是sensor reading,这是不是就是每一条数据啊,啊,每个数据来了之后都会掉这个select,那我们说它要盖一个戳嘛,盖一个戳它返回什么呢?
06:35
诶,返回的是一个可string类型的可迭代这样的一个集合类型,对吧,Able类型,那所以为什么它是一个集合interable类型呢。也就是说我这里面如果给string的话,是不是可以给多个啊,哎,那指的这指的是什么。哎,就是你当前做拣选嘛,盖戳嘛啊,你说一个数据他只能有一个戳嘛,未必对吧?啊,所以比方说我们这里边啊,每一个同学啊,一个同学来了之后,他身上这个初二,那就相当于标签一样嘛,我可以划分一下诶这个同学这是一个年轻人对吧?一个标签儿啊,大学生一个标签啊,对吧,有钱人有一个标签啊,所以后面我们做筛选的时候,是不是也可以组合不同的标签去做一些筛选啊,所以这个其实是很灵活的一个分组方式啊,就不一定一定是来了这个分到一个组里边,那个又是另外一个组,对吧?啊,所以这只是一个设置标签和后边做检选标签的一个过程啊,就相当于关键词一样。
07:37
呃,那这里面我就简单一些了啊,直接return啊呃,那那这里边我们是不是要根据这个温度值不同,返回的这个标签也应该不一样啊,所以接下来我是要判断当前的value怎么样,是不是要get temperature啊,我得看当前它的这个,呃,就是温度值是否大于30度,诶,呃,那大想我现在如果大于30度怎么办?
08:06
大于30度,那是不是直接就哎给一个高温流的标签对吧,比方说那个标签我就叫氦,那氦的话,我是不是还还得包装成一个一特玻类型啊,我干脆生成一个list完完事了对吧?啊,我直接这个啊,你可以用那个,呃,AR at least,或者说我当天只有一个元素,那是不是可以,这个大家知道有一个是不是collections,直接点是不是single ten list就可以啊,一个元素嘛,对吧,单一元素的例子啊嗯,那所以这里边我给一个一个string类型氦对吧,高温流啊,那与是对应,如果要是小于等于30度的话,我直接给一个低温流还是只有一个元素的list,这个叫漏,大家看是不是这样就完事了。啊,这就是做这个拆分这样一个过程啊,这这是盖戳这样一个过程啊,它得到的是一个。
09:02
是不是得得到的是一个spli stream呀啊,对吧,是这样的一个类型,然后spli stream,这里边大家看能调的方法是不是就只有select呀啊,当然这里边大家看到还有一个select output,这是一个private方法了,对吧,底层调用的方法,所以它真正能调的就一个select,然后传参传什么。哎,就是所有的那个name对吧,就是我们的那个定义的标签对不对,可以传多个,哎,那家想是不是就可以进行那个标签组合啊,对吧,我可以按照这个各种不同的标签啊组合起来去拣选对应的想要的那条流啊。所以这里边其实整体来讲还是非常简单的啊呃呃,那我们接下来其实就是要做一个split split stream,做一个点select里边我去拣选当前的氦这个高温流啊,那其实大家知道最后得到的是不是还是一个这这个调用之后大家看啊,调用它的那个select之后,最后得到的是不是还是一个data stream呀啊,所以这就是先呃做split得到一个split stream,然后再selectt,最后是不是又回来了啊,所以这就是绕一圈再回来了,这个我就叫做high。
10:21
呃,Hi。Temp STEM,然后同样选取那个低温流是不是也一样啊,哎,这里边我就定义一个这个叫low,同样这里我给一个low temp temp stream,那另外这里面我还可以给多个标签嘛,那大家想我直接要是比方说这个high low全提取的话。那我这提取出来应该是个啥呀?啊,那其实就相当于是所有的对不对啊,所以就是我这个就叫做all time stream对吧?啊那所以接下来我可以把这三个流分别做一个输出,比方说这个叫嗨,呃,这给一个不同的流嘛,我给一个这个标志啊啊那另外这个low time stream来一个low。
11:10
然后对应all time,来一个all。啊,最后不要忘记,因为XQ执行起来对吧?哎,这就是我们做这个分流的操作,我们先做一个测试啊,运行一下看看结果怎么样。哦,在这个里面其实有同学可能注意到了啊,这个sweet的方法对大家看过时了,Dere对吧,要被弃用了,那这个如果要它被弃用的话,那我们用什么来替代呢?要分流的话用什么替代呢?大家看一下用side output instead。也就是说要用这个有时候翻译叫测输输流对吧?啊,就是side output啊,那这个侧输出流又是个什么东西呢?哎,这是要用到所谓的弗林克底层的这个process方式API。
12:03
三个层级,三层API里面的底层API对吧,它里边可以调这个side output啊,所以这个我们放到后面再给大家讲啊,啊那这里面就是你要是上层的API的话,Split呢,相对来讲还比较好用,所以还是给大家讲一下啊,简单实现还是比较容易的,那大家看一下当前我们输出的结果,大家看输出结果很多。是不是每条数据都有两个输出啊?哎,为什么有两个输出,哎,对,因为不管是高还是低,是不是它一定都会在二里边输出一次啊,啊所以大家具体看一下啊,35.8大于30度是不是氦和R都有输出,那15.4小于30度漏和R都有输出,对吧?后边对应的也是啊,小于30度的就是低温和二,那大于30度的就是高温和二,对吧?哎,这就是我们最后输出的分流之后输出的这样的一个结果。
我来说两句