00:00
我们现在已经把整个项目的骨架搭建好了,接下来就可以真正的上手写代码,实现我们所说的word count这样一个词频统计的需求了。啊,我们知道对于大数据领域而言啊,Workout就是最简单的一种数据处理,可以说就跟hello word一样,但是对于数据处理而言呢,其实又可以分成两类,之前我们说过。按照不同的处理思路,处理引擎就可以分为批处理。和流处理。所以我们现在要做的这个word count呢,词频统计也有两种方式,一种方式就是所有的数据先收集,收集齐了之后攒一批,然后进行处理,统计到底每个词出现了多少次。另外还有一个呢,就是流处理。来一个词,我就来进行叠加,当前这个词到底出现了多少次,来一个就处理一次,来一个就处理一次,这是两种完全不同的处理思路,那我们说flink本身。
01:01
是一个流处理的计算引擎,那它能不能进行批处理呢?其实也是可以的,之前我们在对比flink和Spark的时候就已经说过,他们两者基本的数据处理的世界观是不一样的,但是呢?呃,其实他们都可以用自己的世界观去处理别的场景,比如说flink,它可以使用流的思路去处理批数据,也就是说把来了一批攒一批的数据呢,我看成一个有界的流。当前这一部分是截止的,到这里有范围的一个流数据,那我就把它当成一批来进行处理就可以了。所以接下来呢,我们可以先看一看弗link怎么样去处理这种非常经典非常典型的批处理场景,批处理其实整体来讲是比较简单的一个场景啊,我们会想到首先应该需要有一个成批的数据已经放在这里了,我们一批把它读进来,然后统计就可以了,那这一批数据呢,我们这里就简单的做一个模拟,我们就直接创建一个文本文档,然后里边写入一些单词,统计它出现的频次就可以了啊,那这里边我们简单起见,直接在当前项目的根目录下去创建一个叫做input的文件夹,然后在下边呢,我们可以创建一个文本文件。
02:19
一个,我们就把它叫做words.txt。里面我们就可以写入一些我们想要统计的文字啊,啊,比如说我们就简单几件,就hello word吧,最初的一个测试案例啊,那另外我们还可以hello flink。再声招呼啊,另外我们还可以hello。我们可以按照自己的想法,随便的写一些单词进去,进行词频统计好,那接下来呢,我们当然就是应该在scla这个目录下边去创建对应的源代码,然后去进行测试就可以了啊,那这里呢,为了方便管理,后面我们有很多章节,很多代码,所以呢,我们按照章节的编号来创建不同的包,对代码进行一个管理,所以我们这里边呢,可以去new一个package。
03:02
啊,当前我们还是加上这个反写的域名吧,直接com.at硅谷点啊,那接下来我们就不用写那么多了啊,直接CHAPTER02,当前是第二章的内容,我们创建一个这样的包,然后接下来呢,在下边就可以去new一个skyla class测试的源代码,这里我们需要注意一下,就当前创建的skyla类的类型到底是什么样的呢?是一个普通的skyla类吗?呃,其实不是,我们当前类似于是一个Java的测试类,我们直接应该能够把它去运行输出得到的结果,哎,那所以这里面我们其实要创建的是一个object,是一个单例对象。好,那当前呢,我们要测试的是一个批处理的work count,我们就把它叫做batch workout就可以了。好,把它先创建出来,然后在这个单利对象的内部,当然最重要的就是一个main方法了,哎,我们直接把它放出来,在这个main方法里边,就是我们具体要实现的逻辑,它是可以直接执行运行的一个方法。
04:05
那接下来就是具体执行的过程了,对于一个link程序而言啊,它的第一步,首先我们是要创建一个执行环境。那什么叫做执行环境呢?啊,其实简单来讲就是对于flink代码,我们知道它并不像最简单的Java或者SC拉代码那样,按照顺序一步一步往下执行就完了,因为我们知道link是一个大数据处理引擎,它的代码到时候执情行的时候应该是一个怎么样的执行情况呢?很显然它是一个分布式的集群环境。他可能要分发到很多个节点上去并行的执行,所以这个时候我们当然就不能按照平常的这种方式,按照顺序一步一步走了,所以必须要先获取一个当前运行的上下文环境啊,所以这个其实简单来看的话,我们可以跟Spark对比,就是类似于Spark context它的上下文,接下来我们这个执行环境叫什么呢?呃,其实就叫做execution environment,我们引入的时候注意是叫flink。
05:10
api.sc下边的execution environment,把这个执行环境引入,然后我们获取的时候呢,就直接调这个类的get execution environment方法就可以了,接下来我们可以直接给一个变量名称,就叫做env,先把它获取到。然后有了这样一个执行环境之后,下一步当然就是要去读取文本文件数据。读取数据,那这个读取数据的过程呢,就要调用en nv这个执行环境,它里边有对应的方法,这个方法就叫做我们可以看到有read file可以读取文件,那现在呢,我们既然是一个文本文件吧,直接read text file就可以了。里边呢,就要给当前的文件路径,我们当前是放在根目录下边的input里边的一个word.txt。
06:04
直接把它指定好。然后得到的这个东西呢,也应该给他一个变量的名称,因为我们读取文本文件的时候,是一行一行读取的啊,所以这里我们可以直接把这个就叫做。Line data啊,当然了,这里我们可以看一下当前它的类型到底是什么,可以看一眼,我们会看到它读进来的其实是一个data set,是一个数据集啊,那所以这里边我们就会知道了,当前我把这个文本数据啊读取出来之后,接下来就是要作为一个数据集去进行集中的按照批次进行处理了。那这里面我们可以把它写成line data set。好,把它定义好了之后,接下来我们就是基于这个数据集data set去进行各种各样的转换。所以接下来我们第三步就是对。数据集进行转换处理。
07:02
那我们现在要做word count进行词频统计,统计的逻辑是什么样的呢?哎,那简单来讲的话,其实就是按照不同的词先要做一个分组嘛,啊,不同的词我们分成不同的组之后统计每个组里边的数量就可以了,这里边有一个比较简单也是比较经典的做法,那就是我们可以先把这里边的每一行先打散拆开,拆成每个单词之后呢,诶,每一个word不要直接就把它分组,分到一个组里边,因为接下来我们要统计它的个数嘛,我可以怎么办呢,直接把它。包装成一个二元组的形式,就是一个word,当前一个单词后边就跟上它的目前的频次就是一当前只有一个嘛,所以接下来呢,我们就可以按照这个二元组的第一个位置的这一个单词作为K作为键。
08:00
进行一个group by啊,进行一个分组啊,我们在Spark里面也有对应的group by k这样的操作啊,我们把它做分组之后呢,接下来要统计的是什么?那其实就是把后边的第二个字段做一个叠加,做一个sum操作就可以了,哎,所以这样的一个简单聚合就实现了一个文本内容的分词统计,所以接下来我们要做的这个转换啊,首先。是要把这一个每一行数据要打散,那这个打散呢,我们知道在很多大数据处理应用里边,有一个叫做flatten的操作。就是所谓的扁平化处理的操作,打散之后呢,我们还想把它再转换成一个一个word,一个一这样的一个二元组形式,所以接下来呢,还要做一个map转换。那我们知道这两个操作合在一起,有一个统一的操作就叫做。Flat map。所以在这个flink提供的API里边啊,也有对应的方法,调用非常简单,可以直接针对当前的data set,我们就直接line data set,可以直接调用flat map这样一个方法。
09:09
那里边要传什么呢?诶,我们看到里边这个给的提示啊,传的方式,这里边其实最简单的一种形式,其实我们看到有这样的一箭头,这就是拉姆达表达式嘛,所以当前每一个flat map啊,或者map这样的一个操作,里边要传入的其实就是一个方法。这个方法就代表了我们要针对当前数据集里边的每一条数据进行什么样的转换。那我们现在要做的转换是什么呢?呃,首先要做的其实就是把它打散,所以我们干脆啊,直接就使用空格做一个分词,做一个SP就可以了。这种方式呢,在SC里边有简写,我们可以直接用下划线来表示这样的一个拉姆达表达式啊,就是如果我们当前的行参在后面只出现一次的话啊,直接用下划线来表示,那后面直接SP调用这个方法,后面我们需要有一个空格,把它做一个分词。
10:03
当然了,本来按照我们的想法,应该打散之后还要把它在包装成一个二元组,做这样一个转换,本身我们是可以在flat map里边做一次性的转换的,哎,但这里呢,因为我们使用这个split啊,这样写会方便一点,那我们就把它分成两步吧,就前面先做一个单词的拆分,按照空格做一个分词拆开,然后接下来呢,再来一个map,把每一个单词转换成一个word,一个一这样的二元组不就完了吗?哎,所以接下来当然就是直接再去调一个map转换就可以了。所以里边我们这就是每一个word要把它转换成一个二元组,我们知道scla里边的二元组,这个不用单独去处理类型啊,直接用一个小括号括起来就可以,那就是一个word,一个一,所以这个其实非常的简单,使用scla做这个数据集数据类型的转换啊,集合类型的转换其实非常方便,这里我们看到呃,报错啊,这里边做报的这个错是什么意思呢?主要这里边涉及到就是skyla里边有很多的影视类型转换,我们这里边如果要是直接这么写的话,它没有引入对应的这个影视转换的相关的类,这个时候就不能直接做对应的操作,那这个时候怎么办呢?哎,当然我们就要去引入相应的影视转换了,哎,所以这里边我们可以inport inlicit这样的一些对应的转换啊,这里边我们可以看到有flink API scalela下边的create type information啊,我们把这个引入。
11:33
接下来我们看啊,把这个引入之后,接下来这里边就不再报错了,但这个可能稍微有一点麻烦,那假如说这个艾idea尔没有做对应的自动提醒,我们这里边又怎么样记住对应的这个东西呢?啊,如果大家觉得有点麻烦的话,这里边有一个简单的写法,我们看到前面的执行环境execution environment,以及这里的create type,它其实都是在flink API下边的啊,那接下来呢,我们干脆有一种方法是什么呢?直接在这里写一个下划线。
12:05
然后把。这个包下边的所有内容都引入啊,接下来的影视转换当然也就不成问题了,所以这是一个比较简单的写法啊,往往我们会看到这个SC拉代码里边,上边的引入是这样去做引入。好,我们现在已经把数据单词进行了拆分,而且转换成二元组,那接下来当然就是进行分组和聚合的过程了。所以接下来就是按照。单词进行分组。那这个分组的过程呢,其实也非常的简单啊,那需要我们把上面这个这一步操作得到的结果也要保存成一个变量啊,那我们可以看一下当前的类型是什么呢?我们看到还是一个data set啊,就它还是一个数据集。只不过现在数据集类型变成了一个二元组,哎,那所以干脆啊,我们这个就把它叫做word and one,就是一个word,一个one,这样的一个二元组啊,这样的一个数据集,那接下来我们要做分组的时候,当然就是基于word and one,然后去做一个哎,Group by,我们看到有一个方法直接就叫做group,然后里边呢,可以传入一个位置,指定当前的K是什么,那我们现在。
13:20
给的位置就是零,零就表示当前二元组里边的第零个位置,那当然其实就是第一个元素了,当前就是我们以word作为当前的K。进行分组,那得到的东西是什么呢?那应该就是做了一个分组之后的word and one了。我们就叫做world and one group吧。接下来最后一步,那就是真正意义上的统计。对分组数据。进行上聚合统计。所以接下来我们就应该是word and one group,然后去直接调用,我们看到有一个some方法直接去做统计,后面呢,啊,又跟着是一个当前统计字段的索引位置啊,那这里边我们统计的是谁呢?当然是当前二元组里边的第二个字段,前面我们group by0,这指的是第一个字段,现在当然就是上一,就是第二个字段。
14:20
这里需要注意的是,这里的一并不是指的是我们这里的这个WORD1的数字,一,它指的是索引位置,就前面的word,它的索引位置是零,二元素的第一个元素,而后面的一呢,索引位置是一,它是代表二元组的第二个元素,哎,所以当前我们是每一个word后面跟着都是一,那假如后面是二是十是100都无所谓,我们都可以按照后边的第二个字段把它做一个累加聚合。所以最后得到的这个结果呢?呃,当然我们就可以把它叫做result,或者叫做sum,得到最后的结果,最终就可以做一个打印输出。最后一步打印输出,我们可以看到sum可以直接调用一个print的方法,这样的话就得到了我们想要的词频统计的结果,所以我们看到整个啊,调用SCAPI做这个批处理的work count还是非常简单的,接下来我们可以运行一下。
15:16
看看得到的结果是不是符合预期。啊,我们看当前已经运行起来了,看一下统计输出,哎,我们可以看到啊,当前统计输出的就是这样的四个元组,SCLA1 link1 word1,这是三个单词,在我们这里边只出现了一次,而hello这个单词呢,出现了三次,所以它直接输出的是HELLO3,这跟我们预期的结果是完全一致的。这就是使用flink去进行批处理workout统计的这样的一个过程。在这个调用的过程当中,其实大家仔细的话就会发现啊,我们这里边的每一次调用自动补全它的数据类型的时候会发现啊,其实就是得到的是一个data set是一个数据集,所以整个这一套API的调用呢,在flink里边又把它叫做。
16:09
Data set API。它都是基于data set去进行的各种各样的转换。后边我们做的这个各种各样的操作啊,Flat map map,后边做这个group,我们其实都是基于data set这样一个数据类型的。那对于这样一个data set API呢?可以这样去调用,但是现在已经越来越不推荐大家去使用它,这是为什么呢?主要就是因为我们说对于弗link而言,它本身是一个流批一体的处理架构,本质上来讲,我们对于这种批量的数据集进行处理的时候,底层也是数据流,所以根本没有必要用单独的data set API去进行特别的处理啊,所以从flink1.12开始。官方推荐的做法呢,就不需要再去调用data set API了,而是统一使用数据流的API,就是我们所说的data stream API啊,分层架构里面的中间这一层。那么在提交任务的时候,如果说我们现在是一个批处理的word count这样一个任务,那又怎么样去做区分呢?跟这个流处理怎么去做区别呢?
17:17
只要在提交的时候做一个单独的设定,将当前任务的执行模式设置为batch就可以了啊,具体来说的话,那就是实际在提交的时候,后面我们会讲到将要使用一个叫做flink run的命令去提交当前的一个任务,一个作业,那么后面呢,只要跟上一个对应的配置项,就是runtime mode,当前的运行时执行模式等于batch,这样就可以了。这样的话就代表即使里边我们使用的API还是data API看起来就像一个流处理程序一样,但是是事实上最后我们执行的时候是按照一批数据去执行的。这样的话我们就会发现啊,这个data set API其实就没什么用了啊,之后我们就不会再用data set这样一个数据类型去处理批数据了,所以可以说flink从1.12开始,真正的已经实现了完整的流批一体的这样的一个概念啊,那这里只是为了方便大家理解啊,所以我们还是使用这种经典的data set API给大家做了一个批处理的实现,如果我们在实际应用的过程当中发现了类似的代码的话,也不要慌啊,他只是用了这个老版本的data set而已。
18:31
之后我们在工作当中直接使用同一套data vpi就可以既可以处理批数据,也可以处理流逝数据。
我来说两句