00:00
我们已经了解了UNUN可以说是flink当中最简单的一种河流操作,那在代码里边也非常的简单直白,那就是直接基于一个data stream,调一个点UN方法传入另外一个data stream就可以了啊,那我们这是合并两条流,如果说想合并多条流的话,那其实我们知道得到的结果它还是一个get stream啊,哎,那接下来继续在后边点un.UN不停的UN都是可以的,那或者我们也看到了,本身UNI方法里边传入的参数其实是一个可变参数,我们完全可以传入多个不同的data stream,把它们全部捏合在一起啊。所以幽影的好处就在于调用简单,而且不仅仅是可以做两条流的合并,多条流的合并都是可以任意实现的。当然啊,它有一个非常明显的限制,就是当前所有合并的流做联合的必须里边的数据类型是完全相同的。
01:01
那我们自然想到了,那如果说我们想要把两条数据类型不同的流联合起来去做操作,那又应该怎么办呢?其实我们知道在实际应用场景里边,绝大多数情况可能诶,我们的数据来源啊,数据的形式应该都是不同的,只是里边可能有某些字段有联系,我们想把它连接在一起,那这种情况又应该怎么去做操作呢?啊弗利给我们提供了另外一种更加通用,更加灵活的河流方式,那就是所谓的连接操作connect。顾名思义,Connect就是要像连线一样把两条流联系在一起啊,那对于connect操作呢,因为它允许两条流的数据类型不同,那我们就想到了,那你合并之后的这一个数据流,他也应该是个data stream呀,这样一个data stream,它的数据类型又应该是什么呢?啊,那很显然我们不能直接定死,最后合并出来的数据流,它的类型到底是什么样的啊,那所以呢,这里边用作的操作是我们进行连接之后,首先得到的是一个连接流啊,它得到的并不是data stream,而是一个connected streams。
02:17
所以这里边的这个连接流呢,我们可以认为它形式上是两条流连在一起,看起来好像像一条流了,但事实上呢,并不是,事实上里边两条流还是各自为政的,彼此之间还是相互独立的,如果我们想要真正意义上得到一个转换之后的新的data stream的话,那还应该基于这个connected streams再去做一个转换操作。比如说可以在后边直接调用一个map方法,哎,那就相当于可以同时指定我们合并的两条流,STREAM1怎么样去做转换,STREAM2怎么样去做转换啊,那对应的啊,也可以去调用Fla map,还可以调用process,所以我们看到这样的两条流去进行合并之后的处理,就有点像是一国两制一样啊,就是我们这里啊,本身是已经合并到了一条流里边,我们形式上已经统一了,但是呢,具体去进行操作的时候呢,可以有自己的规则啊,那这就是我们说的啊,接下来进行转换的时候呢,各自可以定义各自的转换完成之后,那当然得到的结果数据类型就再次统一了。
03:34
再次得到了一个string。所以从代码调用上来看的话,那就应该是一个data stream去调用一个connect的方法,然后哎,当然另外一个data stream呢,可以作为参数传入,得到的结果是一个connected streams,然后这个流呢,又可以调用对应的转换方法,得到一个新的data啊,那这个我们可以直接类比之前做过的KBY以及开窗window这样的一些操作,我们会发现啊,这其实就是flink当中API的一个风格,我们是以data stream作为核心和基准,然后呢,基于它可以做各种各样的转换,得到一些新的数据流的类型,比如说k stream,或者window stream,或者这里的collected stream,然后接下来呢,又可以经过一系列的转换操作,最终再回到data stream来,哎,这就是我们所说的。
04:36
Data stream API的主要特点啊,那接下来我们就在代码里面做一个具体的测试啊,那这一部分呢,我们主要是测试。Connect相关的内容,所以是connect test。同样还是没方法,先写下来,首先创建执行环境,我们把它get到。还是叫做烟V上边引入对应的影视转换。
05:03
DV全局的并行度先设置成一,然后接下来我们就随便定义两条流来进行一个合并连接的测试吧。啊,那我们这里边就。定义两条整数流,里边的数据元素就都是整形的啊,非常的简单。首先我们定义一个STREAM1ENV,直接from elements。123,诶,这就全是int类型的数据,这是一条int类型的data stream,然后呢,呃,另外一个,因为我们的数据类型可以不同,那我们干脆把STREAM2定义成一个长整型的数据流吧,From element 1L2L3L啊,那这个时候我们就知道了,Stream是data stream law这样的一个数据类型。所以接下来我们就可以对它们进行连接操作。连接两条流。
06:00
STREAM1直接调用connect方法,那connect我们这里可以点进去看一下里边传的参数,可以直接就传入另外一种数据类型的data stream,这个是完全没有问题的。当然了,Connect还有另外一种调用方式,传的不是data stream,而是一个broadcast stream啊,这个是后面我们会提到的广播流啊,这个我们讲到广播这一部分的时候再去做介绍。最常用的连接方式当然就是一条流连接另外一条流了,所以接下来里边当然就要传入STREAM2。然后后边要执行一个操作,我们看到connect方法返回的是一个connected streams,所以在这一个类里边,我们看到它跟data string并没有直接的继承关系,所以呢,它里面的方法是完全不同的,不过呢,它的转换计算的处理方法名称基本上跟that stream都是一样,比如我们看到这里有map啊,那当然了,后面还会有process,还会有flat map等等啊,那这里主要它能调用的方法就是这些,还可以去做K,那这里我们可以看到以map为例,它里边具体的实现呢,其实就是要传入。
07:21
两个函数啊,因为我们知道本身啊,Data stream如果调一个map的话,里边只要传入一个匿名函数,或者传入一个函数类的具体实现,那我们就可以知道该怎么做转换了,但现在不行啊,现在要一国两制,所以就必须传入两个对应的函数啊,那我们可以传入匿名函数,也可以传入一个Co map function,那我们看到这个Co map function又是什么东西呢?Com map function啊,其实就是一个接口,只不过这个接口不再是单一抽象方法,而是里边有两个map方法,一个叫做MAP1,主要操作我们第一条流里边的数据进行转换,还有一个叫MAP2,主要操作第二条流里的数据进行转换,诶,这就是所谓的Co map操作。
08:12
那当然了,呃,如果说我们这里调用的是flat map的话,很显然里边要传入的就是一个扣flat map方式啊,那这里边呢,其实就是flat map1和flat map2。或者我们这里如果要直接调用一个process方法的话,里边要传入的就变成了一个Co process方式,那当然了,里边对应实现的就是process element1和process element2啊,这就是我们所谓的这个两条流合并之后去进行转换处理所要调用的方法啊,实现的函数类就是这个样子,所以接下来呢,我们就直接去实现一个最简单的map吧,啊,那里边需要去实现的就是一个Co map方式。当然了,这里边需要有泛型,它的泛型其实就是第一条流的输入,第二条流的输入以及输出,比我们一般的map方式当然就多了一个输入的数据类型,所以这里边我们的两条流的类型,一个是int,第二个是long,最后输出的数据我们干脆就一个string输出一条信息吧。
09:19
里边必须要实现的一个MAP1,还有一个MAP2,所以可以看的是非常明显啊,那既然我们直接输出这个string类型,干脆就。包装一下啊,我们直接就是带上当前的类型,然后显示一下到底是什么数据就可以了,Dollar value。那同样道理,下边我们输出的。第二条流里边当然都是长整形的Dollar value,这就是我们想要去做的对应的处理。然后接下来得到的STEM类型的数据流就可以直接打印输出了,EV electcu执行起来,这就是我们一个非常简单的两条流进行连接的测试,好,我们运行一下看看效果。
10:09
我们可以看到啊,输出非常的明显,那就是把两条流里边所有的整形以及长整形的数字全部合到了当前string类型的流里面,那所有的数据全部输出了。这就是关于connect的基本用法。
我来说两句