00:00
我们可以回忆一下之前讲过的what com程序,这个程序里边我们可以看得非常清楚,开始就是创建了一个流逝的执行环境,然后接下来呢,就是基于这个执行环境去读取当前的数据源,得到的data stream source这样一个数据类型,之前我们也已经具体的跟到源码里面看看过,它其实本质上就是一个data stream,而接下来所做的所有的操作其实都是基于这个data stream调用的API。啊,这就是我们所谓的转换操作,后边的一连串转换操作之后,最后要有一个输出,这里输出很简单了,就是直接打印到控制台,所以这里的点print就是我们所谓的一个think,就是数据的输出操作。最后还有一步不要忘记的就是基于环境要有一个调用它的XQ的方法去启动执行,这就是一个完整的flink流式处理程序的整体结构。有了这一步,有了这些概念之后,那接下来我们就可以再详细的去看一看。
01:12
里边的每一步又可以扩展出哪些丰富的内容?首先第一部分当然就是执行环境了,这一部分比较简单啊,之前我们看到在代码里边其实直接就是dream execution environment,然后get execution environment就可以了,那这一部分其实如果我们细想的话,其实可以发现它是有问题的,为什么呢?因为对于一个flink程序而言,它在执行的过程当中,我们说并不是就是当前我们这个代码写的这样啊,就像平常我们直接执行当前的这个没方法的时候,按照顺序一步一步执行,然后执行我们下边的每一步操作就可以了。真正要运行的时候,我们知道所有的这些操作是要拆分成一个一个不同的任务,然后交给不同的task manager,在不同的slot上面去执行的。
02:07
那么对于这样的一个过程,我们在idea里边如果直接写了这一段代码的话,在这里直接运行的时候,它其实是在基于本地的GVM去模拟了一个集群环境。啊,那这当然跟我们实际生产运行的时候,提交到远程的集群环境里边执行肯定是不一样的,那这里边我们直接做这个获取。当前的执行环境获取到的是什么样的呢?啊,它本质上其实是应该有区别的。所以如果要更加严谨的去写的话,获取执行环境其实我们应该有不同的获取方式,整体来讲,具体来说有以下三种,最简单的一种就是get execution environment,这就是我们代码里边直接用的这种方式,它的特点是。可以直接根据当前运行上下文得到正确的结果,也就是说如果我们现在是在人开发环境里边直接去模拟的运行的,那么这里得到的就是本地的一个执行环境。
03:13
而如果我们把当前的这个代码进行打包,然后提交到了远程的集群环境上面执行,那这个时候执行这段代码的时候,Get的时候,Get获取到的就是一个远程的集群环境啊,所以这个就相当于做了一个自适应。帮我们搞定了判断的这个过程,所以这也是最为简单,最为容易容容易记忆和容易掌握的一种方式啊,不需要考虑太多具体的细节,这也是我们在代码当中最常见的方式,那它的底层其实应该是什么呢?啊,那其实是剩下的另外两种方式,就一种是。Create local environment。那这个其实跟前面的get execution environment是一样的,我们看到都是stream基于stream execution environment调用的一个动态方法,前面有这个study关键字。
04:08
所以除了直接get environment之外。所以我们可以看到还可以create。Local environment。利达直接创建出来的就是一个本地的执行环境。我们可以看到它还可以,呃,这里有这个,呃,可以传入一个参数,就是传入一个当前的并行度,那当然了,如果说不把并行度传入的话也可以。这里边就直接用的就是默认的本地的并行度啊,那我们知道默认情况下就是本地机器的CPU核心数量了啊,这是跟自己的机器配置有关的,之前我们在测试的过程当中也发现了这一点,这是本地的执行环境,那如果要是远程交到远程的集群环境里边,那又应该是什么样子呢?啊,那对应的就会有一个。
05:04
Create remote environment,它的参数稍微会多一点,因为我们要指定远程集群环境到底是什么样的,这里面主要的参数有,首先是一个job manager的主机名host name,然后第二个是job manager的端口号,开放出来的那个端口号啊,那其实这个我们知道,一般情况我们这里边用到的端口号是6123,然后另外还有一个是我们提交给job manager要去执行的那个抓包的名称。啊,这里面我们会看到并没有涉及到task manager相关的信息,因为job manager就是我们整个集群的管理者嘛,它整当前这个整个作业的这个管理者嘛,所以只要把他的信息告诉了,呃,提供提供给当前的这个代码,那么我们其实就可以把它提交到正确的集群上去了。所以整个这个过程当中,我们会发现,如果在idea里边直接执行,它其实应该调用的是local environment create,一个local environment。而如果。
06:12
我们把它打包之后提交到集群环境,那调用的应该是create remote environment啊,那当然了,如果说我们直接在代码里边这样去写的话,那这是不方便的,这相当于我们在开发集成开发环境里边,IDE里边去测试的时候需要去写成local,而在提交的时候呢,要更改代码,把它改成remote。这显然是不合理的,所以flink给我们把他们就包在一起,直接用一个get execution environment去做一个底层的智能判断啊,所以我们在代码里边也非常简单,直接这么调就可以了。这是关于创建执行环境的基本方式啊,那这里边我们会发现。我们讲到的主要就是stream execution environment,然后去调用这样的一个方法,那当然这样创建出来的获取到的就是一个流式的执行环境,后边再去获取数据源,当然得到的也就是data stream了。啊我们既然这里主要讲的是data stream,那这个当然是没有问题的,那如果说我们想要去处理批示数据,又该怎么办呢?
07:25
之前我们在这个批处理的work count里边用的方法是直接基于。直接基于execution environment这个类去调它的静态方法get execution environment,这样得到的就是一个批处理的,或者说啊,统一的底层的创创建出来的一个执行环境,然后他直接去读取数据源的时候,得到的其实是一个data source,那么我们知道data source它的底层其实是一个set啊,这样的话要用的其实后面调用的转换操作啊,就都是data set API了。
08:02
那我们说现在对于这个新版本的弗Li而言,它是流批一体的,不够那所谓的data set API就要被弃用,那这个时候我们怎么样能用这样的一个stream execution environment去处理批示数据呢?其实非常简单。它的底层我们都已经把。所有的这个批量数据可以当成有界流去处理了,但是如果说我们什么东西都不更改的话,它默认的处理方式还是来一个处理一个啊。之前我们可以看到,如果输入hello word hellolink的话,那么它每一次我们输入一条一行数据的时候,就会有对应的单词拆分出来,统计出结果。这样的一个输出显示,而真正的批处理是什么呢?它其实应该是所有的数据都输入完了之后,只要输出一个结果就可以了。那在这种情况下,我们如果不做任何的更改,显然就不能得到批处理的这样的结果,那怎么做呢?
09:08
这里我们引入另外一个概念,就是所谓的执行模式execution mode。Q mode的所谓的执行模式其实就是两种,批处理模式和流处理模式。默认情况下,如果说我们直接使用了stream execution environment去获取的话,获取到的流式执行环境,那么接下来代码它应用的模式就是流处理模式。那如果说我们直接创建这个批处理执行环境的话,当然得到的就是处理的模式,就是批处理模式了,现在如果我们不用这种方式。其实也很简单,只要我们在单独的配置一个execution mode这样一个执行模式就可以了。也就是说。从01:12点零版本开始。
10:01
Flink真正的实现了流批统一的API,那么接下来我们所有的代码都直接用data STEM API去处理。如果想要去处理批量数据的时候,想要使用批处理的时候,那么就直接去给他设置一个单独的批处理模式设,设置的方式呢也非常的简单,就是在。代码跟之前还是一样啊,我们直接这个代码就是完全可以不改RAM的这个处理的流程完全不改,只是把它打包之后再提交作业的时候。我们在命令行里边in wrong,然后后面再加上一个未知参数,这里边我们给了一个execution.wrong time mode等于batch啊,那当然了,除了这个batch之外,还有就是streaming streaming的话,这个不写,默认就是streaming模式。啊,所以这样的话,就跟我们之前的所有的这种提交方式都可以兼容,而且还可以直接用data stream的。
11:09
API写出来的程序,也可以很方便的把它转换成批处理的模式,好的,另外还有一个模式叫做automatic,就是根据我们的数据源是有界还是无界的,来判断到底采用批处理模式还是流处理模式啊,这种在有有的情况下也可以用这个自动模式去处理。一般我们更多的其实就是直接手动去指定batch就可以了。啊,那除了命令行配置之外,当然也可以在代码里边直接去做设置,设置的方法呢是EV后面调它的方法,调set runtime mode,这个方法里边给的参数就是runtime execution mode.batch。这种方法其实不是特别的推荐,因为我们可以想到啊,如果说在代码里边直接把它写死的话,那么我们在执行的过程当中,这个这段代码,它对应的这个作业就只能是以批处理模式执行了。
12:12
那这个其实在我们实际应用的时候,灵活度就变差了。在有些场景下,我们可能一份代码它的处理逻辑都是完全一样的,然后呢,诶,可能有时候我们要让它当成一个流处理去处理流失数据啊,那有时候我们可能就要把它在批处理模式下直接去运行,去处理一段批处理数据,那这种如果是这样的需求的话,我们根本就没有必要在代码里边把它单独的指定出来,而是统一的在代码里面根本不指定,就是一套data,最没PI。写好的这个操作流程,而在想要使用哪种方式哪种模式去执行的时候,在提交作业的时候,命令行加上参数就可以了。啊,所以这样的扩展性就会非常的非常的好,我们在使用的过程当中也会更加的灵活。
13:04
那这里边有一个问题,就是说到底什么时候才去采用这个batch模式呢?哎,这个其实整体来讲也非常的简单,之前我们其实说过,对于弗link而言,它的世界观就是流处理的世界观,也就是说你不管是批量的数据,还是一个一个来的连续不断的数据流,我都把它当成流,批量数据的话,那就是我们所谓的有界流数据。那。基于这样的想法,可能我们就想到了,那是不是对于这个Bach模式根本就没有必要呢?是不是所有的数据来了之后,你只要把它当成一个有界流去处理,那不就就是有界的数据当成有界流去处理,完全可以可以得到同样的结果吗?这个就没有任何区别了吗?还是有区别的,我们知道在batch模式下,它最终输出的其实就是一次性的输出,最终最后结果就可以了啊,在如果说我们真正要的就是批量数据的最终的一个结果的话,显然就没有必要使用流处理的模式,来一个数据输出一次,来一个数据输出一次啊。所以在这种场景下,我们使用。
14:18
流处理的模式,Streaming模式就不够高效了,所以这样的话就应该把它改成batch模式。这就是关于执行环境,它的执行模式的选择。那最后还有一步操作,就是所谓的触发程序执行了啊,这个在前面的代码里边我们都已经做过介绍,这里边所有代码我们写好之后,要注意的就是代码写好之后并不代表我们这里。所有的这个程序就都已经执行了,因为我们知道真正的流处理程序啊,特别是我们这里边读取了像ET文本流这样的数据源的话。代码写好执行起来之后,根本就不会执行后面的操作,因为还没有数据来,所以我们这里做的这个事情其实是启动了,一个是需要去启动一个监听的程序,或者说启动一个挂起的任务,然后等待事件到来,等待数据到来之后再去触发每一步任务的操作。
15:26
那所以接下来我们需要做的事情就是必须有一个显示的调用,告诉当前的这个作业,我要等待数据到来,等事件的驱动,我再去执行前面的这些操作,要不然的话,呃,那就我们就知道前面的这每一步操作执行完了之后,当前代码就执行结束,就退出了,那就不会再有后续的事情发生了。所以基于这样的想法,我们最后要有一步启动执行调用。
16:00
当前执行环境的的方法啊,这就是最后一步,有了上几步操作,整个的程序架构就搭建出来了。
我来说两句