00:00
大家给大家实现的是一个批处理的word count,哎,那我们会发现这里边主要用到的数据结构是data set,然后是基于这个data set在做各种转换啊,所以大家就想到了,那这这个是数据集啊,这并不是我们所说的数据流啊,所以这个data set API啊,我们做这个批处理主要是针对离线数据啊,离线数据集来做的这样一个计算,那自然我们就想到,那如果要实时数据呢,数据不停的来,源源不断的来,那应该怎么做呢?当然就应该用。流处理API对吧?哎,那用的就不是data set了,而是data stream啊,这就是之前我们说的这个核心的API啊,啊是叫data stream API,所以接下来我们来新建一个class,这个要跟前面做区分的话,我们叫做stream stream workout。好把这个先创建出来,那同样还是这个main方法啊,那我们想到后边也是需要去throw一个exception。
01:04
呃,然后接下来呢,第一步还是创建执行环境,但是现在大家要注意,刚才我们的那个创建执行环境,这个创建的是一个。批处理的执行环境,那如果要是流处理执行环境呢,哎,就必须要有所不同了,流处理执行环境刚才是execution environment,那现在这个叫做。Stream execution environment,其实差不多是吧,前面加了一个流而已啊,加了一个stream而已,然后点get execution environment啊,所以这个直接一创建的话,我把这个叫做EV对吧,前面这个也是类型,就是一个stream execution environment。啊,这个就是先把它创建出来,然后同样接下来我们还是啊,呃,那我还是先从这个文件里面去做一个读取吧,跟刚才这个应该就类似对吧,那所以我直接可以把这个copy一下啊。
02:04
直接copy在这儿,然后这里大家会发现我现在其实是不要这个data set了,对吧?大家会想到我现在如果要是直接得到这个结果的话,应该是个什么东西呢?直接read type file应该是什么东西呢?大家看到这里面其实是个date stream source,对吧?那么这个data stream source又是个什么玩意儿呢?它是一个single output stream operator还是一个operator啊,就类似于我们前面说的那个啊,算子啊这样的一个东西,它是一个单一输出的流流式的这样一个算子。那么它又是什么呢?好,大家看它的本质还是一个data stream流对吧?所以接下来我们要做的操作,那就都是基于这个data stream在这里边要调的操作了,对吧?哎,大家看看有这个flat map filter什么的啊,啊,所以我们所说的这个最核心的API就是data stream API。
03:04
好,我把这个先关掉,那接下来我们就在这里。呃,我这个不写data data source也是可以的,我直接写data stream是不是也是一样啊,对吧,这个是没没有任何问题的啊,那这里面我就不要叫这个data set了,而是把它叫做data stream啊,这就是我们当前对于这个数据的读取,那然后接下来是不是同样可以做对应的这个转换操作啊,对吧?呃,就是我们还是啊。基于基于数据流进行转换计算,那这个计算流程也是完全一样,我们思路还是啊,来了一个数据,那么我先一行数据,这是我先把它打散,直接分开对吧?呃,分成那个一个一个的词,然后再把它map成一个词一个一,一个词一个一,这样一个二元组,最后是不是按照词去做分组,然后统计那个一就可以了,上那个一就完事了啊,所以大家看一下接下来我这个东西怎么去做操作啊,那就是input this shift,哎,大家看现在我要去做对应的这个操作,是不是先要做一个Fla map呀,还是同样一个Fla map,那这里边要传的是什么呢?
04:21
其实大家会看到啊,这里面传的是不是还是一个flat map function啊,这个flat map function大家会发现跟刚才的那个是不是一模一样。没什么区别对吧,因为大家看到它其实在flink API common functions里边的啊,所以这是通用的啊,啊就是不管是这个批处理流处理这里边调的这个fly function其实是一样。所以这里边我干脆就可以偷懒了,我可以直接怎么写啊,对,我是不是直接去new一个对应的那个MY啊,Fla map是不是就可以了呀,因为我刚当前当前的这个处理的操作完全一样嘛,都是之前我们做的那个是什么,就在这里边先按照空格分词切开,然后接下来是不是每一个word都包成一个WORD1这样一个二元组输出啊,那我们现在不是一样的操作吗?
05:11
啊,所以直接我就把这个就引用过来了啊,直接就就拗这个就完事,然后下一步下一步怎么做,是不是开始按照这个word分组啊,Group by,诶但是大家发现了没有group by。API不一样了,对,前面大家看到是不是它有一个操作叫做k buy啊,啊,这里边大家看到流处理啊,Data stream API里边要对于这一个数据做分组的时候,它用的就不是group by,而是K。那大家能想到当前的这个原因是什么吧?其实从字面上也非常好理解,就是group的话,是不是要对于数据做分组啊,那前提是不是你这个数据应该是一批都到了之后,我才给你划分分组啊,那我现在数据都没到,你分什么组啊,所以他现在是说什么K指的是。
06:05
指定K对不对,就按照K去做不同的划分啊,所以后边我们会给大家提到啊,这个KY它的含义其实是什么呢?是按照当前key的哈希code去对数据进行一个重分区的传输操作,类似于是要做一个重分区了。也就是说经过KY之后,下一步操作啊,它到底分到哪一个分区里面去,那是跟它的K有关的。对吧?啊,这个就是这样的一个操作流程,所以接下来我们这个代码很简单,是不是KBY0,是不是按照这个字段就可以了,然后下边就是SUM1,是不是这样就完事了,哎,这就是我们当前做这个计算的这个过程啊,啊,那当然我可以把它这个先定义出来,然后大家看到这里边定义的这个这个叫做sum啊,我可以还是把它叫做result string,这个是流了啊,那那为什么说这个还是一个流呢?因为大家看这里边本身它自动推断出来的这个类型叫single output stream operator,那如果我们点进去的话,大家会发现它还是一个。
07:14
Data stream对吧,还是一个数据流啊,所以说我们现在就是数据流转换操作了啊,就是都是data stream,所以如果你要这里直接把它写成这个也是没毛病的啊呃,我们就针对的都是data stream在做转换啊,在做调用啊,那最后自然就是result stream是不是可以直接print做一个打印输出,好,那大家要注意一下,现在还没完。就是你如果要是直接这么执行的话,你会发现他这个没任何效果。不会有任何的输出,为什么呢?对,因为大家会想到我们当前是流数据对吧,流逝的数据是不是应该是来一个处理一个事件触发呀。所以其实跟我们之前这个批处理是不一样的,批处理的话,就按照我们这个处理流程的定义,那是不是你把这个诶走到这儿了,那把所有数据读进来对吧,然后后边你做Fla map,哎,那所有数据就就就全做一个Fla map,后边group by,那就分组嘛,然后some就统计出来,打印就打印,输出就是按照这个一步一步做完的,但我们现在其实不是。
08:20
我们现在其实应该是什么呢?是我提前这个任务,先把每一步操作都定义好,任务都分配在这儿,然后任务提起来,是不是接下来是等数据啊,等数据一个一个来,一个一个输出,这才是流数据的架构,对吧?所以你这里边如果直接在这儿print的话,这个其实只是把任务定义出来而已,他并没有执行这个任务。那什么样的操作才叫执行起来任务呢?下面要做一个这个啊执行任务这是要调一个env,它下面的一个方法叫做。Execute啊,就是用这个方法,就相当于是我把当前的这个所有任务先全启动起来了,对吧?前面我只是在定义这个任务的每一个操作步骤而已,这才是真正的启动,接下来事件出发来一个数据就输出一次。
09:17
好,这是这一部分代码的实现,那我们来运行一下,看看这个结果吧。先看看这个效果怎么样。好,现在已经输出结果,大家看一下当前的这个结果是什么样子呢?诶,当前大家看到还是啊,就是一个word一个count,一个word一个count对吧,最多的大家看到这个哈是四,呃呃不是呃对哈是四个对吧,然后这个另外还有一个那个UU是三个,这个大家是有印象的,这个没问题对吧?那跟之前不同的一点是,我这里边每一个词,每一个word只有一次输出吗?不是的,大家看这个HELLO4之前是不是还有一个HELLO3啊,更之前是不是还有HELLO2啊,在之前还有HELLO1对吧?那那为什么呢。
10:12
为什么转眼一对,因为大家想到当前是流式数据,那是不是我当前的这个,尽管我读的是文件对吧?文件里边的每一个数据来了之后,是不是相当于我也是来一个数据就处理一次啊,所以是不是我是先先有了一个哈,然后再有两个,再有三个,再有四个啊,这中间是不是相当于我保存了一个它的状态,在之前那个状态基础上,是不是挨个叠加上上来的,对吧?所以大家就看到这就是有状态的流式计算啊,就是这样的一个处理过程,那另外大家可以可以还看到另外一个小的细节点啊,就是前面还有一个数字,然后加一个这个像大于号一样的一个一个一个小标,这个是表示什么意思呢?这里是表示什么意思呢?
11:01
4312对吧,大家看啊,一共好像就就都是12344个数,那这表示什么含义呢?这其实所说的就是,哎,我们之前不是说过这个当前是一个大数据的处理环境吗?我们需要去做一个分布式的计算,那大家想现像现在我在本地环境直接起的话,没有办法分布式,但其实应该可以模拟一个什么,是不是相当于可以模拟一个多线程啊,所以大家会发现当前这其实代前面的这个数字啊,就代表了我们当前并行执行的线程的编号,大家也可以认为就是我们真实执行的过程当中,它对应的那个分区的编号,对吧?啊,就大家可以认为这是它那个并行子任务的一个分区,那这里面为什么只有这个12344个呢?哦,那是因为我这里边默认的并行度,这里边有一个概念叫做并行度,默认的并行度是四,因为我当前的电脑是四核,对它这里边有一个默认并行度,就是按照这是开发环境里边啊,开发环境下默认的并行度就是按照当前的核心数量来定,那我能不能做设置呢?哎,可以的啊,大家看我直接可以在这做一个设置,我直接env可以设置set parallelism,这个parallelism parallel大家知道是并行的意思,对吧?啊,并行并发啊,Parallelism就是并行度的意思,所以我可以设,设大一点,我设一个八。
12:39
但接下来再去运行的话,你就会发现。当前那那呃,大家会想到它还是1234,就只局限在四个数里边吗。那就不会了,对吧?啊,它就可以有更多的取值,那当然它有还是有局限,就是只局限在一到八的范围内,对不对,哎,大家看到这就这就是并行度是八啊,就相当于八个线程执行了。
13:03
啊,那当然你也可以就是全局直接把这个并行度设成一,设成一的话,这个最终输出的效果那就是。啊,大家就想到如果设成一,那是不是相当于就没有并行啊,就单独的一个线程执行,那最后是不是就所有的都在同一个线程输出就完事了,大家看前面就没这个东西了是吧?啊这就跟我们前面那个批处理输出的那个结果就非常像了啊啊那当然这里边还是有略有一点点不同,就是因为它还是是不是来一个就输出一次,大家看hello,一哈二哈三对吧,而之前我们提出理是。最后是不是就只输出一个呀,对吧,只有一个唯一的结果输出,这就是最大的一个区别啊,就是批处理跟流处理之间的差别。
我来说两句