00:00
接下来我们要做的这部分讲解,就是所谓的多流转换算子了,那首先呢,呃,大家知道多流转换分为这个分流和合流,对吧,首先我们来讲这个分流的操作,分流操作的话,哎,那这里边主要就是split了。咱从字面上也非常理解,Li不就是切割切分嘛,所以它本身做的操作就是把当前的一条流直接切开,切成两个部分,但是大家要注意啊,这个split并没有真正意义上把一个流。切成两个流,而是什么呢?大家注意,它是做了一个流类型的转化,本来我们是一个data stream,它经过split切分之后得到的是一个split stream得到的是这样的一个数据结构。所以理论上来讲,它并没有真正的把这个data string拆开,而是相当于我做了一个划分,对吧,就还是在一个流里边,只不过我已经分类了,你们这些哎,属于这个组,你们这些属于这个组,对吧?我们一个班分成两个组了,类似于这样一个操作,那之后怎么样才能把这个一个流分成两个流呢?得到两条data stream呢?
01:11
那就要结合另外一个操作,就是所谓的select啊,大家知道select就是所谓的这个拣选,对吧,选择拣选这样的操作,它就有点像什么呢?呃,就是之前我们这里边就相当于只是给它在当前的流里边盖了个戳,诶告诉你好,我当前这个数据应该属于这个流对吧?呃,应该属于这一类,这个组,这个呃,另外一个数据呢,属于另外一个组,但是呢,没拣选出来,接下来select就是根据之前盖的那个戳,把我们想要的数据分别按照戳拣选出来,变成一个真正的。啊,所以它俩一般都是配合使用的啊。这个过程其实呃,在有一些场景里边分流操作还是非常有必要的啊,比方说就是像我们这个卡夫卡数据源进来,对吧,你在部门里边操作的时候呢,有些数据可能是就是我们那个卡,卡夫卡里面的数据可能是没有经过任何处理啊,原始数据日志里面提取出来,直接就放在同一个topic里了,那你说这个时候是不是我们应该先做一个预分流的处理啊,对吧?不同的部门我这边拣选出我想要的来之后,那别的部门还要用继续用别的数据啊,呃,所以这个时候我可以进行一个Li操作,分成不同的流,我们自己该消费的数据直接把它消费出来,然后呢?呃,别的留的那个数据呢,我再重新写到卡卡里边,这样的话,别的部门就可以继续消费继续用了,对吧,所以呃,这里边就是就是Li和select,他俩就是总是成双成对的出现。
02:48
这里边与之对应的是什么呢?你要跟前面来对应就是什么,就是KY之后大家注意啊,它总是要结合着一个聚合算子,成双成对的出现一般情况啊,啊,但是后面我们还讲到那个window操作的时候又麻烦一点,就一般情况就是KBY之后总要接下来做一个聚合操作,那所以呃,前面我们说KBY之后得到的是一个kid stream,那k stream里边做完聚合之后得到的又是什么呢?大家看一下这个some。
03:18
得到是不是又变成了data stream呀,啊,绕一圈绕回来了嘛,所以现在我们做的这个split select其实也是一样,Split之后得到的是split stream,然后select之后呢,又回到了data stream对吧,拣选出来了。所以接下来我们还是在代码里边具体给大家做一个实现,现在我们呢,呃,做一个具体的需求啊,这个需求就是传感器的温度数据,按照温度值的高低,比方说30度为为界,拆分成两个流,一个30度以上高温流,对吧,30度以下低温流,大家来看看这个需求怎么实现。如果用sweet select的话,其实非常简单啊,接下来给大家来实现一下啊,我们把前面还是给大家归一下类吧,比方说这个这个是零啊,这个叫读取数据,然后接下来我们第一步是转换样例类,第二步做这个分组聚合,这就相当于转换里边就是一些简单转换操作,对吧?Map操作简单。
04:16
转换操作,然后后边我们这个第二部分啊,这就是分组聚合,比较简单的这个聚合方式,第三部分呢,就是更一般化的reduce这种聚合方式,对吧,那接下来我们讲第四部分,这就是多流转换。多流转换操作,那首先4.1是分流操作对吧,分流。将。传感器数据。温度数据,呃,分成低温高温两个流,两条流,好,那接下来我们的做法,首先定一个sweet stream对吧?好,那我们要做的就是之前的data stream,还是基于data stream啊,不要做聚合对吧,直接调一个sweet方法,然后哎,大家会看到这里边要传sweet这个方法里边要传什么东西呢?要传的是一个output select。
05:19
哎,这又是一个类对吧?又得实现一个自定义的类了,这看起来有点麻烦,那有没有拉姆德表达式的写法呢?哎,有的啊啊,大家看这个其实要实现的是就是这么一个函数,这个函数就是传入,哎,这是当前这个数据类型,对吧?Data stream里面的数据类型得到的是一个什么呢?输出的数据类型是一个travers ones的一个string啊,就是string类型的一个travers ones,一个集合类型。大家知道传斯bos就是可遍利一次啊,这个这种集合类型其实是比较呃,比较顶层的这种这种集合的副类对吧?啊,大家知道这个顶层的副类是这个,呃,传ver嘛,然后下面有这个it inter,呃,Able对吧,下面再拆分开是seek set set和map,而我们这里边的这个traversible ones啊,大家会知道它其实是什么呀,就是我们所谓的那个traversible,这里面给大家看一眼啊,这个继承关系。
06:20
Traverible它本身实现的这个treat里边有一个它继承字就是traverible ones对吧?哎,这里边它是有继承关系的,所以这其实是一个非常顶层的一个,呃,这样的一个几何类型了,那我们这里边怎么去实现呢?你要想实现它的话很简单,你随便返回一个一个list对吧,或者返回一个这个序列S,呃,返回一个set什么都行,然后呢,里边的元素是一个string。那这个string不就是我们说的那个戳嘛,你要盖什么戳对吧?啊,这个其实就是非常简单的一个一个实现啊,那接下来我们就用这个拉姆达表达式的这种方式给大家做一下,那我这里边怎么样去判断呢?哎,那按照温度高低嘛,当前贝塔的temperature,诶temperature啊,如果要是大于30度的话,哎,那就直接返回一个高温流,那这个高温流怎么办?来一个氦这个字段表示它的这个戳,然后我返回一个,哎,返回一个list的类型,或者返回一个这个seek这样的一个类型都可以,对吧,里边直接给一个hi,哎,就就这样就OK。
07:30
因为大家知道这个sick本身就就本身的这个SC里边定义的这个sick啊,啊,它底层就是我我们这里边去做这个处理的时候,它不也是一个传播吗?对吧,也是一个传播这样的一个类型,那那你在去做这个底层调用的时候,当然是可以实现的啊,这个当然是没有任何问题的啊,所以这是scla里边的这个集合类型的一些继承关系啊啊,那else的话,那同样我定义一个seek低温流漏对吧,大家看你这if else分支都写完之后不报错了。
08:06
那后面大家注意啊,这个得到的是个什么呢?得到的是一个split stream对吧?那么这个split stream这里边只有一个方法可以调用,就叫做select,然后调用select之后就又得到了data stream啊,所以接下来我们要做的操作也非常简单,那就是直接split点啊点select,这里边select传什么呢?传一个string,这不就是那个标签吗?啊,所以这里边high对吧?把这个高温流提取出来,得到这个data stream呢?我们把它叫做high pump stream高温流定义好。啊,那同样我们可以另外定义一个低温流low time stream split stream,然后同样select这里边拣选的是漏,对吧?啊,甚至大家看到这个select里边我可以传多个签标签,多个这个字符串,所以我甚至可以把两个不同的标签都选取出来,那大家想这高温流低温流都选取,那这不就是所有的温度值吗?对吧,我来一个all temp stream,这个拣选的话,定义在这里,Select里边就是high和low分别都可以传进去,哎,这都是可以去做的,对吧?啊,那这里边我就把这个先注掉啊,然后我们这里边做一个这个打印输出,打印输出的时候。
09:36
我把这个result放在前面吧,啊,大家知道这是上面的result啊。然后这里边我们做这个打印输出的时候,分别来看一眼啊,那就是high time time time streamam,然后print输出啊,但是这里边大家会发现我得区分这几个流的输出结果啊,呃,没关系,这里边大家看我里边可以传一个string,这是一个think输出的一个ientiffi,像这个ID一样,表明我当前输出的是哪条流,哎,所以我可以把这个high放在这儿,对吧?呃,然后low time stream同样打印输出给一个标签叫做low,最后还有这个a stream打印输出,这个叫做O。
10:18
好,那最后我们这个执行的效果大家可以运行一下看一看啊,最后执行的效果当然就是会按照30度温度值作为一个标志,哎,做一个区分对吧?呃,大于它的高温流,小于它的低温流。这个其实实现起来是非常简单的啊。我们先看一下这个运行结果啊,大家看到这里边其实呢,每一条数据它其实都有两个输出,对不对啊,然后这里边它是哪条流输出的呢?大家看这有点像我们之前那个,呃,就是有并行度设置的时候,前面有一个间括号,然后前面有一个字符串对吧?之前我们那个是个数字表示它的,呃,就是并行子任务的那个序号,而现在呢,就是我们定义的它的这个ID,这个think的一个ID,对吧?那所以你看35.8大于30度,那它是输出在了高温流和二里边。
11:11
那如果要15.4的话,那是高二和漏里边对吧,每一条数据都输出两次啊,那主要就是因为这个二和漏这两条,呃,这个R这条流里边,它会包含所有的数据对吧?所有数据都在二里面输出一次,然后要不漏,要不汗哎,都会输出一次,所以每条数据都有两次输出。这就是关于分流的一个操作,那大家如果仔细的话,可能发现了这个split被弃用了,对吧,这里面画了一横线被弃用了,那那这里边他的这个推荐是什么呢?用什么呢。用side output instead啊,这个side output指的是测输出流啊,这个侧输出流又是个什么东西呢?啊,这个可能稍微的麻烦一点,因为它是后面我们要讲到底层API process function里边的一个特性,所以呢,我们讲到后边讲到process function的时候,再给大家介绍这一部分内容啊,那那为什么还要给大家讲这个,主要就是因为这个使用太简单了,对吧?啊,这个使用起来你只要知道它怎么做,直接就可以用了,而且现在呢也还能跑,还能用,所以大家可以先了解一下这个分流是怎么做的。
我来说两句