00:00
我们当前的集群都已经提起来了,部署成功了,那下一步呢?哎,我们当然就是要提交job了,真正的去运行我们写好的代码,对吧,在生产环境里边运行,现在我们相当于起了一个这个生产环境的这个集群环境嘛,那接下来我们就得提交job了,那那job在哪里提提交呢?诶大家看这个可视化的这个页面啊,这边其实菜单非常的直白啊,就是第一个菜单,这里边就是overview。然后接下来呢,诶,你可以看到当前的这个running的jobs和completed完成的这些jobs我们当前都没有啊,都是空,所以不用去看了,然后下一个呢,你可以看到当前task manager的一个信息,哦,我们当前因为我只有一个task manager啊,所以这里边只有一个信息啊,然后你会看到它当前的这个slot有几个对吧?然后空闲的可用的slot有几个啊,然后CPU4核对吧?然后我们定义的大家注意啊,这是flink manager memory,这跟我们当时定义的那个不一样,对吧?因为我们当时定义的是。
01:04
给的是整个process啊,Memory啊,整个的那个process size,那那里边有一部分呢,是固定的,我们要划归到GVM的堆内存啊,然后还有一部分是本身JVJVM要使用的一些内存,那这里边呢,划归到flink,具体能够使用的就是我们做状态管理的内存,这里边默认给出来的,大家看到啊。就是这里边是512对吧,然后大家看这个堆应内存和这个状态弗link可用的这个内存,它俩是一样的,默认是都给了512啊,然后当时我们看那个配置文件里边,不是比这个还要大一点吗?大家还记得那个数吗?我们当时给了一个这个啊Li压文件这个common里边,大家还记得这里边给的是一个1728呀,对吧,比这个是还要大的,那多出来的那些是什么呢?
02:01
啊,那这个就是本身对外内存里边还有一些是所谓的raw memory啊,就是本身原始的这些内存,另外还有一些呢,是JVM本身需要它运行时候需要去占用的一些内存,这些都是包括在内的啊,所以说这里边其实可以去调啊,你可以去把这些东西就是在里边再做一个细化的一些定义,我们这里边默认的话,它俩就是平均分配啊,然后另外还有就是job manager draw manager,这里边大家看到也是job manager基本信息都在这了,对吧,而且呢,你看这里边是把这个task manager的这个process也已经列在这里了,照manager是知道他是manager的信息的。啊,所有配置项都可以在里边体现出来,那如果说我们要提交job的话,那就不要关心这些了,我们主要关心什么?这不是有一个submit new job吗?当然就是在这里去提交了。所以接下来我们的问题是在于我得先,诶把要提交的job先打包好啊,那我们现在这个代码已经放在这儿了,这是已经打包好的吗?诶,当然不是,我们现在应该在target下边生成我们对应的那个抓包,对吧?默认肯定是在target下边,现在大家看到我这里边呢,其实不用编译了,因为前面我们运行过,这里边就已经有编译好的这个class文件了。
03:24
所以如果你要是之前没运行过的话,大家应该先点一下comp对吧?啊,那这里边已经运行过了,已经comp完成了,那接下来是不是直接点package这个生命周期做一个打包操作就可以了啊,所以接下来我们就是耐心的等待一下啊,把这个打包完成,然后接下来我们在这里当然就是点击这个按钮,ADD new,直接把自己想要打包上传的啊这个文件啊,选中上传就可以了。呃,这里边我需要找到对应的那个目录啊。当前的project在big data下边,Fli tutor应该在这个target下边,对吧?哎,这里边现在我们还没有生成,所以需要去稍微的等一下,看一看这里边进度怎么样。
04:12
哦,大家看到现在我们已经打包完成啊,这个已经搞定了啊,然后我们现在如果。点开这个它target目录的话,诶,大家看到下面就有了,这个我们打好了抓包啊,当然有一个是就是没给我们默认打包的一个东西啊,另外一个我们是用插件定义了后缀专门打包的这个东西啊,那等下我们就提交这个好了,所以接下来我们可以在页面上直接选择当前打包好的这个东西,先上传到当前的link集群里边。哦,那那大家会想到,诶,这是已经在运行了吗。有同学可能想到,哎,那这里面running jobs没有东西啊,这为什么没东西呢?啊,大家注意啊,这只是把我们当前的抓包啊,就是本身打好的这个包上传上去了,你还并没有指定,大家想我这里边这个代码里边是不是有两个。
05:09
Object有两个这个,呃,没方法这个主入口啊,对吧,你要执行的话,你至少还得指定到底哪个是我的入口类啊,呃,所以说这里边其实你是需要去做很多配置的,那另外还有就是我这个stream work count里边我还需要有那个参数呢,对吧,还得传入那个当前的host和port,这些都是要去传的,所以显然不能这么简单上传就完事。那怎么办呢?点击一下,然后大家看看,这里边可以有很多的配置,对吧?那首先有一个entry class入口类,那我们知道这里边我们用这个流失处理吧,我copy一下它的reference啊,这个引用,哎,那大家知道,把这个入口类放在这儿就OK了,然后接下来呢,这里边有这个argument,又有这个参数,那还是local host和port是7777,对吧,先把它定义在这里,然后后面还有两个选项,一个叫做并行度parallelism啊,那这是什么呢?这是表示我们在提交这个job的时候,你还可以给他指定一个当前默认的并行度。
06:18
哎,所以大家看这现在我们就有好几个地方可以指定并行度,对吧?首先你是在代码里边可以指指定并行度,而在代码里边可以全局环境里边指定,也可以针对每一步操作每一个算子去指定,另外呢,我们可以在集群的配置环境里边啊,配置文件里边直接去指定,这里呢,又有一个指定的方式,就是在提交job的时候直接去指定。那大家可能会想到,那当前我这里边指定的这个并行度啊,假如说我给一个二,那所有的这里边做的这个操作,是不是就所有的并行度都是二了呢?是不是这样的呢?哎,这个大家要注意一下啊,这就涉及到指定并行度的优先级别了,哎,那到底是以谁为准呢?注意首先看代码。
07:08
如果代码里边有定义,或者说哎,对于这个代码里边就是有一些算子啊,比如说像我们这里边的这个读取数据源so的文本流啊,那大家知道骚的文本流,你这里起一个这个NC啊,就是我们这里退出来,我这里起一个NC提起来之后,当前的这个状态呢,哎,它就必须是一个串行读取的一个状态,它没有办法去并行。啊,那所以这里边它其实就是相当于这个算子啊,默认给我们限定了没有别的方法,它只能在这里边把这个并行度定义成一。那但是在在这里边,他的这个优先级也是最高的。这就相当于也是代码级别的优先级了啊,那首先是针对每一个算子它的这个设置,然后呢,是代码里边全局的设置,也就是说之前假如说我代码里边啊,同时这里边算子定义了一和这里边全局定义了八,那那这里边以谁为准呢?
08:13
哎,当然是每一个算子单独定义的,以它为准,然后是假如说没有定义的话,你再用全局的配置,我们之前说这里边,呃,如果默认的话,就是CPU核心数对吧,这是在开发环境里边,那如果到了运行环境里边,如果说我们里边没有。每个算子没有做定义,然后代码的全局也没有定义,那以谁为准呢?接下来就以我们当前job提交时候指定的这个。并行度为准。啊,那假如说这里边我们又不给,那以谁为准呢?那就以集群默认配置环境里的配置文件里边的那个,啊,Parallelism的default以那个为准。所以这里面大家看我给一个二的话,我可以收plan,我看一下他的这个plan就是执行计划啊,类似于一个执行计划这样的东西,到底会得到什么样的一个结果。
09:12
好,大家可以看到我当前得到的这个结果是什么呢?是划分了这么几步操作对吧?然后这里边的并行度大家看下面都列着呢,并行度是什么呢?前面这个socket啊,就是读取数据源,我们这个NC啊,文本流读取的时候并行度是一,为什么是一呢?我们哪都没配啊,这就是前面说的,它本身这个算子默认并行度就是一对吧,这相当于是代码级别的,这个没法改,然后另外呢,这个优先级最高,然后另外还有呢,就是最后的print,这里边并行度也是一,大家看print to这个标准输出对吧?那这里边也是一,这是因为我们在代码里边专门指定了它的并行度。然后别的操作都没有指定并行度,那以谁为准呢?诶现在不是开发环境,开发环境的话就以那个默认的CPU核心数了,对吧?现在不是开发环境,那要以当前提交job的时候指定的这个并行度为准。
10:12
啊,那看到这儿大家可能会有疑问,诶,那这里边为什么它是划分成了这样,看起来是四个任务去做执行呢?我们之前这个也也不止四步操作啊,啊,因为你大家细心会看到这里边这个第二个这个方框啊,这个任务里边它是把flat map filter和map全给全给放在一起了,合在一起了,这又是为什么呢?啊,大家不要着急,可以先把这个留一个疑问啊,后面我们讲运行式架构,给大家解答这个问题。啊,那那这里边我们会看到它的执行计划是这样,那我现在就可以直接提交了,对吧?点submit就可以提交了,然后这里边还有一个这个c point c point的话,这里面我们用不到啊,它主要是用来干什么呢?Cpoint跟checkpoint有点类似,但是又不一样啊,就checkpoint主要是做这个,就相当于是自动给我们隔一段时间就做一个保存存盘。
11:07
保证我们故障的时候能够自动恢复,而c point呢,C point就相当于是我们手动存盘啊,所以你也可以用它来做故障恢恢复,但是因为是手动存盘,我们一般不是用来做故障恢复啊,一般是用它来做其他的一些兼容性的操作,这个还呃比较有意思啊,就是后面我们讲到的时候可以再给大家展开讲,现在我们用不到就先不管了啊,那这里边我们直接submit提交一下,看看这个结果是什么样的啊。大家看这里边,诶,你提交之后,这里边显示的当前的这一个job啊,这个执行的图就跟我们看到的那个计划图是一样的,然后接下来我们看,诶下面它果然看起来好像是有四个任务对吧?啊那但是这只是四个任务的名称,然后后面我们看显示的还有什么呢?Tasks。
12:00
哎,就是这一个名称的任务呢,它有一个,后面这个名称的任务呢,它有两个,那这个又表示什么,这不就是并行度嘛,对吧,我一看上面这个并行度一对照就知道了啊,所以我们知道了,那当前的这个任务,其实它执行的时候应该有几个任务呢?应该是有这么六个任务,对吧?啊,所以最后我们执行的是这样的六个任务,然后我们看他在这里边已经创建出来,然后一直在转圈。诶,大家会想他为什么会一直转圈呢?这是为什么呢?大家可以猜测一下当前的这个状态啊。啊,大家可能会觉得,诶当前,那你如果要是一直这个转圈的话,那不是因为我们当前那个呃,数据还没发吗?对吧,你现在这个任务提交上来了,等数据呢呀,哎,那我们试试这个发一个数据试试啊hello word。试一下。还是转圈?大家看这里边没有任何区别对吧,我可以刷新一下啊,大家如果怀疑是这个页面没有刷新及时的话,我可以刷新一下,然后大家看这里边还是一直转圈。
13:09
如果说我到这里边去找这个,呃,Running drop的话,发现诶,它应该是一个running的状态啊,但是呢,我看好像看不到任何我我也不知道任何输出对吧,你看这里边它本来应该有BY次received,嘛,我接收到多少,然后再再发出去多少对吧?呃,这里边应该能够有显示呀,这里边它这个S看起来也不对呀,只是创建它没有真正接收数据去运行啊。这是为什么呢?呃,有些同学想到了啊,就是我们当前这个一直阻塞在这里,这个状态就看起来没有运行,主要核心问题其实是在于一开始我们给大家说的啊,是只有一个slot对不对。然后我们当时说这个slot是代表什么含义呢?它是每一个task manager上,诶把自己的资源划分出来的一个一组这个小单元,然后这个小单元用来干什么呢?一个slot就是执行一个任务,一个task的一个最小单元,所以它其实就是我们分配资源的一个最小单位。
14:17
那你这里面,诶,大家看现在有一个什么特点。Available task slots是不是已经没有了,对吧,之前我们是有一个的,有一个可用的,现在这一个已经没有了,已经被分配了,然后为什么我这里面一边一直转圈,一直这个这个转个没完呢?其实我们就想到了,那是因为资源不够用啊,对吧,你现在你看看你的task有多少个对吧,你这里边这个直接就有这个,呃,六个task,我们这这里面一数就有六个task了,你这里面一个lo怎么能够用呢。啊,所以说这里边大家如果耐心的话,可以一直等下去啊,等到最后他超时最最终会报错对吧,就是一个fail掉的一个一个状啊,所以这里边我我就不等了啊,我们就不要再去等下去了啊,那大家可能会想到,那那我现在不想等了,我想把这个drop取消,该怎么取消呢。
15:10
在这个页面上其实非常的简单,大家看有一个大大的右上角有一个大大的cancel draw,好,它就相当于是我们在这个开发环境里边,你直接去停掉一样,对吧,Run起来之后直接把它停掉一样啊,所以诶,大家看这里边刚好哈,不用我们去停了,它已经fill了对吧?啊,这里边已经fill掉了,然后我们看看这里feel是哪里fail了。你看上面这个是被cancel,所以上面这个task不是feel,别的也不是feel,是cancel了,哪里feel了呢?后边我们这里边第二步这个操作就是这一组,对吧?Fla map filter map,这里边它有两个并行的任务,他在给他分配资源的时候fail掉了。啊,那有同学说,那你后边这个aregation啊,这个这个做聚合难道不也是两个任务吗?那是因为你前面先fail了吗?对吧,前面先fail了,那后面当然都是cancel了,都是直接取消了啊,那所以这里显然我们就发现这是因为资源不够啊,那这里边就有一个问题,大家可能会想,那我这里面提交这个job啊,到底我得配多少资源才够呢?
16:18
啊,这里面我还是写出来host local host,然后放杠port 7777,我到底给多少资源才够呢?啊,有同学可能就说了,那前面我们是六个这个task嘛,那当然是要有六个slot,你整个集群启动的时候应该给六个。啊,当然你如果给六个的话,资源肯定够用,但其实呢,不需要那么多啊,那那在这里边我先给大家测一下,就是假如说我这里不给这个并行度的话,那家说我当前这个并行度应该是几呢。好,我们来呃看一下这个答案,其实你直接收plan就可以看到,对吧,因为这里边你不需要去提交嘛,大家一看收plan看出来了,它里边的并行度,所有的任务,所有的算子,并行度都是几呢?
17:11
都是一,哎,为什么这里面是一呢。这就是我们前面提到的,你如果说当前的这个优先级,大家还记得总结的那个优先级吗?我们现在的这个状态是什么?哎,是首先看代码对吧,代码里边啊,有个别的算子,它要求这个必须并行度是一啊,这个是没法没法变的啊,这个肯定是一,然后有一些呢,中间这些都没定,那没定的这些看谁呢?啊,看我们代码里边全局的这个环境配了这个并行度是几,那这里也没配,接下来又看谁呢?接下来看的是提交job的时候的这个并行度,对吧?哎,这里边我们还没配,还没配,那再看谁呢?最后要看的是。
18:00
注意不是有些同学呃认为可能是四,那可能是理解成哎默认的CPU核心数量吧,不是,当时我们那个是默认核心数量指的是开发环境,现在是生产集群环境,那它主要看什么呢?看集群的配置文件啊,大家还记得我当时的那个集群配置文件里边啊,这里边我可以先把这个停掉啊,集群配置文件里边默认的这个并行度配的是几吗?是不是一啊啊,所以最后我会追踪到这里边来。啊,所以这里边如果说我们什么都不给的话,他的这个提交啊,其实是并行度是一啊,那大家可能会看到你这里边收plan的话,诶,这看起来又又不一样了,对吧,跟刚才又不一样了,不是六个任务现在呃,首先你这个病度小了嘛,那肯定任务就少了,然后他好像压缩的更厉害了,对吧,中间又做了很多压缩,又都合在一起了,然后看起来这个方框这么看的话,好像是有两个任务。
19:00
那现在有两个任务,我应该有几个slot来执行呢?我直接submit一下。大家看一下现在的这个情景啊。大家看现在的情景,我现在是不是有两个task啊,对吧,然后这里边每一个这个名字对应一个,一共有两个task,但是大家看现在不会转圈了,对吧,直接运行之后啊,直接变成running绿色的状态,直接运行起来了。然后接下来我们就看,诶,那既然是运行起来了,那我就在这个NC里边输入一点这个hello word,看看这个效果到底能不能得到结果吧,对吧,然后我输入一个呃,然后接下来大家可以看一下我们这里边的这个。啊,首先我这里可以刷新一下,对吧,我把这个刷新一下啊。诶,大家看这里边是怎么样呢?我前面的这个操作,哎,它这个没有写这个收到多少数据,这个输出cent多少数据,但是后边这个操作啊,聚合这里面这个操作它是收到了两个数据的,为什么呢?呃,我们这个hard word是两个嘛,两个后面要去做sum,做做计算,所以这里边他收到了30个BAT的这个字节。
20:14
啊,那这里边为什么前面我们这里边没有receive呢?呃,因为这里边我们只能监控就是自己内部本身,就是我们这里边算子本身的这个数据,它并不能监控到外部,给给从从外部这个端口传进来的内容,对吧?啊,所以你你是看到这个在内部的数据流向,所以现在哎,我真的接收进来了,那看看有没有输出吧。输出大家想一下是在job manager里边还是在task manager里面。又是一个问题,好,呃,那大家可能会认为是在draw manager上对吧?那我们来看一下draw manager里边有东西没有啊,我们首先看,那到哪哪去看呢?啊,大家看上面,你这不是有log吗?首先这是有启动的时候的日志,就是如果说这里边我们出错的话啊,出现异常你可以来排查,可以来看一看,然后呢,还有一个std out标准控制台输出,哎,那大家看到这里边,前面只有这个cello j啊,就是日志打印的一些信息出来,这里边没有任何的我们word count的数据出来啊。
21:16
你刷新也没用对吧。那大家就想到了,其实应该在哪里呢?在task manager对吧?大家要注意啊,尽管现在我的task manager和job manager都是同一台机器,都是local host,但是呢,在flink这里边管理的时候,它是要划分开的,你要找到哪里去找呢?到task manager这里边,你就看到有一个HELLO1WORD1啊,你如果后边如果继续去输入hello flink的话啊,那当然这里边它不会实时刷新对吧,你刷新一下哈2FLINK2,这就可以看到结果。啊,因为我们之前说真正执行这个代码的时候,这是在哪里执行呢?大家注意啊,就是我们本身前面你去定义这些啊环境对吧?啊,然后去定义这里边的每一步操作,去分发这些任务,这是在job manager上去做的。
22:11
而真正执行数据来了之后,执行这个操作在哪里去做呢?Manager,他是真正干活的人。所以我们最后看结果也一定要到task manager上面去看,你如果要是文件输出的话,也要到task manager上去找啊,这就是关于我们这个提交job啊,运行的这样的一个过程。
我来说两句