00:00
读取数据源之后,那接下来我们就可以对数据进行一些转换计算,然后得到我们想要的计算结果了。所以在实际项目当中,真正的业务逻辑其实是集中在我们读取数据源之后的转换操作,所以后边的这一其实是我们整个data stream API里边最为丰富也最为核心的部分。接下来我们要讲的就是转换算。啊,那么整体来讲,在data stream API里边,它有各种各样的操作,我们可以把一个或者多个data stream转换成新的data stream来进行处理,那直观上来看的话,其实就是把里边的每一条数据都进行了相应的操作,那么当然我们原来的流数据流也就变成了一个新的数据流。那呃,我们可以针对一条流进行转换,也可以进行分流和河流这样的一些多流转换操作。
01:05
在本章当中,我们当前要讲的主要是针对一条流的处理。而多流转换操作,我们会放在后边的第八章进行讲解。那接下来我们首先这一部分要讲的就是一些最基本的转换算子,其实如果大家对大数据本身比较熟悉的话,那其实会发现很多大数据框架或者工具里边都有相对应的一些算子,或者说对应的一些API方法调用。比如说这里最简单的就是。翻译过来的话应该叫做映射啊,这是我们非常熟悉的一个大数据操作算子了,它主要就是把数据流里边的数据进行一个转换计算。简单来讲就是一一对应,每一个数据来了之,经过处理之后都得到一个对应的数据。比如我们这里的一个定义了一个map计算,把每一个正方形的数据加工转换变成一个圆形的数据进行输出,这就是所谓的一一映射map操作。
02:16
代码当中我们其实非常简单啊,只需要基于一个data直接调它的map方法就可以,那这个map方法在代码里边呢,必须要传入一个map方式接口的实现类啊,所以整体来讲跟前面我们讲到的算子。非常的类似,之前我们讲到的是可以回忆一下,之前我们讲到的是去ADDS,然后里边传的是一个方式。而现在呢,我们就应该是基于这样的一个data stream去调它的点map方法,传入一个map方式。啊,那接下来我们就在代码当中具体来做一个实现,同样还是在当前第五章里边去new一个Java。
03:06
当前我们要测试的是。做一个map的测试,Map test。把没方法先写出来,呃,那么还是不要忘记后边应该去。抛出异常。首先我们还是创建一下执行环境execution environment。同样还是叫做E,不失正确性,直接把全局的并行度成一,方便后边我们进行串行的测试。然后接下来我们就要去读取数据了啊,那这个时候我们做一个简单的转换的话,呃,那还是用之前我们定义好的这个event类型的数据吧,在这个数据里边有三个字段,用户的名称,有他击的URL的那个字符串,最后还有一个时间戳,点击事件的时间戳,呃,这里我们做一个最为简单的转换,相当于就是一个ETL,我们只要提取当前。
04:12
每一个用户的名称提取出来就可以了啊,所以这就相当于只是截取它里边的U,那么这个操作怎么样去做呢?呃,在代码当中。为了要做这样的操作,首先我们应该把对应的数据读进来。这个数据。可以直接在代码里面写死。之前我们有很多。调用基本的API去读取数据的方式,这里既然是做测试嘛,我们直接从元素去读取数据可以了。好,把数据先读取进来。这里是从。元素中读取数据。读进来之后要做的操作,当然就是进行。转换计算。
05:03
提取user字段。这里我们可以用一个最为简单的方式来做一下啊,这里不需要叫三了,直接叫stream就可以。直接调一个map方法,我们可以看到当前是在stream里面定义的一个方法,它要求我们入一个map方式。这个方式本身是一个interface,是一个接口,那么它里边必须要实现的是一个叫做map的抽象方法,那这个map其实就非常的简单,我们可以看到这个map方式有两个泛型,一个T,一个O。所以这里的T其实就是当前本身要处理的数据的类型,而O呢?O是output,自然就是转换之后输出的数据类型。那这里的方法就看的非常的明确了,就是输入一个T类型的参数,然后返回值是一个O类型。
06:06
具体在我们代码当中。我们可以在下边单独的去定义一个对应的那方式。自定义map方式。当然,这里是自定义一个类去实现慢风式接口了。我们直接在当前文件里面写的话,把它定义成一个静态的类static class。可以把它叫做my map。Implement map function。注意,这里有泛型,我们是要把一个一个事件转换成提取它里边的user,那当然是转换成一个string了。这就是我们要实现的东西。接下来我们需要实现里边的map方法,这个map方法直接返回的就是我们想要提取出来的user字段,所以直接点user就可以了,这就是我们最简单的处理逻辑。
07:14
所以stream map传入的当然就是你一个my map。这样的话,我们就。当前的进行了转换计算。所以当前可以把它叫做一个result。接下来把result进行。Print打印输出,最后不要忘记还有exe执行起来,接下来我们可以测试一下,看看得到结果是不是我们想要的。每一个用户点击数据里面的用户名称。我们看到Mary Bob、爱丽丝按照顺序依次输出了,完全没有问题啊,这就是使用。Map。
08:00
转换算子进行,转换数据进行。对数据进行转换处理的过程。那当然了,在flink里边我们会发现,如果用这种方式啊,单独定义一个类实现接口的方式的话,可能会稍微的有一点麻烦啊,所以这其实是最为基础也是最为直接的实现。使用。自定义类。实现。Map方式接口。那么当然我们就会想到,如果不想这么麻烦的在外边去单独定义这样一个类的话,其实我们也可以直接把它写成匿名类的形式,因为在Java代码里边,我们想要实现一个接口的话,如果说不需要把它单独命名,单独的提取出来,抽象出来的话,只是在这里一次性的使用,那其实直接在里边去一个。
09:05
后边直接扭一个map方式,然后把对应的这个接口给出一个实现就可以了啊,所以接下来我们也可以用这种方式做一个。做一个实践。那就是。使用。匿名类。实现map方式。接口。那这里同样就是stream可以直接去map,里边要传入的是一个new,一个map方式啊,那同样这里的泛型输入是输出,默认是object,我们这里要输出是一个string字符串,里边同样还是要实现一个map方法,那它的逻辑都是一样的value.user返回就可以了。这里得到的结果我们可以叫做二啊,那前面为了看得更清楚,我们把前面叫做一,下面可以把RESULT2也做一个打印输出,看一看得到结果是否相同。
10:10
我们可以看到完全一样,Mary Bob Alice得到了相同的结果。啊,那其实如果要是直接这么来看的话。匿名类的实现和自定义一个my map这样一个类,然后传入它的一个对象,这两种方法比较起来呢,其实是差不多的,只不过就是第二种方法,匿名类的实现不需要再单独的定义这个类了,而是直接把它实现在map里边,那里边的核心逻辑其实就是这个方法里面要返回。那有没有更加简单的方式呢?其实是有的,在Java里边我们知道JAVA8之后引入了所谓的匿名函数拉姆达表达式,而对于里边内部只有一个方法的接口而言,它本身是可以用拉姆达表达式来替代的啊,所以这里面其实有一个非常简单的处理方式。
11:11
那就是第三种。传入。表达式。也就是说,直接传入一个匿名函数。String map。比方说呃,拉姆达表达式的话,在Java吧里边是一个这样的向向右的箭头,我们这里边前面的data其实就是当前的。要入的匿名函数的参数啊,那我们要返回的当然就是。这样的话就会非常的简单。把它叫做STREAM3。RESULT3。然后接下来我们同样可以打印输出,看一看得到结果是否相同。
12:02
确实完全一样,Bob爱丽啊,所以这三种方法其实是完全等效的。呃,当然了,如果对比之下我们会发现使用第三种方法直接传一个lada表达式其实是最为方便,所以在之前我们讲到代码的时候,其实也是用到了类似的这种形式,我们可以看到在这里Fla map直接写了一个表达式去做这样的一个处理,那这里面比较麻烦的一点呢,是因为有时候考虑到。泛型擦除的问题,所以直接写拉姆达表达式的话,没有办法让link帮我们自动推断出里边的类型,那这种时候就必须后边还要跟上一个returns,而如果说我们在这里是直接实现了对应的类的话,那就不存在这样一个问题了。所以这两种方式我们可以自行去做选择,可以传拉表达式,也可以使用自定义类。
13:02
关于这一部分内容,我们会在后边讲到link里边的UD这样的一个实现风格的时候,还会详细再做说明,这就是关于map的使用。
我来说两句