00:00
已经用flink的data set API实现了一个批处理的work count,那与之对应,如果要是这个文本数据是一条一条不停的来的话,那我们就需要做一个流处理了啊,那当然对应流处理的话,需要用的就是flink当中的data stream媒PI来实现,那对于弗Li而言,我们知道流处理才是最核心的处理逻辑,所以真正批流统一之后,Data税媒PI是非常非常强大的,这其实是我们之后学习的一个重点和核心,我们知道对于批处理而言,也只要写一套data three没PI的程序,最终。运行提交的时候去指定一个执行的模式为batch就可以了啊,那所以理论上来讲,Data API就是以后我们的通用接口。所有的批处理、流处理都可以用它来统一实现。啊,那对于流处理而言,那接下来我们就要给大家讲两种不同的流了,我们知道对于流数据而言,流处理当然是得心应手,那对于批处理而言,其实是把它当做有界流来进行处理的,比如说我们如果要是读取文件,其实同样也可以用流的方式来对它进行一个转换计算输出的。啊,接下来我们就来给大家讲一讲流处理word count读取文件是一个怎么样的操作和效果。
01:33
好,我们在代码里边新建一个。Plus叫做out。同样,我们还是先把main方法写出来,我们会想到它区分成两种,一种是有介流的what count,那另外一种就是真正意义上的流数据的word count,所以为了区分的话,我们还是把它做一个更改。
02:02
Rename一下。我们把它叫做。Bounded,这个就是有界的啊,有界限的stream workout,那接下来在这个代码里边大家就会想到了,这个流程其实跟批处理word count的程序非常的类似,这里第一步我们还是要先来创建一个执行环境,那区别在于。我们这里不能直接去使用之前的execution environment,因为大家会发现这一个执行环境后边如果直接读取数据的话,得到的就是一个data source,这个data source1成字operator,最后继承字set,那不就又变成了data set API了吗?哎,所以我们会发现这个根源其实就是执行环境不同。那这里我们既然是流处理,那么要创建的执行环境也要有所区别,所以创建的是一个流式的新环境。
03:06
这个流式执行环境类名就叫做stream execution environment,我们看到当前这个跟之前不同。之前我们是在。抓API flink API Java下边有一个execution environment,而现在我们引入的是flink streaming API environment下边的stream execution environment,好,那它的方法调用当然还是一样的,直接execution environment。还是把它定义成叫做env,接下来的第二步当然就是读取文件。啊,那么目前这读取文件的话,自然想到了方法一样,Read the text file,我们还是把input下边的words.txt读取进来。我们可以看一下当前读取进来之后的这个东西。
04:00
其实它变成了一个datatream s。而之前我们用。普通的执行环境啊,去调取read text file的时候,得到的是一个data source啊,那么它本身最后继承字data set,那这里面得到的呢,是一个data stream source,我们自然就想到了他最后继承字,我们看它继承的是output stream operator,然后最后继承。Data stream啊,接下来我们要做的处理当然就都是基于data stream去进行转换了啊,所以这一套API就叫做data stream API,那这个我们就叫做line。It dream do。有了这个source之后啊,接下来就要对它进行转换吧。转换一段我们就可以简单写了,跟之前的过程其实是非常类似的。
05:02
直接调一个flat map操作,哎,那这里边。其实我们就想到了自然还是同样的步骤嘛,一个。Line之前的一行数据啊,然后还是通过一个collector,这里我们引入的collect collectorlink YouTube下边的collector啊,那同样我们得到的这个结果应该是什么呢?还是一个link定义好的二元组T2类型里面还有泛型string LA啊,其实整个的这个处理过程跟我们在N边NP处理的过程里边定义的是一样的啊,目前的这一个收集器collector包括。Out,然后接下来那自然就是。表达式的一个具体实现。把对应的。把元组类型引入进来啊,接下来里边的处理过程跟前面是完全一样。首先。
06:04
单词拆分出来啊,那就是I去报一个speed,方法当然是以空格作为一个拆分了啊,那后边对于每一个word。Word里面的每一个word,我们其实都要把它包成一个二元组,去做这样的一个输出啊,那所以这里边当然就是给一个out。二二。What?过程跟之前完全一样,当然了,对应的这个限制也是一样的,因为有泛型擦除,所以使用了拉姆表达。拉姆达表达式之后,我们在这里需要加一个returns。把对应的类型定义出来。这里的类型给的是types Apple,然后types string,第一个元素和第二个元素type钠。
07:06
就是我们处理的过程,哎,那这里边我们得到其实就是word and one元组,其实我们仔细看的话,可能就是这里边得道的返回类型跟之前有所不同,具体的处理流程其实完全一样。啊,那这里得到的是什么呢?这个得到的是一个叫做single output stream operator。就是单一的输出的流算子啊,那对应我们之前的那个算子而言,其实我们都会想到它最终得到的都是一个data stream啊,或是基于data stream这样一个集合类型去做了数据的转换操作而已。所以这个过程有了前面的data set API的调用,现在就非常非常简单了。接下来是第四步啊,那我们要做一个分组操作了,分组其实也非常简单,那就是world and one temple,直接去前我们是做了一个group by,哎,那我们现在会发现这里好像没有group by方法了,那怎么办呢?
08:12
Stream里边有一个方法叫K,起到的效果是一样的啊K。那就更加的明显,也就是说我们是要指定K啊,本质上还是要指定K去做分组嘛啊,所以这里边就是把K当前对应的那个索引填在这里就可以了,我们这里比方说还是以一个索引位置啊,那也就是零这个索引放在这里,那么我们就可以指定是以word进行一个分组操作,指定为K,但这里大家看到这个,如果这种用这种方式呢,KY这个其实画了一个。这种方式要被弃用了。对于data stream而言,他更加推荐的方法是什么呢?哎,更加推荐的方法其实是我们看到里边传一个key select啊,在这里我们可以简单的看一下这个KBY啊,它有各种各样不同的调用方式,K为推荐的方式就是里边传一个k select key select当然是link单独定义的一个类型了啊,那我们在具体实现的过程当中呢,也可以有非常简单的。
09:18
实验方法,那就是还一个拉姆达表达式,其实更加的更加的简单,更加的直白,比方说这里边我们要提取当前的第一个字段提取出来,那这个怎么去指定呢?这个字段到底叫什么名呢?对于二元组而言啊,弗link定义的这个二元组而言,我们看到它有两个属性,一个叫F0,一个叫F1,所以它的第一个元素就叫F0。第二个元素就叫F1,那我们这里如果想这么去定义的话。非常简单,那就是啊。当前得塔F0的元素提取出来作为当前的K返回就可以了。
10:00
哎,这个其实过程还是非常直白,也非常非常容易理解的。啊,那定到这个结果我们看到是一个这个不再是前我们看到那个unsolted grouping类似的那个类型的定义了,它就叫做k street。啊,那当前我们可以把它叫做world and one itre,那对于这个k stream,它又是个什么东西呢?其实点进去的话会发现它更加的简单,哎,这里得到的同样还是继承字data stream啊,那所以这里边我们做转换,转来转去还是date stream而已。当前我们通通都叫做data stream API啊,那就名副其实啊,所有的操作都是基于data stream的,以后当然就是要做求和了,做一个按照当前K分组之后的聚合操作,这个聚合操作的话,之前我们就是直接做了一个萨,那当前呢也是直接萨。
11:02
Am后面可以直接给一个索引位置啊,那所以这里边直接给一个一,这个没有被弃用,直接这么用是完全没有问题的。啊,那就是我们最后得到的大结果,那大家可以看到这一步得到的也是single output stream operator,跟前面我们做转换计算是一样的啊,所以data three API比data sum API data set API呢,看起来就会更加的统一一些,每一步转换计算除了这里边分组的时候这个不太一样啊,得到的是一个k stream,别的转换计算其实得到的都是一个single output stream operator。那得到这个结果之后,我们就知道最后。可以做一个打印输出啊,当前的sum去做一个print直接输出就可以了。那这里大家需要注意的是,这里好像并不需要前面再去throws exception了,但是我们会发现当前如果直接要去这么执行的话,其实是没有结果的,为什么呢?啊,因为对于流处理而言,并不是你把这个流程定义完了之后,读取数据就直接能执行完的。因为对于刘处理而言,我们其实。
12:18
只是当前的这个操作,我们读取了一个有界的文件。这个数据是有有限的,所有的数据都能读进来,那实际运行的时候,流处理其实默认数据应该是无界的,源源不断会到来的,那怎么办呢?我们我们就不能直接把它定义这打印,就把所有内容都打印出来就完了,而是应该不断的等待新的数据输入,每来一条数据去执行一次整个的处理流程。啊,所以他最后会需要有一部。启动执行的这样一个操作。一个操作叫NV。
13:02
也就是说目前我们定义的这个操作啊,定义到这儿的时候,这只是定义了每一步的执行流程,并不是说数据都读进来就可以到这儿全部输出结果了。如果真正的流数据来了之后啊,那这里面其实还没完呢,还需要继续等待,那怎么办呢?我们相当于是要把这个程序啊,比方说作为一个线程的话,我们是要把它直接挂在这里运行,然后不停的监听,等待新的数据的到来。那这个方法调用之后,我们会发现这报错了,哎,所以大家看到这个through exception是在这儿抛出异常,那前面我们还是不能省哎。那方法后边分上一个的exception,这样的话就定义好了。
我来说两句