00:00
我们想要在flink当中调用data three API,那首先第一步就是先要去创建一个执行环境啊,这个其实很好理解,因为我们说FNK是一个分布式的处理引擎,所以一般情况下呢,我们当前的代码应该是要打包之后提交到远程的分布式集群上去执行的啊,就像我们之前做的部署提交的那些步骤一样,那那另外呢,我们当然也可以直接在本地当前的一个集成开发环境里边,在IDE里边直接运行模拟一个当前的Li集群,那这种方式跟提交到远程集群上去运行当然是有所不同的,所以我们会发现,对于一个link程序而言,它应该先要知道当前到底处在什么样的一个上下文当中,首先就是要获取当前的执行环境。那这一步其实我们发现在代码当中非常简单,上来之后就是直接调用STEM execution environment的get execution environment方法啊,这个我们可以理解成这是一个静态方法,当然呃,对于代码而言啊,没有静态这样一个说法,那我们点进去的话就会发现这调用的其实就是。
01:14
当前stream environment这个类的伴生对象里边的方法,哎,那其实本质上来讲就可以认为这是一个静态方法了,但是我们就会发现这样一个调用啊,得到了一个当前的流市执行环境。哎,那我们说流式执行环境可以是本地,也可以是远程的一个分布式集群,那当前1GET到底得到的是什么呢?啊,其实使用这种方法调用getecu environment这个方法对于我们来讲其实是非常友好的,它比较智能,它会自动的帮我们判断当前的运行上下文到底是什么样子啊,就是如果当前程序是独立运行的,那么就直接返回一个本地执行环境。如果是创建了抓包,然后从命令行调用啊,提交到集群里面执行的,那么就返回集群的执行环境,所以我们可以认为它就是一个智能化的判断方式啊,直接用统一的接口调这个方法,直接就把这一个最终真正的运行环境获取到了,所以一般情况下我们都是直接这样调用获取到就可以了啊,那当然了,通过这样一个介绍我们就会发现啊,那getcu environment它其实本质上应该是link框架帮我们做了一些事情。
02:31
他帮我们做了一个自动判断,那真正意义上底层应该调用什么样的方法呢?诶,那其实那就是判断出来,如果当前是本地执行环境的话,它应该执行的就是。Create local environment,创建一个本地环境这样的一个方法。我们可以看到啊,第二种我们创建获取当前执行环境的方法,就是调用create local environment,好,那直接得到的就是当前的本地执行环境,这里边可以传入一个整数的参数。
03:04
指定默认的并行度,如果不传的话,诶,那我们知道啊,本地执行环境啊,就像我们在这个IDE里边直接模拟一个定的集群一样,默认的并行度就是当前CPU的核心数。那与之对应的有本地就有远程,所以另外一个方法就叫做create remote environment,同样它也是直接调用这样一个方法啊,基于stream execution environment,调用create remote environment就可以了。那里边呢,我们就可以指定当前集群的join manager的主机名、端口号,以及我们要提交给manager当前打包好的那个抓包。这样的话,我们得到的就是一个集群的执行环境啊,当然了,在真正的代码应用的过程当中,我们不需要调这两个方式啊,啊,因为我们知道,如果说在代表当中我们要调这个方法的话,那就得自己先明确我当前到底是一个本地还是一个远程,那假如说我是在本地开发,然后做完了测试,最终我要提交到远程上去执行,那这个时候难道我还要再去改更改代码吗?这显然是不可能的,哎,所以一般我们就直接调这个统一的啊,自动判断的这个方法就可以了。
04:17
这是关于在代码当中创建一个执行环境,然后另外我们需要说的,那就是执行模式。其实前面在第二章我们也已经介绍过啊,就所谓的执行模式execution mode指的是什么呢?其实指的就是批处理和流处理两种方式,我们之前说过啊,对于新版本的弗林而言。01:12之后,哎,那当前已经做到了流批起data set API已经被弃用了,那这个时候我们不管是做批处理还是流处理都是使用data API,所以我们这里啊,直接开发了这个程序之后,它是既可以做批处理也可以做流处理,诶那我们知道这个默认情况下,直接一执行,它显然是来一个处理一个,这是流式处理程序嘛,那假如说我们要想直接把它变成一个批处理程序怎么办呢?那就是在提交当前作业的时候指定执行模式啊。
05:14
就是之前我所说的加上一个参数指定当前的runtime mode啊,那对于早些时候的弗link版本啊,它的批处理的执行环境的获取跟流处理是非常类似的,那只不过呢,调用的是另外一个方法,直接调用的就是execution environment的get execution environment方法啊,所以整体来讲就是流失执行环境多加了一个stream。哎,但是这种情况我们就会发现啊,容易混淆,而且批处理和流处理获取的方式是完全不一样的,这就会比较麻烦,哎,所以之后呢,这种方式就将被弃用了,那如果我们看到一些早期的代码,也应该知道它到底是怎么回事,那现在的做法是什么呢?诶,那就是直接使用统一的stream API,前面那就都是streamq,里面获取到的都是一个流失执行环境,那对于批处理而言,我们只要提交的时候flink wrong指定一个杠d execution.run开mode等于batch就可以了。
06:16
所以这里的batch就是一个批处理的执行模式。如果不做配置的话,默认这个模式其实就是streaming,也就是流处理的模式啊,所以一般情况如果我们是流处理程序的话,就不用指定任何参数就可以了。这是比较常用的一种方式,就是直接通过命令行提交的时候去做一个配置,那另外还有一种方式呢,我们可以直接在代码里边进行配置,诶这个方式可以简单了解一下,就是env调用一个方法叫做set runtime mode,直接对当前的运行模式做一个设置。里边要配置的呢,那当然就是runtime execution mode这样一个枚举类型里边的某一个值啊,比如说点batch指定当前是批处理模式。
07:04
那这里需要强调一点的是这种方式代码里边配置这种方式并不推荐,为什么呢?啊,因为我们知道这相当于就是hard code嘛,直接在代码里边写死了,那当前这段代码我们打包之后的这个应用就只能作为一个批处理程序去做提交了啊,那假如说我们想要把它当成一个流处理程序去做运行的话,那显然我们就得重新写代码,重新打包,重新提交。其实我们知道没有必要啊,同样一份代码,如果我们统一使用了data stream API,只是在外面提交的时候单独去指定。当前的执行模式的话,那显然我们就是同一份代码,可以重复利用,你让他做批处理就是批处理,让他做流处理就是流处理啊,这个扩展性就会更好。所以一般情况。在代码里边利用当前的EV,当前的执行环境去做设置,这种方式是不常见的,不推荐的。那另外最后我们还要说一下,就是到底什么时候去选择当前一个批处理模式呢?简单来讲那就是。
08:08
当我们的数据是一批一批的,哎,都收集齐了,然后统一要做一次处理的时候,那当然就应该是批处理模式啊,啊,所以我们的标准就是用batch模式去处理批数据,用streaming模式去处理流数据,那对于一般情况下,我们知道这个数据都是正常真实的。环境里边,生产环境里边数据都应该是一个一个不停到来的,应该是一个流式的数据,所以一般情况我们还是使用流式STEM模式会更多,也会更加的高效,实时性会更强啊,但是有些时候呢,我们会发现啊,所收集的数据你来一个就收集一个,处理一个,可能代价会比较高,我们可能也是一批一批到来的,那这个时候我们就可以考虑使用Bach模式,而且有很多情况下,我们可能不需要每来一个数据就得到一个输出结果,我们有可能就是来了一批之后啊,最终统计一段时间得到一个结果就可以诶,那这种模式显然使用BI就会更加的简单,对于资源的占用也会更少。
09:13
这就是关于当前执行环境的一些内容啊,那当然了,最后我们不要忘记,就是在当前代码定义好,所有操作完成之后,最后要加上一个Env.ecute把当前的流处理程序要执行起来。这就是关于执行环境这一部分,内容比较简单。
我来说两句