00:00
到了UN宁这样的一个河流操作,那我们总结一下的话,UN宁这个联合看起来就是非常的简单,它主要简单就简单在两个流里边,合并的两个或者多个流里边,它们的类型必须完全一样。数据类型一样的话,哎,那么这合并就没有任何的问题,就直接放在一起,它就是一个流嘛,所以这里面主要涉及到的其实就是数据的重新分配,重新传递的这个过程,然后还有这个水位线的对齐的这个过程,然后我们这里边得到的data stream,它里边的类型当然也还是不变的,那接下来你就把它当成一个流,该map map,该filter filter,该开窗开窗做后续的处理就可以了啊,所以这个整体来讲就是非常简单,特点就是类型都相同,可以有多个流联合。那接下来自然我们就想到了,那假如说类型不一样的两条流可以做联合吗?啊,因为我们如果要是对比之前大家熟悉的批处理的这个场景啊,我们往往是要对表进行操作的,两张表如果要去做做连接啊,其实我们就是做一个draw影,如果要是做一个join影的话,那肯定是表里边的结构是不尽相同的,一般我们就是指定一个K,然后通过这个K把它们连接在一起,就join引起来了。
01:17
哎,那我们现在如果要是两条流的话,肯定会有类似的处理的需求,那所以这里面大家就看到了,那你既然两条流的类型可以不一样,那你用union就没用了。哎,那怎么办呢?这里给大家介绍另外一个更加重要也是更加通用的连接两条流的方法,叫做connect。啊,这个connect其实是应该说啊,在data three API里边应用最广泛的双流连接,或者说这个双流双流合并的这样的一个操作啊,那它其实主要是因为它比较底层。就是理论上来讲,你用它什么都能做,那我们简单来看一下这个连接到底是怎么干的啊。
02:00
首先connect,这个操作就是一条流,一个data stream,然后点connect。然后连接另外一条流啊,这就是这样的一个操作,那么这两流这两个data stream连接之后得到的是什么呢?大家要注意啊,得到的是一个。Connected streams,那为什么他不直接得到一个data stream呢?因为我们知道它之前我们union能直接得到data stream,是因为它的类型一样啊,Data stream里边是要有一个泛型的,那你说现在我们两个流类型都不同,那你合并起来的那个data stream,它的类型是啥呢?啊,那有可能说那这个简单啊,你都是object不就完了吗?诶,但大家知道你如果统一用object的话,它里面的类型信息就丢失了,我们就根本就没有办法再去详细做解析了,这不是我们想要的,我们不是直接向上这个取他的公共数类就完了,所以我们这里边其实是想让他们各自有各自按照不同的类型啊,不同的流,里边的数据是要有对应的自己的操作方式,然后呢,再把这些数据还能连在一起做合并。
03:05
哎,这才是我们真正连接两条不同类型流的目的啊,所以大家会发现这个连接流,它主要就是给我们提供了这样一个转换的接口。就是你一开始不要直接就得到一个data stream object,哎,我们先得到一个连接流,这个连接流里边呢。它只是形式上把这两条流统一在一起,大家看这个图。哎,它就是相当于我们这个连接流啊,里边还是各自为正的,你该是什么类型还是什么类型,然后什么时候才真正的合成一个流呢?诶,那就是我针对你内部的这两个流里边的不同类型都要做一个转换,最后总要把它转换成同样的类型啊,比如说我们做这个map或者Fla map或者process,大家知道可以改变它的数据类型吧,那这样的话最终得到了一个data。所以大家会看到双流如果要做connect连接的话,它的操作步骤是分两步的,这一个算子是分两步的,第一步connect,得到一个所谓的连接流connected streams,然后做这个对应的map flat map process操作,处理这一条流里边的呃,对应的两条流不同的类型的数据,最后得到一个完整的data。
04:21
啊,这又是两步操作得到德塔。总结一下的话,大家看到这就跟我们所说的这个一国两制有点像,对吧?啊,就是我们这里的connected streams,把它先联合在一起,但是呢,呃,就是我们形式上统一了,但其实呢,里边是两种制度,我们是分别去做这个处理的,最终我们的目的都是一样,我们是呃,同样的一个国家,对吧?呃,最后我们都是都是平等的一个状态,但是中间这个处理的过程会有所不同,因为你本来的类型不一样嘛。啊,所以这这就是大家可以看到一个具体的一个应用啊呃,那在代码里边调用呢,主要来讲就是先调一个connect,刚才我们说了对吧,先调一个connect方法,然后再调map flat map或者process这样的方法去做一个处理。
05:12
那这里要给大家说的是,这里边如果调map方法里边传个什么东西呢?那map方法显然里边传的是map function吧?Fla map里边传一个map function process里边传一个process function。这是我们最先能想到的,但显然不是这样,因为之前map function它只能处理一个流里面的数据啊,只能获取当前当前数据对不对?那你数据两条流的数据类型又不一样,你怎么能用一个慢function就直接把它搞定呢?啊,所以这里给大家说一下啊,Map这里的map方法flat map,还有process方法里边传的。跟之前的又有所不同,Map方法里边传的是一个Co map function。Flat map里边传的是一个flat map function。那么process里面对应传的就是。
06:01
Co process function,大家回忆一下,这就是我们之前讲到的处理函数分类里的这种类型对不对?Co process function就是在这儿用到的,它是要处理两条流,一国两制,所以多了一个抠啊啊,那接下来我们看一下具体的这个应用啊。我们可以大概的先看一看这个代码是长什么样子啊,首先我们需要两个流,大家看到现在的两个流类型不一样,一个是integer,一个是law啊,一个整形,一个长整型,然后123 123啊,接下来我们把这两条流量连接在一起,大家看连接的话就是一个connect啊,这个这个就直接连接在一起了,连接在一起之后,我们这里边掉了一个map,然后里边有的是一个Co map function,然后大家看这个Co map function它有个什么特点呢?啊,之前我们map方式里边不是有一个map方法吗?呃,我们说它是一个单一抽象方法接口,那现在呢,现在不是唯一了,现在两个它就叫MAP1MAP2。
07:06
所以这个一国两制看的是不是特别的清晰啊,你MAP1当然就是大家看integer类型的输入,就是第一条流里边的数据来了之后走这个去处理,那MAP2呢,当然就是第二条流长整形的数来了之后走这个去处理。啊,那大家可能会发现,诶,那你输出的是什么类型呢?输出的都是string,这个类型是在我们Co map function这里,它的泛型就直接已经指定了。为了给大家看清楚,我们还是简单的写一下吧。我们把这个叫connect。我就快速的直接把union来copy一下。哦,当然这里面我们就不要再去整那个event了,而且还要用click source,很麻烦啊。这边我们可以就借鉴。当前文档里边列出来的这种定义方式from element。这里面STREAM1。
08:03
然后同样的啊,来一个stream,这里来一个长整型。里边啊,大家不想123的话,那我们来个456也一样啊,但是你得加上L。4567就可以多一点对吧?哎,那这样的话里有两条流,接下来他们可以做对应的连接操作,那大会发现这个STREAM1怎么连接呢?哎,直接去connect。另外一条流啊,这里边我们传入的是stream off,然后接下来大家看这个connect调用之后到的就是一个connected stream,然后这个connected stream呢,Data stream已经没关系了,它有两个泛型啊,当然就是一国两制嘛,当然就是第一条连接的第一条流的这个类型,第二条流的类型,那到底哪个是第一条流,哪个第二条流呢?哎,当然是谁调谁就是第一条流,对吧?跟我们这里的这个一和二的定义没关系啊,你如果我这里如果用二去CONNECT1的话,那这里的第一条流就变成了DREAM2啊,变成长整形了,对吧?哎,那我刚好就给大家反过来来试一下啊,反过来看一下STREAM2去connect stream1,那么这里的INPUT1类型就变成了long长整型,然后它里边有哪些方法呢?哎,大家看到Fla map。
09:21
里面传的是一个Co Fla map function,好,那这个我们就不看了啊,都一样,但它也可以K,大家看到啊,还有map map,这里就Co map function,另外还有process process,就是Co process function,主要就这三种啊,另外还有K,对吧?啊,主要就是这样的啊,然后这里大家可以看一下这个map。盘的这个Co map function啊,啊,是一个接口里边三个类型啊,这就我们说一国两制嘛,那你最后总要统一啊,统一在一起对不对,所以转换之后的类型必须一样。必须变成我们真正的转换之后,是一个统一的data stream嘛,它的类型是什么就是什么。
10:01
那这里我们就是第一条流的类型,第二条流的类型,还有一个out,里边是两个map方法。啊,这具体的map的应用跟我们之前那个map function里边是一样的,对吧?啊,那除了这个。除了扣map function之外,我们还可以看一下扣flat map方不是。大家会看到里边也是一样,也是INPUT1INPUT2,还有al,那另外还有里边要实现的就是Fla map1fla map2,那具体使用也跟之前一样,对吧?啊当前的数据,另外输出是靠这个collect al点输出,没有返回值类型。啊,那除了这些之外,那还有一个process,就是什么呢?统一都给大家说了,Process里边传的是一个Co process function,大家还记得process function里边必须要实现的是一个。Process element方法啊,那我们看一下这个Co process方式呢,这里也是啊,Input in1in2out啊,那这里边我们看到它必须要实现的抽象方法,一个process element1,一个process element2啊,就是这样去用。
11:08
啊,所以这就是,呃,那当然它也有这个on timer对吧,那它也可以去设置定时器,设置定时器的前提是你必须K啊,我们这里边就是没有K的话,那当然是不能用的啊,然后它就多了一个上下文嘛,啊,就跟我们之前那个process function用法是完全一样的。这就是后要做的操作,那这里我们可以简单的给大家稍微的看一眼。这个是。呃,这里啊,我们先看一眼这个map,这里我就可以直接new一个home map方式,大家可以很明显的看到现在的一是不是我们的STREAM2啊啊对吧,谁连接谁是一啊。啊,那这里边就是MAP1MAP2啊,大家看到如果我不直接指定类型的话,它默认是object,所以这个肯定是有问题的,那我们这里边比方说我把它定义成一个string。
12:03
那后边。这里默认是不是就必须返回一个string了啊,所以这个其实还是整体来讲还是非常简单的啊,那我直接把它to string,把它to string都是没有问题的啊,那那当然。想想写上一个前缀的话,我们也可以加上啊啊,这个是音体值。就是我们测试的一个具体的过程啊,那最后我们可以把它做一个打印输出。背后全部都是这样的一个一个string了,对吧,两条流就直接合在一起了,我们可以运行一下看看。啊,大家可以看到这就是两条流对吧?啊,大家看4152637啊,就两条流就变成了一条流,然后全都是string类型的一条流。这就是connect的基本用法。
我来说两句