00:00
接下来要做的就很简单了,当然就是按照当前每一个word去进行分组了。按照word进行分组。Word and one Apple,然后直接分组的话,调用的方法叫做啊,我们直接调用这个Fla map operator下面的方法啊,Group by里边group by的话,报传入的非常简单,我们可以直接传一个索引位置,当前不是本身是元组吗?哎,那元组的话,每一个位置都应该对应着一个索引,比方说我们现在要按照第一个元元素去进行分组,把它当做K,哎,我们知道在Spark里边直接就有group by k啊这样的操作的,在flink里面没有,那怎么办呢?它的K到底是哪个呢?是可以用。当前的索引位置去单独指定的第一个位置的话,当然就给一个索引零。
01:03
就表示。啊,以前面我们的第一个元素word作为分组的K。那这个分组之后,我们看到得到的是一个unsolted grouping,一个数据类型啊,这个这个我们可以直接把它叫做word and one。Group。到分组之后,这还不是目的,我们把它分组之后,其实最终是要做统计的,这个统计也非常简单,那就每一组,哎,我们知道它本来是二元组嘛,每一组里边那前面的这个K啊,Word都已经一样了,后边就是很多个一,我们其实就把这个一做一个叠加就可以了。所以。那就是分组内进行。聚合同比累加一共有有多少个一啊,所以我们这里要做的就是word than one group直接掉一个方法some啊,那sum这里边我们也得指定看到,也需要指定一个field当前字段的索引,也就是说二元组里边你到底以哪个字段去做求和的操作呢?哎,当然就是第二个字段了,所以就应该是一。
02:26
要注意的是,这个一并不是我们看到前面给来给给到的啊,Word后面跟上的那个一具体的那个数字,而是一个索引,指的是我们要把第二个位置的值做一个上,做一个叠加。啊,最后得到的就是这样的一个sum,我们看到它其实是一个aggregate operator,也就是说这是一个聚合算子。也是做了一个转换操作,得到了这样一个结果。那最后的大已经得到之后,我们可以做一个结果啊,打印输出。
03:04
脏它的一个print方法,那这里边我们直接重用的方法的时候,会看到直接报错了,这是为什么呢?哎,那是因为你就会发现啊,它是要有这个出异常的,所如果我们在外边去调用,你不处理异常的话,当然就会报错了啊,那简单起见的话,我们可以直接在这个main方法后边直接跟上throw exception啊,这样的话就可以调用了。我们直接来运行一下,看看结果是否符合我们的预期。执行之后。我们可以看到直接输出了这样的几个二元组,Flink有一次出现了一次,所以是FLINK1,那么word也是出现一次。Java也是出现一次,而hello这个单词出现了三次,这就是我们最后统计输出的结果啊,这就做了一个批处理的,我统计。
04:04
那关于这部分代码呢,我们只是做了一个简单的实现,但其实里边已经涉及到了flink底层的很多比较复杂的知识,这里需要给大家解释的强调的一点是,我们在调用的过程当中,每一步转换是。让当前的艾吉尔帮我们自动的做了。类型的推断啊,然后直接把这个就直接就就所有的做了一个自动补全,那这里边每一步操作,像这个data source,还有后面的Fla map operator,还有aggregate operator这些东西到底是什么呢?我们可以点进去看一眼the source,它本身其实就是一个operator。哎,那么operator。本身哦,继承字,这是一个抽象类,本身又继承自data set,哎,所以这其实就是一开始我们讲到的flink里边API的种类,Flink里边它是分层API嘛,中间核心层其实就是所谓的data stream data set API,这里就是做批处理时候的基本的数据结构,叫做数据集data set这一套API,其实所谓的这个operator啊,算子或者叫运算符,它的本质底层其实都是一个数据集data set。
05:28
啊,所以这一套API就叫做data set API。那对于弗link而言,在早期的版本里边啊,我们其实每一步转换操作看到啊,得到的这个operator,我们看到这是一个single input udf operator啊,然后那当然了,往后看single input operator,最终S可以继承自operator,最后再回到set来啊,那后边我们看到的这个aggregate operator其实也是一样,最后不是回到了data set,所以所有的转换操作都是基于data set在不停的转换,我们就把调用的这些这一套API叫做data set API。
06:12
啊,那对于以前的flink版本而言,这套方法这就是做批处理的一个标准方法了啊,那当然了,如果要做流处理,那怎么办呢?那其实我们知道那要调用的就是一套data stream API了啊,非常类似,但是呢,整体架构就都不一样,底层的数据结构就不再是data set,而是data stream了啊,那我们会发现这其实就变成了。各做各的两套班子,整体来讲不够统一,不够方便。从flink01:12开始,官方推荐的方法就不再使用这样的data set API去做这个操作了。那用什么方法呢?批流统一,只要统一用data税API,就同时可以实现批处理和流处理。
07:00
那么呃,这里只是为了方便大家理解,所以我们还是把最经典的data set API的调用再完整的给大家梳理了一遍,那对于这个01:13的版本,其实data set API已经处于了软器用的一个状态,可以想到在不远的未来,Setpi应该就会被完全弃用啊,完全就没有这个必要了。那如果说我们在。使用data stream API去进行批处理操作的时候,那又应该怎么办呢?啊,这个书里边给大家写的非常的明确。这种方式也非常的简单,那就是代码都是一样,都是直接用data three API写就可以了,那最终只是在提交任务的时候稍微做一个区别,这个区别就是flink wrong,提交任务后面加一个参数,这个参数就是运行时的runtime mode,也就是运行时的模式,执行模式让它等于batch。
08:03
这样的话,我们提交上来的这一个作业,它就是以批处理的方式去执行的,否则默认情况下就是以流处理的方式运行的。好,这就是我们关于批处理word count的一个讲解,那对于这set API的话,我们只是这里边作为一个引子引入一下,因为已经要被弃用,所以在后续的讲解,后续的代码里边,我们就不再去涉及到set了。
我来说两句