00:00
我们已经实现了一个批处理的world cup啊,在这个过程当中我们会发现啊,这里的每一步数据的转换、计算、调用的方法,其实都是基于前面我们得到的data set这样一个数据类型,所以整个这套API我们就叫做data set API啊,那data set呢,顾名思义它是数据集,所以很显然它是把所有的数据先收集齐了,作为批量数据进行处理的,很显然这就是批处理的一个标准模式。前面我们也提到了,对于flink而言,它的底层本质上其实还是流处理啊,对于这样的一个批量数据呢,它其实是当做一个有界的流数据来进行处理转换,所以我们就会发现啊,那就没有必要使用这样一个data set再单独的进行一层包装啊,我们只要使用数据流,把它当做有界流不也是一样的吗?所以。我们说flink从01:12之后真正的实现了批流统一,接下来我们就不再用data set API去进行代码开发了啊,所以这个只是我们了解一下就可以,之后我们统一使用的是什么呢?就是传说中的data stream API,我们把所有的数据都是当做数据流来进行处理转换。
01:19
好,那我们自然就想到了data stream数据流,很显然,它本身设计的时候就是为了处理流式数据而设计的。流逝数据是什么样的呢?啊,之前我们也介绍了,流逝数据其实就是一个一个数据源源不断的来,不停的来,来一个就立刻处理一个,这样的话,我们整个处理的实时性就会非常的好。所以对于这样的流式数据,那我们用flink处理出来的效果又是什么样的呢?接下来我们可以写一段流式处理代码,使用data stream来进行word count的一个处理转换。那当然了,平常我们在工作当中遇到的实际的流处理的问题呢,又有两种具体的情况,一种就是说,哎,比如说我们的这个数据流,所有的数据来的它是有镜头的,也就是说我们当前这个流流到一定程度的时候,就直接会截止,接下来就没有东西了,诶那我们就会想到既然有截止。
02:19
连续不断的来,到一定时候直接就结束了,那这相当于不就是我们所说的有界流吗?所以这是一种情况,就是针对有介流的处理。当然了,另外还有一种情况,就是源源不断,无穷无尽。没有结束的时间,那这种情况就叫做无戒律。所以针对这两种情况。Link底层都是来一个处理一个,但是处理的情形可能就又会有所不同啊啊,那针对这个有界流的数据呢,我们也可以直接读取一个文件啊,比如说像前面我们不是把它所有的数据文本放在了一个TXT文本文件里边吗?接下来呢,我们就可以试图还是去读取word.txt里边的数据。
03:10
只不过之前我们是当做一个数据集,当做一批数据整体读进来的,那现在呢,现在我们就一行一行的读取嘛,读一行就作为一条数据直接进行处理,读一行就进行一次处理,这就成了一个标准的流处理模式了啊,当然我们会讲到这个处理过程跟之前的批处理是非常类似的啊呃,接下来我们就在代码里面具体的实现一下。那同样我们还是在当前的包下边去新建一个skyla的object啊,现在我们是做一个有界流的。流处理啊,那么我们就把它叫做bond。Stream worldout。把这个object先创建出来,然后接下来首先我们有main函数放在这里,接下来呢,首先第一步跟批处理是一样的,可以说是完全一样,首先还是要先创建一个执行环境。之前我们在。
04:08
批处理的时候创建的执行环境就是使用了execution environment里边的get execution environment这个方法,调用了这个方法,然后创建出了执行环境,那现在流处理的时候呢,稍微有点不同,我们要创建一个。流逝。执行环境。那这个过程当中我们调用的是stream execution environment。调用它的get execution environment的方法。所以我们会发现啊,之前是没有前面这个stream啊,现在我们加上这个。前面这个包啊,也会有所不同,之前我们这里直接就是fli api.scale现在呢是flink streaming API点啊,所以相当于我们现在使用的都是stream这个包下边的内容了。当然了,后边我们还会涉及到影视转换相关的一些问题,所以在这里啊,我们一样的啊,直接把最后的这一部分改成下划线,后边的话那就不用再去单独引入了,这个非常的简单。
05:08
接下来第二步呢,当然就还是读取文本文件数据。基于因为去调用一个text file,方法跟前面完全一样啊,那这里还是要传入一个当前的文件路径input下边的。words.txt。得到的这个我们同样可以给它一个名字,我们就叫做lie data,之前是data set,那现在的类型又变成什么了呢?啊,其实我们会发现前面基于的环境不一样,得到的当然也就不一样啊,现在得到的是一个data stream啊,那所以干脆啊,我们就把这个叫做。我们就直接把它叫做line data stream。接下来的处理流程跟之前的批处理其实是完全类似的,我们的处理过程其实是一样,还是读入一行数据,然后就把当前的所有单词先拆分开,按照空格拆分开,然后呢,包装成一个word,一个一这样的二元组结构。接下来呢,以前面的第一个字段作为K进行分组,然后把第二个字段叠加sum,所以这个过程我们直接可以照抄之前的批处理这个过程我干脆就把这里的内容先全部复制过来。
06:28
我们看看接下来应该怎么去做,哎,那首先我们就想到了,那接下来这个应该是。Line data stream,然后去调一个Fla map方法啊,然后我们看到这里调用的方法呢,其实是一样的啊,直接传入一个拉姆达表达式,表示把当前的每一行按照空格进行分词,单词都分割开,后面呢,哎,同样可以做一个map转换,直接把每一个单词转换成word和一这样的一个二元组形式,所以我们看这个API调用啊,Data stream和data set也是完全类似,都一样,直接在这里Fla map,然后map转换就可以了。好,那接下来第四步呢,同样我们要进行一个单词分组,但是发现这里边没有goodbye了。
07:11
Goodbye这个方法不存在,诶,那怎么办呢?哎,没关系,在当前的state threepi里边,它有另外一个单独的分组方法,叫做。叫做KBY,哎,就是以什么为K,然后指定当前的分组,哎,所以我们可以直接KBY0,哎,但是大家会发现啊,这里它相当于这个方法要被弃用了,画了一横杠,哎,所以呢,更推荐的方法是什么呢?我们看到啊,这里边他推荐的方法是直接给一个k select。给一个当前键的选择器,哎,那这个选择器又怎么实现呢?看起来更加复杂了呀,哎,这里边有一个非常简单的写法,可以直接使用一个拉姆达表达式来提取当前的K,比如说我们当前是二元组里边的第一个元素嘛,那我就把第一个元素作为返回值返回就可以,哎,怎么写呢?哎,那其实我们知道就是当前的data塔。
08:06
然后要返回贝塔的下划线一,就表示当前二元组的第一个元素啊,当然了,这还有更简单的写法,我们可以直接用下划线做一个简写,哎,所以这个其实看的很明显啊,就是要选取当前二元组的第一个位置的元素作为当前的K进行分组。然后后边直接S1,这还是同样是针对当前二元组的第二个位置的元素做一个聚合统计求和就可以了啊,最后做一个打印输出,所以整个这个过程跟前面批处理可以说是完全一样,非常简单啊啊那当然了,如果说我们直接这样做完了之后要做一个运行的话,我们可以先测试一下。运行一下,看一看能不能得到对应的结果。我们可以看到,这里没有任何的输出,也没有任何的报错。这是为什么呢?其实我们仔细想,回忆一下批处理和流处理的不同,就会发现问题,我们会想到之前批处理的时候,诶,它其实就是我们按照这个定义的流程,把当前的数据全部读进来,然后进行转换,进行处理,最后打印输出就可以了,因为当前的数据都已经收集齐了嘛,一次全部都拿到,然后进行转换,分组聚合,没有任何的问题,但现在呢?
09:26
现在我们是一个流数据,尽管我们还是从文件里边啊,看起来是有界的这个数据,但其实不是一下都读取出来的,我们是一个一个读取,读一条,然后后边就按照这个流程要处理一条,后边就应该要打印一条,现在我们打印完了之后,当前处理就结束了吗?显然没有,他还要继续再去读下一条数据,假如我们当前的这个流失数据不是读取的文件,而是另外一个真正意义上的无休无止像水流一样啊,不停来的一个数据源的话,那我们就会发现啊,当前其实并不是打印输出就能结束的,我们应该一直等在这里。
10:04
所以对于data媒PI,它的执行方式啊,这个程序架构跟批处理就又会有所不同,最后我们一定要再来一步。这一步我们可以说就是要执行当前的任务。调用什么方法呢?Env有一个execute这样一个方法,执行的方法,哎,表示我们当前这个任务要执行起来,开始怎么样等待数据的流入,来一个就处理一个。这就是我们当前流式处理跟批示处理最大的一个不同啊,现在我们加上这一句之后再来运行一下,看一看结果。我们可以很明显的看到现在就正常输出的结果,但是现在呢,输出的结果跟之前又略有不同,哎,之前我们还记得啊,批注里这里就直接有几个单词就输出几个结果吧,Scla wordlink他们都只出现了一次,那就只写一个SCLA1WORD1LINK1就可以了,Hello出现了三次,那就是HELLO3。
11:07
但是现在我们会发现哈喽不一样啊,别的只出现一次的都是一样的,但是哈喽出现了三次,所以我们会发现它是什么呢?它是每来一个哈就会输出一次哈的统计结果,哎,这就是我们说的来一个处理一次,所以前面第一句话,假如说我们已经做了这个呃,Hello word的统计的话,那么它就会输出一个HELLO1,那如果来了hello flink呢,他又又会HELLO2,接下来还会哈三。这里我们又会发现一些其他的问题,就是比如说我们本身这里处理的数据,诶,它应该是第一条数据是hello word,但为什么先输出的是skyla呢?啊,Scla很明显这里是我们最后一条输入的数据嘛,它本应当放在最后才统计出来,但是现在呢,我们直接前面就统计出来,这是一个,另外呢,我们看到跟前面批处理的结果不一样,它不仅仅只有一个结果的统计元组,二元组这样的一个结果,还有前面的一个数字。
12:10
这又是表示什么含义呢?这个数字看起来没有什么规律啊,呃,但是对于哈这样一个单独的词而言,它统计输出的时候,HELLO1 hello2 hello3,前面的数字都是一样的,这到底表示什么呢?其实我们已经可以做一个简单的猜测,就是我们想到对于弗林克而言,它是一个大数据的处理引擎,所以真正在运行的时候,它显然不可能是我们这样一个单线程的程序,本地运行完了就完了。其实我们现在在集成开发环境里边啊,在做模拟测试的时候,其实它也会有一个哎,类似于多线程并行运行的模拟,这就是在模拟我们的一个集群环境的并行执行。那这里边的数字呢,我们就可以理解成,哎,相当于是不同的线程在并行执行嘛,所以这可以认为是处理任务的时候的一个并行的编号啊,那既然有了这个概念,那我们就知道了,后边既然并行执行,那显然那就没准了啊,我好几个并行的任务都在处理当前的这一个词频的统计,那有可能hello scalela读进来之后,诶,它处理的反而快一点啊,并行处理的时后,他就先输出了,那其他的单词在处理的时候呢,并行处理比它还稍微慢了一点,所以就在后边输出。
13:31
这样一来的话,我们也就对这个结果有一些基本的认识了,所以通过这个例子,我们也可以非常直观的看到流处理和批处理的区别啊,这就是关于有界流处理实现workout这个需求的过程。
我来说两句