00:00
我们现在已经知道怎么样去启动一个flink集群了,那接下来要做的呢?当然就是把我们已经写好的link代码进行打包,然后向集群去提交一个作业,真正去运行我们当前的link程序。那前面我们已经提到过,在当前的web UI页面上呢,诶,我们可以直接点最后一个菜单submit new job,在这里就可以直接去提交作业了,那在这个页面里边,显然我们是要点右上角的ADD new啊,然后接下来呢,弹出的这个窗口里边,我们就应该要选择打包好的一个抓包,然后把它上传上去,接下来就可以提交到集群上去运行,那这里边我们关键就是需要先把对应的抓包打好。那我们知道其实在idea里边本身对于微而言啊,我们在这里啊,点开之后life cycle里边本来就有这个package,但是呢,诶默认情况下,我们这里边做这个打包的插件,对于GALA代码打包效果并不是非常的理想,所以呢,诶我们一般情况可以自己去引入一些额外的插件,比如说。
01:13
我们这里文档里边给大家提供的就是maven assembly plug in这样一个ma文打包插件,有了这个插件呢,我们还可以非常方便的去自定义结构和定制依赖关系啊,那所以我们可以直接把这一部分copy到po文件里边啊,那这一部分呢,这里还单独的把所有的当前的依赖啊打在一起,给了一个专门的后缀名叫做JA with dependenceies啊,那这个当然了,我们可以用也可以不用,直接把这一部分copy到po文件里。下边增加一个build标签,然后下面引入对应的plug,引入之后呢,诶,我们可以在这里来刷新一下。当前的plugs这里我们就看到多了一个assembly,这就是我们当前打包插件,然后接下来呢,我们就可以直接点击life cle里边的package去做一个打包。
02:07
啊,那当然了,如果说我们是第一次之前没有安装过这个插件的话,可能需要去下载很多对应的包和依赖啊,那呃,这个过程可能会比较漫长,如果下载过的话,这个打包就会非常快了,打包完成之后,我们就会看到他给目录下边多了两个炸包,诶一个是本身我们这里默认啊会生成的一个f real1.0 step short点这另外一个呢,就是加上我们对应的这个文件里边定义了后缀的Java dependencecies这样一个抓包啊,那现在的话随便使用哪个都可以啊,但因为我们知道啊,当前我们在po文件里边引入的依赖主要就是flink相关的嘛,在flink集群上去执行,当然这些依赖是不必要的啊,那所以说我们直接使用上面这个就可以了啊啊,那接下来我们就可以到web UI这里来直接ADD new,哎,然后找到对应的target目录,我们直接把对应的JA包上传。
03:06
那这样上传之后,当前就已经运行了一个作业吗?哎,显然没有,因为我们当前这个上传只是相当于把当前的抓包提交到了我们集群上面,集群还没有真正开始运行它,因为我们想到啊,之前我们这个程序里边,本身当前这个项目啊,里边是包含了三个不同的object,那我们当然至少得指定一个入口类啊,要不然我们根本不知道从哪里去执行嘛。另外如果说我们直接指定当前的stream work count作为当前的入口类的话,这里还涉及到我们还要给对应的参数,哎,那所以这些步骤都少不了,很显然我们在这里还需要进一步指定。那怎么去指定呢?很简单,在这里直接点开,我们在这里就可以做各种各样的参数填写,比如第一个就是entry class。
04:01
就是当前的入口类,我们当前呢,就以stream作为入口类就可以,所以我们可以右键一下啊,可以copy当前的reference啊,直接把string word count作为入口类传入,然后后边这里有一个parallellyism,就是我们所说的运行的并行度,以什么样的并行程度来执行当前的这个作业啊,那我们当前如果不给的话,不给任何参数的话,那默认用什么呢?当然就是我们集群的配置文件里边的默认并行度是一。那另外下面还有一个当前程序的参数,这个当然是是必须要写的,我们当前有post哈杜102以及PORTT7777,当然了不要忘记在这里配好了之后呢,还应该到哈杜102那边去,真正的把这个NC要提起来啊,首先我们当前的这个集群是提起来的啊,那对应的这个NC。
05:02
服务也应该要提起来,要不然的话我们这边执行肯定会报错,那这里边我们看到还有一个选项是c point pass,这是什么呢?这是一个叫做保存点的选项,在flink里边我们可以去创建一个保存点,把当前flink的状态相当于做一个存盘保存,对应的在启动的时候呢,就可以以之前保存的某一个保存点作为一个快照进行恢复,那这个后边我们会讲到啊,啊,那这里面我们看到还有一个选项是可以收plan,就可以快速的看到我们当前。这个作业的执行计划是什么?也就是说我们那个流水作业线,每一个工位啊,它的这个流水作业的流程到底是怎么样,我们看到看起来是两个框框,诶,那也就是说我们当前应该是有两步主要的操作了,诶其实我们会想到为什么它这里是两步呢?有点奇怪啊,因为之前我们在代码里边其实能够看到我们这里边至少诶前面Fla map这是一步,然后map呢,又是一步,另外我们想到前面这个读取数据源,这不也应该是一步吗?啊,那后面还有这个K分组,还有后边做统计,这应该是有好多步骤啊,最后还有一个打印输出很多步骤,为什么最后这里只有两个框框分成了两步呢?
06:23
具体来看的话,其实我们就会发现这里具体里边写的这个步骤,哎,那看起来还是什么都有的,比如一开始就是一个source socket stream,很显然这就是在读取数据源,然后后边一个箭头过来,Flat map,然后map,看起来它是把我们的前三步合并成了一个步骤。然后之后呢,有一个K的aggregation,哎,也就是说指定了键之后,做分组之后的一个聚合,那看起来是把我们的key by和sum又合成了一步操作,然后后边还有一个think think是什么呢?打印到标准输出,哎,也就是说我们最后的print和分组聚合是又合并成了一个任务,诶,那所以看起来的话,是不是有些任务可以合并在一起呢?哎,我们可以带着一个问题。
07:16
继续到后边去进行探讨,那所以接下来呢,我们就可以直接去做一个提交了,我们看一下当前这个提交能不能成功吧,直接点后边的submit。诶,我们看到当前提交之后转圈圈,然后接下来变成了一个running状态,很显然就已经提交成功了,所以我们看到当前正在running drop就有一个,那么它对应的这个任务有几个呢?Total是有两个正在运行的任务task,诶,那对应它占用了几个我们说的那个任务槽,那个slot呢,到overview那边去看一下。诶,我们看到当前原先我们本身总共可用的slots是两个,现在我们跑了一个任务之后,跑了一个作业之后呢,还剩下一个可用的。
08:09
也就是说我们之前跑的这一个作业,它有两个任务,但是事实上只占据了一个任务槽,诶,这个看起来行为有点奇怪,为什么是这个样子呢?啊,我们可以带着这个问题后边逐步展开去做详细的讲解。当然了,因为我们现在还没有输入对应的数据嘛,所以我们会看到啊,当前它的这个运行的状态,这里并没有收到任何的数据,也没有发出任何的数据,所以我们这里边得给他传递一些数据看看效果了。我们就到哈杜甫102这里来,然后输入一个hello word。然后接下来我们会看到当前的数据,诶,这里边果然就发生了变化,我们看到下边的这个任务啊,就是后面打印输出的这一部分,它就接收到了两条数据,然后进行了处理,很显然这个数据就发生变化,但是我们对应的控制台的那个输出结果到底在哪里呢?诶,这个我们可以找一下,我们自然想到了,那对应的就应该是要不是task manager,要不是job manager,会有一个控制台的输出。
09:21
那到底是谁呢?很显然我们当前执行任务的人是task manager,所以对应的这个打印到控制台这一步操作很明显是打印到了task manager的控制台,所以这里边我们可以点开,现在有两个task manager,我们点开其中的一个,点开104,当前这个104啊,点它的std out标准控制输出,这里好像没有东西啊,刷新没有东西,哎,那没关系,我们再去看一下另外一个。103ST out,诶,我们看到这里面就输出了HELLO1WORD1。啊,当然了,我们进一步还可以再去输入一个hello flink。
10:03
然后接下来我们再来看一眼,刷新一下。我们看到这里边就输出了HELLO2FLINK1,那如果说我们接下来呢,再去输入一个hello。我们看到这里输出的内容好像全部都是在103上啊,那我们自然会想到,那呃,104难道就是没有用的吗?为什么当前我们没有并行执行呢?明明有两个task manager吗?诶,其实我们仔细一想的话就会发现,当时提交这个作业的时候,我们其实默认给的并行度,我们还记得吗?当时提交的时候这里会有一个parallelly,我们没有配置,那么它使用的默认并行度就是集群配置文件里面的一。那并行度是一,那当然它就会只在一个task manager上去执行,事实上它就只会占用我们这里的一个task slots。
11:00
这就是我们所说的啊,并行执行任务的那个资源,诶,所以当前我们所有的输出就全集中到了一个task manager上。那现在我们已经提交了作业,而且已经运行了测试了它的结果,那假如说现在我想要把这个作业停掉怎么办呢?因为我们知道这个无限流处理程序啊,它相当于是一直在运行的无休无止,诶,那这个怎么办呢?那当然了,我们可以点这里,点到当前正在运行的这个作业里边,来的时候右上角有一个大大的cancel draw,哎,直接点这个。然后我们OK,接下来当前的这个作业就不再是一个运行的状态了,它就变成了已经被取消掉的一个作业,然后我们再回来看的话,就会发现,诶,资源又释放掉了,现在可用的tasklos又变成了两个。
我来说两句