00:00
来我们继续看第二步,在flink的流处理程序里边,首先是source读取数据源,接下来那就是transform要做转换计算了啊,所以其实在flink处理的流程里边,Transform这一步应该是最重要的,对吧?大家想这是主主体的这个做计算啊,做做转换,那那显然都是以这个transform为准的嘛,啊,那所以接下来我们看一看transform里边可以有哪些转换算子可以可以进行处理啊,首先首先要给大家讲的呢。呃,这些有时候经常按按照这个做一个划分啊,大家看前面三个map flat map filter,这些算子呢,往往会统一叫做基本转换算子,或者叫简单转换算子。啊,因为因为大家想到了之前我们讲那个任务之间的数据传输的时候,他们这这是不是相当于跟下游的那个数据传输,就相当于只是一个one to one这样一个窄依赖的这种传输方式啊,对吧,它不会影响到他们,呃,就是当前分区的数据的顺序啊,啊它就是来一个处理一个,来一个处理一个,没有任何额外的操作,所以大家会看到啊,最简单的map,那就是比方说我当前就是把这个形状做一个改变,对吧?方块进来,圆圈出去啊,来一个走一个,来一个走一个,非常简单啊,然后Fla map的话,大家知道是打散对不对啊,就是把它这个做一个拆分,呃,然后可以生成多个啊,就是来一个数据可以输出多条数据啊,这是Fla mapb的特点,那filter呢,它的特点是。
01:38
来一个数据,按照某种过滤条件做一个筛选过滤是有可能不输出,也有可能输出,对吧?哎,它是这样的一个特点,那所以接下来我们在代码里边做一个简单实现。还是啊,新建一个。新建一个包,我们这个com.at at硅谷点,呃,当前还是API test,现在我们应该是到了这个transform下边,对吧。
02:10
Transform。当前我们是第一个,呃,测试啊,TEST1。当前我们主要是基本转换算子的一个测试啊。啊,那首先还是把这个main方法定义出来,上面我们要有这个抛出异常的这个操作啊,里边的话先创建流逝的执行环境。我把它叫做env对吧?啊,另外我们为了方便看到按照顺序输出的结果,我可以把全局的并行度设成一,这个都没有问题,呃,接下来是不是应该要读取数,呃,读取数据啊,对吧,这个就是我们还是像之前一样,我们做测试的话,从文件里面读取数据吧。这个我就不详细写了,我直接抄之前的这个从文件读取数据完事了,对吧?呃,这个data stream啊,从这里直接读过来。
03:08
我们把这个data streamam引入,有了这个数据之后,那这里边我先给它叫做一个input stream,对吧,我先叫叫一个这个名,然后接下来我们可以基于它做各种各样的转换操作了,首先哎,我们做这个基本转换啊,第一个是map对吧,比方说map的话,呃,这个这里边读进来的这个数据啊,其实。比较简单,就是逗号分割的一串字段嘛,那所以我这里边可以直接比方说啊,我要求就是把这里边的每一条数据。读进来的每一行数据转换成它的字符长度,转换成一个整数,做一个输出,大家想想可以不可以,可以对吧?啊,我们把。String转换成长度输出啊,所以接下来这里边我可以直接得到的data stream啊,那大家想一下这个map操作数据类型可以变吗?就这data stream的那个泛型可以改变吗?当然可以改变,对不对?Map要转换嘛,对吧?前面我们那个图也看得很清楚,方块进来,圆圈输输出对吧?里边的这个数据类型可以改变,那我们现在得到的想得到的那就是一个对吧,一个整形啊,所以比方说我把这个叫做一个map stream啊,基于前面的input stream直接做一个map转换,这map转换里边大家看到map需要传的是一个什么东西,对,这里边传的是一个map方式。
04:42
然后map function,大家看它里边其实是有两个泛型的,对吧,那这个TR分别指代什么呢?哦,大家要注意T在这个,这当前我们调的方法都是在data stream这个,呃。当前的这个文件这个类里边对吧,都是在data里边啊,那data stream我们直接看到前面来,这里边是不是本身有一个泛型啊,当前的data泛型就是T啊,所以那你看这里边我input stream.map那当前的这个T应该是什么?
05:16
是不是谁调map方法,那就是哪个stream的那个泛型啊,所以是不是当前的T应该是stream啊对,所以这里边啊,我直接去new一个map function,哎,大家看我这个写法,这这相当于是什么,我是不是直接对定义了一个匿名类啊,对吧?呃直接就是相当于呃直接实现啊,这样一个接口在这里边用一个匿名类实现就完事了,大家看自动补全之后,它是不是T就是string啊,那R是什么呢?R当然就是我们这里边输出的这个数据类型,对吧?你看最后它得到的是一个single output stream operator。哎,那说这又是个啥东西呢?在字面上翻译,这是一个单个的输出流的这样的一个算子,对吧?Operator,诶,那我们再再点进去大家看哦,它原来是不是也是一个data stream呀,本身它也继承自data stream,对吧?啊,所以本质上是不是得到的还是一个数据流啊,所以这个其实没什么区别啊,啊,那这里边大家就看到了,最终得到的这个数据流的类型就变成了R,所以map本身是可以有这样一个转换操作的,然后接下来大家看这个map里边必须要实现的方法是一个。
06:33
就是这样的一个map方法对吧?啊,重写这样一个map方法,大家看一下这个map方法参数是一个string类型的value,这是不是就是输入啊,对吧?就之前这里这个流里边输入的数据是什么样子,然后输出是不是返回一个integer啊哎,所以大家会想到这里边是不是直接就是return,就是你把那个string做转换之后,是不是输出就完事了啊,所以我们现在要它长度怎么办?是不是value.l是不是直接输出就完事了。
07:05
啊,就是这么简单是吧?啊,这就是利用这个map啊做一个基本转换。好,我们先把它放在这儿,然后第二步,那呃,那我们可以看一下这个flat map flat map其实大家更加熟悉,这就是前面我们做那个word count的时候,是不是第一步就是要把那个数据读进来之后分词啊,对吧?呃,按照空格分词嘛,那现在我们这个数据呢,它不是空格分词了,大家想我们要分的话应该怎么样。哎,是不是按照逗号去做一个分词啊,啊,所以接下来我们这个是按逗号逗号。呃分呃,切分字段对吧,其实是切分字段,所以这里边我得到的data stream,诶那大家想一下,最终我这个那就看你定义要输出什么东西了,对吧?比方说我简单一点就是把每一个字段啊,我也不做区分,直接就把这个对应的字段string输出就完事了,那我得到的是不是还是一个string啊对,那大家想这个Fla mapb可以改它的那个数据类型吗?
08:06
可以,对,当然可以,对吧,大家还记得之前我们那窝count是不是就是一行那个string输入,最后我们输出是不是都变成那个二元组了呀,对吧,其实这个完全是可以的啊,我们这里边其实就是给大家再做一个简单的转换而已啊,我们把这个定义成叫做flat map stream,那可以基于之前的input stream做一个flat map操作,那这里面要new的话就是一个,哎,对,大家自然想到了就是一个flat map function对吧?啊,然后这里我们看到它的同样这里也有两个对应的这个泛型to,哎,大家看到这是不是也是输入输出啊,对吧?啊,所以这里边就是输入是string,这是因为input stream stream是string啊,那它的输出是因为这里边我们想得到的是一个string对吧,那这里边我们看到它就没有返回值类型了,实现的这个Fla map方法。没有返回式类型,那用什么来输出呢?哎,对,这同样这是输入,那另外还有第二个参数是一个collect,那是不是用al.collect来输出啊,啊,所以接下来我们这个操作也非常简单,我先把它做一个切分,那得到的这个string类型的数组,比方说我把这个叫做FS啊字段那就是value.split按照对逗号来做一个切分。
09:28
那最后是不是for,对每一个字段field的从这个fields里边拿出来,接下来是不是都做一个这个一行的话,我不用写画括号了,对吧,直接输出out.collect然后把这个field输出是不是就完事了啊,所以这个其实也是非常简单的一个操作啊,比我们那个world com还简单,那个还包装了一个二元组呢,是吧,那我们这里边就直接输出了。呃,另外最后还有一个这个filter filter的话大家都知道是过滤,那这个过滤其实就是做一个条件的筛选,对吧,比方说我们要筛选,呃,Sensor sensor1开头的。
10:16
的数据对吧,就是ID啊筛选呃。341开头的ID对应的数据,那所以接下来我们得到一个这个stream,哎,那这里面大家要想一下,当前这个filter可以改变数据类型吗。哎,大想它它只做过滤对吧,那是不是应该不不做这个数据类型转换啊,哎,所以大家会注意啊,Filter这里边类型不能变,就只能是stream,这个没什么商量的对吧?啊,那所以这里边filter stream啊,定义一个基于input stream做一个filter,大家看这里面我要实现的就是一个filter function对不对,呃,你有一个filter。
11:02
哎,写错了啊,这个是不是flat map对吧?Filter啊,这里面你有一个filter function,大家看区别就在于filter function是不是只有一个泛型,就是T啊,对吧,就是之前是什么类型,现在还是什么类型,这个不能改,所以接下来。我们看一下这个具体的实现,要有一个filter方法,它返回输入的参数是当前的数据,对吧,资讯类型的数据,那返回值一个不玩类型,也就是说如果要是返回一个false就代表。哎,这个数据是不是就不要了,过滤掉了对吧?如果返回处的话,哎,就相当于筛选出来我要对吧?哎,所以这个其实就是一个条件筛选嘛,那这里边我们要求以SEN1开头,那应该怎么办呢?哎,是不是就是value点,哎这个大家想到我是不是应该要去做一个starts with啊对吧?因为我当前这个value是不是就是那一行数据,我也不用单单独提取对不对,那它就是前面啊开头到底是什么,这里是不是就是什么呀?对吧?我就看它这个开头是这个三四十一开头不就完了吗?对吧?啊,这个我也不用单独去提取它字段啊,所以我就直接starts with sensor1,这个starts with是不是本身返回的就是一个布尔类型啊啊,所以接下来就得到了我们想要的这个对应的这个字段啊,这就是简单的一些基本转换操作啊,啊,那这里边我们还可以做打印输出。
12:36
啊,打印输出的话,这里边大家看不同的这个流是不是分别都可以打印啊,那这个叫map对吧,Map操作,然后另外还有这个flat map,把它print出来,这个叫flat map,还有这个filter stream也可以打印出来,这个是filter,最后不要忘记还有一个env execute执行起来对吧?哎,这是我们当前的这个呃,流流失处理程序必须要实现的东西,好,那接下来我们运行一下看看结果。
13:20
好,那那接接下来大家看一下这个输出的结果,当前哎,每一个这个map输出的大家看就只是一个整数,对不对,这就是当前那个字符的长度,你看我们当前的这个数据。第一个第二个这个都一样,24对吧?第三个是不是这个数据稍微短了一点,所以是23,最后一个长了一点,是25没有问题对吧?然后我们看这个flat map的输出flat map,我们是把每一个字段切开了,大家看是不是这样,你看341,呃时间戳温度值对吧?346温度值呃,时间戳温度值每一个都切开了,然后另外还有这个filter filter,大家注意一下这个输出的,哎,我们要三一开头,那是不是只有341呢?哎,对,大家看到有341是不是还有三四十啊,哎,因为三四十是不是也是以三四下划线一开头啊啊,所以这个其实并没有区分对吧?你如果就要选三一的话,那是不是你其实应该要,就是我们是不是相当于要把之前的那个字段要做一个提取,然后提取出来的第一个字段一定要严格意义上等于EQUALS341才可以啊,哎,所以这这个就大家能想到啊,如果要严格。
14:36
相等的话,你必须这样去做,所以这就是我们这个基本转换算子得到的结果。
我来说两句