00:00
接下来我们来介绍另外一种非常常见的基本转换算子,那就是Fla map flat map可以翻译成扁平映射,那它呢,其实本质上是一种比较特殊的map转换,它其实可以看作两步操作的一个结合,也就是说在map之前还要做一个扁平化flat这样的操作。那什么叫做扁平化呢?也就是说我们其实是要把当前的每一条数据先按照某种规则进行一个打散、拆分,这个过程就叫做扁平化,一条数据可能可以拆出很多条数据来,所以呢,往往我们针对flat map进行操作的时候,是把数据流里边的一个整体。一般就是某种集合类型了,拆开扁平化打散,然后再针对打散之后的每一个元素做一个map转换,得到我们想要的数据结构。
01:02
那这样看的话,它跟之前我们所说的map操作最大的一个区别就是之前的map本质上来讲,它应该是一对一的转换,来一个元素应该经过转换之后得到的就是一个唯一的结果。而filter呢,Filter前面我们说,那相当于是可以一对一,也可以一对零啊,就有可能原封不动,也有可能直接就把它滤除掉了,筛选掉了。而对于当前的flat,它出现了更加灵活的情况,因为我们可以把某一个数据打散。当然他就有可能出现一对多的输出情况。所以在实际应用场景当中,Map往往是更加灵活,应用更加丰富的这样一种算子。在代码的具体调用上呢,Flat map也非常的简单,就是基于当前的data stream调用Fla map这样一个方法,然后里边同样传入一个t map方式就可以了啊,那里边具体的实现呢,也可以传入LA表达式,也可以使用真正实现这样一个flat map方式接口这种方式都是可以。
02:15
相应的功能的,那接下来我们就在代码当中具体的测试一下,还是在当前的包下边新建一个叫做transform flat map。Test。那整个的代码流程还是类似,我们先把main方法写出来,抛出异常。那接下来呢,创建环境以及读取数据源跟之前的过程其实是非常接近的,非常类似的,我们就直接copy过来就可以了。好,把这个先copy过来,然后接下来就可以直接去调用。对于three,调用一个flatma方法。里边要传入的当然就是一个方式了,我们这里可以在外边还是自定义一个。
03:07
自己实现的flama方式。实现一个自定义的。Flat map方式。当然了,这里我们也需要public static class把它定义出来,MY。需要实现的接口就是flat map function。我们这里看到了function跟function非常的类似,同样也是有两个,一个T,一个OT,当然就是当前输入的。数据流里边每一个元素的数据类型,而O就是经过转换输出之后,每一个数据元素的数据类型,那所以这里面我们可以有一对多的输出,但是输出之后呢,每一条输出的数据类型应该还是完全一样的啊,那这里最简单的一种方式,我们可以针对当前event输入的每一个事件数据,把它里边的每个字段做一个拆分,拆出来之后每个字段我们都做一。
04:11
一条输出,那我们就把它作为string直接打印出来吧。然后接下来看到Fla function同样也是只有单一抽象方法的一个接口,里边必须要实现的抽象方法就是Fla。Map呢?跟之前的map有所不同,它的输入参数现在不仅仅只是当前的数据元素T类型的value,除此之外还有第二个参数。这个参数比较特别,这是一个叫做collector类型的一个叫做out的参数。Flat map方法不像map方法直接返回转换之后的数据,这里没有返回值。所以我们就发现了map怎么样去输出数据呢?很显然不能直接返回,因为返回的话只能返回一次,只能返回一个数,那我们如果要想要实现一对多这样一个操作的话。
05:09
那就必须在里边能够多次输出数据传递到下游任务里面去,那这里边的传输出方式使用的就是。用到了一个叫做collector收集器的东西。这样一个收集器呢,里边有一个方法叫做collect,它的作用就是直接发出一条数据记录,把它收集起来,那接下来当然就是传递到下游任务了。所以如果说我们当前对这个Fla map想要输出一条当前的转换之后的数据的话,那应该是调用点方法。那当然了,既然我们是通过方法调用来实现数据输出的,显然我们调一次就会输出一次,那如果想一对多的话,那就调用多次out点的方法。
06:03
那甚至我们还可以一次都不掉,那就相当于什么都不输出,这条数据就被过滤掉了,类似于就实现了一个filter的功能啊。所以我们可以看到,对于flat map而言,它可以认为是一个加强版的map。另外它也可以实现filter的功能,Map和filter所有的功能都可以用Fla map来实现。它是最为灵活的一种基本转换操作啊,那接下来我们在这里就可以直接把对应的flat map方法做一个实现,比如说现在我们针对当前value的三个字段属性,要把它们转换成字符串做一个简单的输出,那这里就非常简单了,直接调用点的方法里边可以把当前的value.user做一个输出。接下来还可以继续输出,那这是一条新的数据了,同样也是。String类型,我们可以把URL做一个输出。当然了,如果我们把最后的time也要做一个输出的话,那应该把当前的长整型时间戳做一个string的转换,这样的话就完全没有问题。
07:11
那上面我们也可以直接在这里。做一个print打印输出。下边env XQ执行起来。运行下,我们可以观察到当前的测试结果。我们可以看到这就是最后输出的效果,把每条数据都拆成了三个string字段输出了,所以输出之后的结果看起来就像有九条数据一样,本来我们是三条数据,那每个都有三个字段,拆开打散,再转换。就变成了九条数据,这就是所谓的扁平映射的转换过程。当然了,除了这种。实现自定义的。实现flat map接口这样的一种方式。除了这种方式之外。
08:03
我们当然还可以直接。传入,传入匿名类当然是可以的,实现非常的类似,这里就不再去详细讲述了,另外呢,我们还可以去直接传入。一个拉姆的表达式。那这种方式我们可以来看一看,它使用起来会稍微有一点特别,因为当前的Fla map方式里边的map方法是有两个参数,那我们知道只要它是单一抽象方法的接口都可以使用拉表达式来实现,但是这里边有两个参数,那怎么办呢?我们可以看到它的写法其实跟只有单一参数的时候类似,只不过需要。需要把参数用一个括号括起来啊,那所以当前我们传入的是一个类型的value。另外还应该有一个collector string类型的,我们可以看到collector本身是有一个泛型的,因为这是我们输出的数据类型,我们要把它利用这样一个collect collector收集器收集起来,然后发送到下游任务去,那当然了,当前收集器也应该知道。
09:14
每一个数据元素转换之后的数据类型啊,所以这里面是有泛型的,那后边呢。就是真正我们要做处理的具体的过程,那这里面我们不光可以像之前做的这样把每一个字段拆分开,也可以还可以加入各种各样的筛选条件,比如说我们可以判断if当前的value。他的user。如果equals。Marry的话,那这个时候我们就直接out点,把它当前的。URL输出就可以了,哎,那另外我们还可以L。value.user如果要是equals Bob的话。我们可以输出多条数据,out.collect。
10:02
URL和当前,我们可以把这三条数据全部做一个输出。当然了,既然如果后面我们不再做任何的判断else,不再输出任何的东西的话,那显然不是Mary,不是Bob Alice对应的数据来了,那就什么都不做,相当于就被过滤掉。所以在这里可以看得非常明显,对于Mary的数据,当前做了判断之后,它就是一个一对一的转换,类似于一个map操作。对于Bob的数据呢,我们是把每一个字段提取出来,打散了后输出三条数据,相当于是一个flat map,真正的扁平映射。而对于爱丽丝的数据,来了之后,当前就相当于完全过滤掉,是一个filter操作。这就是flat map,它可以完全涵盖map和filter对应的功能,我们可以直接把当前也做一个输出。为了跟前面的内容做区分,我们可以把上面这个叫做一,下面这个叫做二。
11:04
接下来我们可以测试一下,看一看当前二这个数据里边会出现什么状况,我们会发现这里直接报错了,这里为什么会报错呢?可以看到这里抛出的异常是说当前我们做了这样一个。定义了一个拉姆达表达式之后,它的返回类型没有办法自动的。确定。也就是说,当前的GM。又出现了。泛行擦除的状态,那为什么会出现泛行擦除呢?我们当前本来这个里边的函数不是没有返回类型吗?注意这里边我们用到了collector,用到了收集器去进行结果的输出,所以当前的or很明显它是有泛形的,那对于GM而言,最终泛型擦除之后,他只知道当前我们要去进行数据传递的,收集数据的这个收集器,这是一个。
12:01
但是并不知道我们转换之后的数据类型到底是什么。所以如果说我们想要使用拉姆表拉姆达表达式的话,还应该加上一个。当然就是指明当前输出的数据类型到底是什么,这样的话才可以啊。当然这里我们可以传入对应的Type Class啊,用class的方式传入,那也可以传入type information,另外也可以直接传一个type hi啊,那type hi可能我们直接一个,这个最为简单,因为它相当于可以。给我们把对应的这个类型直接补全,我们直接放在这里就可以了。接下来我们再来运行一下,看看给出对应的类型声明之后。现在就可以正常输出了。可以看到。同样是每一条数据,Mary的数据来了之后,对于一这个转换之后,那当然就是全拆成了三条数据,指令输出,每个字段都输出,而对于二呢的数据,那就只把它的URL输出。
13:03
对于Bob的数据,那是像flat map跟第一条流里面一样直接打散的输出,而对于爱丽的数据,第二条流里没有任何的输出,相当于过滤掉。这就是map的用法。当然了,最终我们不管是map filter还是map,经过转换之后。得到的都是一个single output operator,本质上还是一个data,所以说我们说它都是基本换算。
我来说两句