00:00
接下来我们就在WC这个包下边要去创建一个Java class来处理当前的。批处理workout,我们把它叫做h workout。做批处理,那首先我们应该有批量的这个数据才行,我们这里直接就把所有的文字啊文本写到一个文件里好了啊,那这个文件呢,我们就直接在项目的根目录下去创建一个input文件夹啊,然后对应的这个文件就写在一个叫做words XT这样一个文本文件里面,那这里面其实就是可以模拟一篇文章啊,那其实我们想到就是一行一行的数据嘛,啊这里我们就简单写,比方说就是hello word。打一个招呼啊,那另外还可以link。当然也可以哈,爸爸。这么简单的三行数据,我们要统计一下里边每个单词出现的频次是多少。
01:00
好,有了数据之后,接下来当然就是看一看代码里边到底该怎么写了。在h workout这个类里边,我们首先敲出准备法,然后在里边。就要有具体的处理流程了,定义具体的处理流程了,对于一个flink程序而言,它并不是直接上来之后我们读文件就可以了,而是首先第一步要创建一个执行环境啊,因为其实我们会想到flink的执行,它是需要有一个非常复杂的集群的,它整个是要配合起来去做工作的,当然不可能我们一个代码里边定义好了处理流程,它直接就一步一步执行,就就直接搞定了,这个是不可能的,所以我们首先要创建执行环境。创建执行环境的方式,那需要啊,是盗用一个叫做execution environment,我们看到它这波是在flink API Java下边的execution environment,这导包的时候大家也需要注意,就是特别是如果后边还要写这个跟skyla混写的一些代码的时候,对于弗link而言,它有一些类名,在Java和skyla里边类名是完全一样的,那么我们在导的时候,到底导入Java下的还是SKY下的,这个就需要特别的小心,当然了,当前这个我们只有这样一个啊,我们只引入了这个Java相关的API,所以这里边不会导错,就是execution environment,我们要调用的是它的一个静态方法叫做。
02:37
At execution environment得到的就是一个execution environment执行环境的一个对象,我们把它直接简单的命名叫做烟威。有了这个环境,新环境之后。第二步,我们就可以从文件中读取数据了。那么这里边我们读取数据的时候,其实调用NV下边的一个read file方法,可以从文本文件里边读取数据,那读取出来这里边需要传入一个文件的路径啊,我们当前就是。
03:15
项目的根目下边input words.txt,把它写过来,得到的应该是什么呢?可以看到它得到的是一个source。一个数据源,也就是说当前的执行环境,从外部的文本一个文件当中读取数据得到的,就是我们接下来要去做批处理的一个数据的源头,数据源。那这个数据源,我们看到它其实还有一个泛型。也就是说它是一个数据源,那么这个数据源里边的数据本身又是什么具体的类型呢?它是string字符串啊,文本类型,所以接下来我们其实要处理的就是。哎,我们知道读取文文件肯定是一行一行逐行读取嘛,其实就是一行一行的字符串啊,那所以这里我们可以把这个data source重新命名一下,我们就把它叫做一行一行吧,Line source。
04:14
或者我们直接简简写叫line DS也是一样的。三步有了数据源之后,接下来我们就要考虑一下具体的处理逻辑了,我们想要做word count的这样一个操作的话,那其实主要就是要把每一行里边的。每个单词要提取出来,按照这个提取的过程其实非常简单,就是按照空格去做一个切分,然后得到的应该是一个字符串的数组,那数组里面的每一个元素其实就是一个单词。那后续我们应该怎么样把他们分组,然后去统计个数呢?这里有一个非常简单的想法,就是可以把它转换成一个二元组类型啊,就是比方说我们前面有一个哈。
05:07
后边有一个它的个数,比方说一来一个哈,那么就是一个哈,一相当于就是一个word,一个count,构成这样一个二元组,后续我们其实就可以以前边的这个单词作为分组的依据,那么同样的单词归到一组,把后边的个数做一个叠加统计就可以了。这是一个基本的想法啊,所以接下来的第三步,我们要做的其实就是。Young。每行。数据。定型。恩驰。把每个单词提取出来,然后转换成二元组类型。这个转换的过程就是我们就可以想象应该用一个什么样的方法来做转换呢。
06:01
呃。在很多大数据处理领域里边,我们都有一个方法叫做map啊,这就是通用的一个转换方法,那对于map而言,它其实只是对于单词,呃,对于每一条数据一对一的转化,那我们这里边是要把一行数据拆成很多个单词,再把每一个单词转换成一个二元组,那其实这里边会有一个把。一条数据拆分打散成多条数据,类似于我们把一个集合类型,然后扁平化打散的一个过程,那自然我们就会想到了在大数据处理领域里边的一个操作叫做flat map啊,这个操作就是做一个扁平映射,可以把数据做转换,而且可以打散啊。接下来我们要做的就是基于line source去调用一个map方法。莱曼方法里边我们可以看到它要传的其实是一个莱曼function啊,我们要传的是一个函数,因为要做映射,要做转换,自然你是要告诉当前的程序,当前的代码,我们要做的是一个什么样的转换,这个转换其实可以用一个函数来表示。
07:14
那么对于JAVA8而言,它是支持LA的表达式的,在这里我们就直接用这个新特性写一个LA的表达式来做一个定义,那需要有两个参数了,当前的输入参数一个就是。当前这一行数据啊LA,然后另外我们还需要一个参数,这个参数呢,是用来做转换之后输出这个操作的。那么对于弗link而言,Fla map这个操作,它的输出并不是说直接定义出来的,我们返回什么就是什么,而是要通过一个叫做lecture。也就是所谓的收集器来进行处理。需要注意一下,我们要引入的collector应该是阿帕奇link u下边的collector。
08:04
一个收集器啊,那简单来讲就可以理解成是我们要把。最后在这个流里边啊,要向下游传送的,传输输出的那些数据。把它是先放到了一个收集器里边,先收到一个盒子里边,然后一条一条把它发送出去的啊,类似于这样的一个解释啊,那这里边我们要复集器,它也需要有泛型,那泛型就是反换输出得到的那个数据类型到底是什么,这里就写什么类型。那我们要得到的是一个二元组类型,所以这里边其实就应该是一个二元组了。我们知道对于盖拉语言而言,它本身内部就有元组类型,而对于Java呢,并没有定义元组类型,那怎么办呢?呃,在link里边,它给我们定义了元组类型,我们看到link API Java下边专门有一个temple这个包,然后下面就有各种各样的元组,我们现在要用的是二元组,那么就是TA2。
09:08
后边我们看到有两个泛型,当然就是二元组里边每一个元素的类型啊,我们现在是一个word一个count嘛,所以前面word是一个string的话,我们可以给一个完整形均了,我们把它定义出来。就是第二个参数类型是个collector,当然了,我们可以把它叫一个名字,就叫做。Out,它定义在这里,那接下来要写的是一个拉的表达式,所以。前面我们这里TEMP2也需要做一个引入,引入大家需要注意不要引入scla里边的元组类型,我们要引的是ink提供的抓板元组类型。好,接下来就是具体的实现了。里面具体实现啊,那当然首先是要做一个。
10:00
词先得到一个string类型的数组,先得到words。这个分词就是把line做一个切分,It按照空格来做一个分词。得到对应的每每一个单词都得到之后,接下来当然就是对于每一个单词。对于words中的每一个word。要把它转换成。一个二元组TEMP2这样一个类型,然后输出,那我们这里边输出的应该是它后面的这个计数应该是几呢?其实非常简单,那就是来一个,我们就后面给一个一不就可以了吗?那就是来一个hello,那就是HELLO1,来一个word就是WORD1,那最后我们就把后边每一个单词,它后面有多少个一全部叠加起来,就可以得到它的总数了。所以接下来我们这里输出也非常简单,就是得到。二元组做输出,输出的话是要用out到一个collect的方法,就是收集器的一个用法啊,在这里直接out.collect表示类似于我们当前的这一个操作,要要输出,要返回对应的这样一条数据。
11:15
呃,我们可能会想到,那为什么这里要用这样一个alt,不直接返回一个值就作为输出不就完事了吗?这里边的好处就在于我们用方法调用的方式,你想输出多条可以输出多条。我们可以看到在一个for循环里边,每调用一次out.collect就相当于输出了一个数据。所以如果是flat map这样一个扁平映射的话,当然必须要用这种方式才能够把它打散,做多条输出。如果我们只是最后类似于return一个值的话,那只能返回一个,就不能起到一对多的打散的效果了啊,所以这也是扁平映射必须要求的一个设计里面,当然需要的就是一个二元组做二啊,这里我们要要构建这样一个二元组的实例的话,例和它的方法里边当然就是一个word,一个count count的话我们直接给一个一就可以了。
12:13
哎,长整形的话就是一。就是进行类型转换,包装成二元组的一个过程。看起来稍微的有一点复杂,但事实上这里面的每一步都是有具体的含义,而且非常容易理解的啊。所以上面这一步。做的是。当。一行文本并行,因此进行拆分,然后拆分开之后。当每个单词。反换成二元组输出。就是具体的两步操作。
13:00
呃,这里边需要注意的一点是,如果,呃,因为大家知道在Java里边,它原生并不是支持函数式编程的这种表达,拉姆达表达式是JAVA8引入的一个新特性。那所以对于。弗林而言,他如果是把这个在这个处理的过程当中传入了一个拉姆达表达式的话进行编译之后,就会。就会出现泛型擦除的这种效果啊,所以这里边我们遇到的这些泛型可能最终啊,我们这里边得到这个输出类型是一个string long包装起来的一个二元组,那最后可能就只知道它是一个二元组,并不知道里边的类型是什么了。那为了解决这样一个泛型擦除的问题,在flink里边需要在后边加一个类型的分门的声明。声明的方式也非常简单,就是直接用一个returns方法,一个return方法里边直接给定当前的X啊,那当然了,当前是一个元组类型,所以是X up,然后里边指定两个每两个位置的元素啊,二元素,每一个元素的类型,一个是string,另外一个是长整了。
14:19
就是我们具体这个定义的过程啊,那当然了,这一步操作结束之后,得到的我们会看到啊,得到的是一个flat map operator啊,这个operator我们可以理解成一个运算符,也可以理解成可以翻译成算子啊,就是他得到的这样一个运算的一个算子,相当于就是把这个数据源做了转换,得到了一个新的数据集合,我们看到它这里边也有对应的泛型定义,那这个泛型其实就是。既然是做转换嘛,那就是输入的数据类型和输出的数据类型都列举到了这里啊,那这里边我们可以把它不要叫做returns啊,给一个更好理解的名字,因为我们知道最后是一个一个word,一个一这样的二元组,我们干脆把它叫做。
15:10
World and what?把它叫做这样一个二元组就可以了。是第三步操作。
我来说两句