00:00
了解了map和filter这两个非常简单的转换操作,那接下来呢,我们再来讲一个稍微复杂一点的基本转换算子,那就是其实我们也非常熟悉的flat map,那一般我们把它翻译成扁平映射。其实在前面我们讲到。快速上手做work count的时候就已经用过这样的一个算子了,Fla map,它主要做的操作是什么呢?呃,简单来说,它其实就是可以将一个整体的数据类型啊,比如说这里边我们一个完整的字符串做一个打散的操作啊,比方说我们这里按照空格做一个拆分,然后呢,拆成一个一个的个体。那我们知道得到的这就相当于是一个列表了嘛,啊,那得到这个列表呢,接下来我们还可以把它里边的每一个元素再做一次对应的转换,哎,所以我们就会发现啊,这个flat map,它其实相当于是两步操作一步操作,我们可以把它叫做。
01:03
Fla,也就是所谓的扁平化的这个过程,然后呢。再做一个map转换,所以一般情况我们可以把它看成是flat加map的两步转换结合在一起的一个操作。所以对于flat map而言啊,我们就会发现它显然就会更加的复杂,对于map能做的事情,Flat map其实完全都可以做啊,那另外呢,像filter做的事情,我们想做一个筛选,做一个过滤,Fla map也是可以实现的。所以接下来我们就可以再来测试一下flat map到底是怎么用的。我们同样还是创建一个SC的object,我们把它叫做transform flat map。Test没方法先写在这里,然后呢,开始的这个程序的结构,那一开始的获取执行环境,读取数据源啊,那这两步操作其实还都是完全一样的啊,我们直接把它抄过来,Copy过来就可以了。同样。
02:05
上边我们把这一部分换成下划线。方便后边的影视转换的引入啊,那有了数据源之后,接下来我们就可以对它做一个转换操作了,对于flat map的转换,其实是一个非常灵活的方式啊,也就是说像之前我们提到map转换,它是一一对应,一映射一对一的关系,而filter的转换呢,相当于是一对一,也有可能是一对零啊,就是输出有可能有,有可能没有,而这里的flat map呢,可以认为就是我们当前的输出是可以任意定义的。就是我们可以是一对一,也可以是一对零,也可以没有,那另外呢,还可以是一对多,也就是来了一个输入,我们直接把它打散了之后,一下就输出了多个,呃,我们可以回忆起来之前做word count的时候,输入一行数据,那打散之可能就会统计出好几个词,当前它有多少个啊,那所以我们这个输出就会非常的灵活,所以这里边我们测试的话。
03:11
也可以直接就去测试一下。当前flat map非常灵活的。输出形式。我们这里调用的时候,直接stream后边调一个flat map方法啊,然后我们可以点进去看一下啊,它的实现看起来有三种,我们看一下到底是怎么实现呢啊,最经典的最基本的一种方式当然就是直接实现flat map方式接口啊,所以我们看flink里边它的这种程序架构是非常的有特色的啊,那对于map算子而言,里边实现的就是一个map方式啊,那如果要是filter的话,实现的是filter方式,现在是flat map,当然实现的就是flat map方式了。里边我们点进去看一下,它同样还是一个接口,里边只有一个抽象方法,就叫做flat map,我们看到这个flat map function,它有两个泛型啊,那还是T和O,这和map是完全一样的,因为我们知道它可以做数据的转换处理嘛,哎,所以当然就是有输入有输出。
04:17
然后这里的flat map呢,跟之前我们想到的那个map操作就有所不同了。我们可以回头看一眼map这里做转换的时候。它是直接输入当前T类型的数据,然后得到了O类型的输出。而现在的flat map呢,我们会发现啊,它是没有输出类型的。诶,那我们就会觉得比较奇怪了啊,那这样的话,它的输出到底用什么来输出呢?我们看到它的参数多了一个,除了当前输入的数据啊,是T类型的当前的数据元素之外,另外还有一个。Collector类型的另外一个参数,诶,那这个collector我们自然就又可以联想起之前在做自定义数据源测试的时候,我们会看到。
05:09
Click source,哎,我们自己实现的这个s function里边啊,它的wrong方法里边有一个ctx这个上下文,它有一个collect的方法,就可以发出我们当前的数据,哎,那所以我们自然就想到了,现在好像也是要做一个数据的输出嘛,那是不是也可以用一个类似于之前Ctx.collect这样的一个方法调用来实现数据的输出呢?确实是这样,那这里边我们调用的就是这个collector的collect方法。我们可以点进去看一下啊,这也是一个接口,它里边就有一个方法叫做这就可以相当于向下游去发出当前的一个数据啊,就收集起来发送到下游任务里面去。那这一下我们就清楚了,对于当前的flat map而言,我们如果想要输出一条数据的话,转换之后输出的时候,只要调用当前这个collect的collect方法,就可以得到一个O类型的输出了。所以那既然当前我们这个方法本身是没有返回值的,每调一次就会得到一个输出,那当然就是我们掉多少次都可以吧,如果不掉,那当前就相当于可以实现一个filter的功能,没有输出过滤掉了,那如果说掉只掉一次的话,相当于就是一个一一对应的map转换。
06:37
那如果说我们调多次它的collect的方法的话,很显然这就变成了一个一对多的扁平映射,拆散了之后,打散了之后再做转换的这样的一个操作。所以接下来我们也可以在具体的代码当中去做一个测试实验。我们在这里就直接来自定义实现。
07:01
Flat map方式接口。所以这里边又是我们这里边定义就叫my flat map吧。然后接下来extend一个flat map function。后边有泛型一个T一个O,哎,我们当前是event,这是输入的数据类型,那输出的数据类型呢,比方说我们还是直接提取当前的user字段,哎,那这样的话,我们可以直接给一个string作为输出,那里边必须要实现的是一个flaman图像方法。那flat map这里具体的逻辑呢,就可以按照我们希望的这种方式啊,随便去指定了,比如说我们这里就去判断。如果。当前数据是。Mary的。点击事件。那么就直接输出user。啊,那所以这里面我们可以直接做一个判断if。
08:04
里边的具体逻辑其实还是啊,就是当前的T,它的user如果要是。Marry的话,那么我们接下来啊,那就直接使用这里的,我们看到这个就叫做collector名称就叫做collector,那么直接调用它的collect方法。输出什么呢?哎,那当然我们输出的应该是当前的t.user直接把它做一个输出。那另外我们还可以继续判断l if可以指定另外一个规则。如果。当前数据是Bob的点击事件。哎,那么比方说现在我们就不是简简单单的把它转换成user输出了,我们要把它的user和URL,因为我们知道两个都是string类型嘛,所以完全可以输出两次输出两个数据,所以。这个时候我们就。输出user和URL啊,那这个时候else if。
09:06
t.user如果要是。Bob的话。我们当前的输出,那就应该是collect.collect t.user做一次输出,另外还要collector.t.URL。诶,这就是我们在flat map里边啊,想要输出几次就输出几次啊,那当然这里只判断了Mary和Bo,那假如说这里边我们还有一个用户是Alice丝啊,那对应的我们也可以添加这样一条数据。这是Alice的一个点击数据。第三秒钟。做了一个点击啊,那么我们就会发现啊,相当于我们就把爱丽丝的数据过滤掉了,他不做任何的操作,不做输出,那就是一个过滤,所以在上面的话,我们就只要去拗一个。
10:00
MY。My flat map对应的实对象实例,然后做一个打印输出就可以了啊,这就是我们处理一个流程,最后不要忘记envlecu,执行起来好,那接下来我们可以运行一下,看一看输出的结果到底是什么样子。我们可以看到完全符合预期,首先第一条数据来了之后,诶,那么判断它是Mary,我们就只输出了一个Mary,第二条数据Bob来了之后呢,诶,那就会输出两条,一个是Bob,另外一个是当前他点击的URl.cut啊,那另外第三条数据Alice丝数据来了之后呢,没有任何的输出,相当于把它过滤掉了。啊,其实前面我们在看到这个flat map,它里边传参的这种方式的时候啊,除了实现一个flat map方式,另外我们发现它其实也可以去传入一个拉姆达表达式啊,那当然了,对于这种多参数的拉姆达表达式,我们写起来可能就会比较费劲一点啊,所以如果说我们喜欢直接去实现flaman方式的话,直接这么写是完全没有问题的,那如果说我们习惯去写拉表达式,也可以做一个改写。
11:16
这个我们就不再去更多的介绍了啊,整体的逻辑跟这里是完全一样的,这就是flat map的用法。
我来说两句