00:00
读取了数据源之后,接下来我们当然就是得到了data stream就可以调用各种方法对它进行转换处理了,其实我们知道啊,对于一个流处理程序而言,最核心的部分当然就是中间的这个转换操作transport啊。所以。我们所说的data stream API,其实绝大多数都是中间进行转换计算的一些步骤。他们就决定了我们整个代码的业务逻辑啊,对于转换计算操作而言,整体来划分的话,其实是可以划分成两大类的啊,那一种呢,就是所谓的。单流转换,也就是说我们当前读入一条数据流,经过各种转换之后呢,它一直保持着一条流的状态,哎,这就是我们所说的这个简单转换,或者叫单流转换,那另外呢,也可以出现多条流之间进行转换的状态,比如说可以出现一条流经过处理转换之后分成了两条流,这是一个分流操作,那另外呢,也可以多条流经过处理转换之后汇成了一条流,这是一个河流操作,分流和河流我们统称把它叫做多流转换操作啊,那这一部分呢,我们会放在后边。
01:23
第八章再进行详细的讲解,所以在当前这一章里边,第五章第三节我们所讲的转换算子呢,主要是针对一条流单流进行转换的一些操作,而且是比较基本的一些操作啊,那这节里边我们所讲的转换算子主要又可以分成基本转换算子。和聚合算子,以及后面我们要介绍的物理分区算子,首先我们先来了解一下基本转换算子,基本转换算子顾名思义就是非常简单,非常基本的一些转换操作啊,那对于大数据的学习来说啊,这些其实都非常的熟悉了啊,首先最经典的当然就是映射转换。
02:10
我们所谓的map啊。Map简单来讲,哎,那就是我们所说的做一个一一映射来一个元素,就做一个特定的操作,得到对应的一个输出来一个就得到一个新的,所以整体来说的话,我们放在这张图里边就会看到啊,我们当前的这个map转换。相当于就是对数据形状的一个转换,把方块要转换成圆形,所以呢,每来一个数据,我们对应的就有一个圆形的输出啊,那在调用的时候其实也非常简单,就是对于一个data stream直接调map方法。然后里边传入一个我们定义好的转换操作,接下来就可以把当前的数据流data stream转换成一个新的数据流data stream。啊,那接下来呢,我们可以在代码当中去做一个简单的实现,我们还是在当前的包下边去新建一个测试的object,当前是。
03:12
我们要测试的是map test。没方法里边啊,那首先还是我们需要把。当前的执行环境创建出来。Get一下啊,上面我们记得把这个。下划线,引入当前的一些影视转换,还是把它叫做EV。同样,为了方便测试,我们把全局的并行度先设置成一。接下来呢,我们首先需要去读取数据啊,那这个数据我们简单做一个测试的话,就直接用之前已经实现过的代码吧,比方说我们就直接从元素里边去读取数据啊,我们定义两个I Mary的一条点击数据和Bob的一条点击数据,直接把它拿过来。啊,那这个我们也不需要叫做STEM1了,直接就叫做stream,然后接下来就可以对它进行转换处理了,比如说我们现在的需求是。
04:09
对当前输入的每一条数据,当前的一个点击事件做一个简单转换,提取出每一条数据里边点击的用户到底是谁,也就是说我们这里边每看到一个event,就把它直接转换成一个string类型的用户就可以了啊,那对于这样一个转换操作啊,我们其实就是要提取。用户名儿。每次点击事件。的用户名啊,那所以对于这样的一个操作呢,我们直接STEM可以调用一个map方法做一个简单的映射,那这个map方法里边我们会看到啊,它要求我们在这里边要传入的是一个拉姆的表达式,是一个函数啊,当然了它还有另外一种方式,我们看到下面这种方式呢,它是传入一个map方式。
05:07
这跟前面我们讲到定义数据源的时候传入一个s function有点类似啊,很显然这个map function也是一个接口啊,那当然了,我们发现它也可以用这个一个函数来实现的话,这种方式可能我们更加的熟悉一点,所以这里边我们用两种方式分别做一个实现。第一种方式,那就是。使用。匿名函数,也就是拉姆达表达式,那对于盖拉而言,其实这个拉姆达表达式非常简单啊,我们当前的这个参数行参只出现一次的话,可以直接用一个下划线来代替啊,那么我们直接提取当前的user做一个处理就可以了,转换成user。那当然了,后面如果说我们不做其他的处理的话,可以直接做一个打印,比方说这是我们的第一种情况,打印输出。
06:01
那第二种情况呢?同样还是提取当前的user,我们可以实现。Map。方式接口。那当然了,要实现那function接口的话,我们应该在外边。单独的定义这样一个类去实现那方式接口啊,比方说这里边我们直接定义class,我们叫user。就是这样的一个类。重点就是要提取当前的用户名这个字段,Extend map function。我们看到my function这里它是有泛型的,泛型呢,To是两个,哎,那之前我们会发现啊,This stream本身我们这里边得到this stream的时候,它本身有一个泛型就叫做T,那现在呢,慢function有两个泛型,一个T,一个O,这代表什么呢?首先T就表示当前要处理的这条数据流里边数据元素的类型,那O呢?很显然O是output的一个简写啊,那所以O就指的是我们经过当前的map转换处理之后所得到的新的数据流里边的元素的数据类型。
07:17
啊,所以我们会发现啊,这个map转换。经过这个map转换之后。数据流里边元素的数据类型是可以发生变化的,就像我们前面举的例子啊,本来是方块,那接下来可以变成一个圆圈。里边的数据类型发生了变化,那对应在我们外边就是一个泛型啊,所以慢方式有两个泛型参数,所以接下来我们在这儿。本身是event,就是输入的数据类型,那对应的还有一个输出的数据类型,我们现在就是一个user,那就是string。那接下来里边必须要实现的一个方法,我们看到就叫做map,这个map呢,里边有一个参数是一个event事件。
08:03
所以我们会发现啊,最后得到的返回类型是一个string,所以我们直接就应该是输入的每一个数据,就是这里的event,调用这里的map方法作为参数传进来,然后呢啊,我们经过某些转换操作之后,得到一个string类型的返回值,就是我们转换之后输出的数据。所以接下来在这里其实也非常简单,那就直接。返回t.user就可以了啊,所以这个过程其实跟上面啊,我们直接用这个下划线形式实现一个匿名函数是非常类似的。那在这里我们调用的时候呢,当然里边就需要去new一个当前user instructor的实例啊,那后面我们可以做一个print打印,这是第二种情况。最后如果要执行的话,我们必须还要调用Env.exe。这就是我们完整的一个测试map的过程,接下来我们可以运行一下,看看效果怎么样,能不能提取出对应的user。
09:09
现在代码已经运行起来了,我们可以看到完全没有问题。第一种情况,我们使用匿名函数的时候,哎,那么开始这一条Mary的点击事件输出的就是Mary,那后面Bob的点击事件输出的就是Bo,那同样后边使用这个map方式实现map方接口的这种方法的时候,输出也是完全一样的啊,这就是map转换操作。
我来说两句