00:00
那接下来我们集群已经提起来了,那我们是不是想要去提交一个一个作业了,提交一个状,怎么去提交呢?他在这里已经看到了最后一个我还没点的这个,这不就是要提交一个drop吗?Submit,一个new drop,那这里当然我就可以去,在这里是不是可以ADD new啊?那我们要提交的drop应该是个什么东西呢?我是不是应该是打包好的,一个一个抓包啊,对,所以那这个东西我们还没有,我怎么去打包呢。大家记得我们当时引入了打包工具对吧?哎,在这个life cycle里边有一个package,我们直接点它是不是就可以调用我们的那个打包的插件去执行我们这个打包的过程啊,就可以得到一个炸包,我们先让它运行一下啊。然后大家就会想到我接下来是不是在这里直接把那一个抓包选中,然后把它提交就可以了。
01:01
哎,这是我们这个,呃,控制台里边能够做的一些操作啊好,我们看一下这里边。大家还记得我们想要的是对后面是不是给了一个后缀啊,对吧啊,就是什么draw with dependency,我们想要的是这个后面新新打好的这个包啊。接下来大家知道这个打好了之后,它会放在哪里吗?对,它是不是肯定会放在这个ta的下边啊,诶大家看是不是已经有了这样的对吧,所以我们想要的是这个JA with dependency,所以接下来在这个web UI上边。去艾特一下啊。我们找到对应的位置,我这里边不是在在这里啊,Link target在下边把这个上传。Upload对吧?啊,先把它上传上去,上传上去之后,那大家说这个上传完了就完事了吗?这已经在运行了吗?哎,我们看一眼这个running drop没东西啊,哎,大家会想到你还没指定那个,我们不是还得还得指定那个运行参数的吗?对吧,什么还都都没指定的,这个只是把这个jar包上传了,并没有真正的提交,那提交怎么提呢?对,选中它,诶大家看这里边是不是就可以配置很多东西啊,这里边还有一个submit啊,那这个submit就相当于真正的提交了,对吧?好,这里边大家看这里边有一个entry class。
02:33
这又是个什么东西呢?啊,这个东西大家其实想到了我们这一个整个这个项目真相当于是一个项目了,对吧?项目里边是不是有两个object呀,他们是不是都有一个main方法,那你到底哪一个类里面的这个main方法是我们的入口类呢?这是不是我们必须得定义好啊,啊,要不然他不知道,所以我们用这个流处理的啊,Copy一下reference啊,Pass的话就会连上它的那个全路径,对不对,我们这里只要reference就够了,把这个copy过来,然后后边这里是不是指定这个提交时候的并行度啊,好,那大家会想到了,现在我们已经讲过哪几个地方可以设冰情度,大家记得吗?
03:18
文件配置文件里面可以代码,代码里面也可以对吧?啊,另外是不是这里在提交的时候也可以啊,那他们的优先级又是什么样的呢?对这里大家注意啊,优先级最高的是是代码里边的对吧?如果代码里边指定了它的那个并行度的话,就以代码里边的并行度为准。那如果代码里边没有指定的话,那会怎么样呢?对,就用我们提交时候,这里边提交的这个参数为准,那如果我这里边也也不给呢,这这个必并不是必须的一个参数啊,如果这里不给的话,他就会用对我们配置文件里边默认的那个并行度啊,那这里边比方说我可以给一个二对吧,跟那个改的不一样大大家会想我这里边的并行度最后其实应该会变成几啊啊运行的时候大家会想到只有一个slot对吧?对诶这里就有可能有问题了,好,我们先把这个设成二,大家看看等一下提交有什么状况出现啊,然后继续下面是不是还有这个argument参数要传入啊呃,这里边我们,诶不是啊,这个叫host对吧,Local host,然后POST7777对不对。
04:37
啊,这里面还有一个c point pass,这个我们先不给大家讲啊,这个是假如我们想去指定那个c point,从point启动这个,呃,应用的话,提交应用的话就去指定这个,这里边我们不涉及,有了这些参数,我是不是就直接可以submit了,直接点击submit把它提交。诶,大家看。大家看啊。
05:01
哎,他给我们生成了这样一张图,这张图是什么意思呢?这张图的意思其实就代表了我们整个任务的一个执行计划,对吧?啊,大家看到这其实就代表了我们这个任务的一个执行计划,它执行的过程当中是一个什么样子呢。大家会看到我们给它配置的并行度是是几啊,是二对,所以大家看这里边,它中间我们做这些操作的并行度就都是。二诶,那大家说为什么前面的这个大家看这里面source,这是从我们那个socket去读取数据源,对吧?最后还有一个think,这是不是相当于我们print打印到标准控制台这样的一个输出啊,这两个为什么都是一呢?诶,这里边think这里边其实比较好理解,大家还记得我们在这个代码里边做过什么事情吗。
06:05
代码里边是不是直接把最后这一步print直接设了一个set并行度一呀,哎,所以它是不是这里的并行度就一定是一。诶,那大家这里其实又发现一个很好玩的事情啊,他这个设的是一了,前边这些算子是不是没设啊,所以大家看这里边flink里边给我们提供了什么样的一个灵活度呢?你这里的并行度是一前边。大家看我每个算子都可以去设并行度,大家看是不是这样,我可以map这一步直接去set paraism对吧?诶我设设哪个并行都可以,它是每一个算子都可以去单独设置并行度的。啊,所以大家会看到在这里,假如一开始我们提交的这个代码里边,这里如果没有设的话,它会怎么样呢?对,要用默认并行度,而这里设了的话就用这个一,那他这里边没设的话,默认并行度是几对,是我们提交时候的那个二对吧?诶这里就又有一个问题,那为什么前面这个socket source它是一呢?
07:17
啊,这是这个,这是本身这个socket stream决定的,因为大家知道我们的这个文本流,它可以有有这种并发并行的这种状态吗。其实是不可以的啊啊,所以这里边就是它永远就只是一个顺序线性的一个流,单一进程的一个流啊,这是本身这个算子决定的,那如果说我们是那个从文件里边read from text的话,那其实这里边它大家会看到它的并行度也是二啊,这个大家可以下去之后自己去测啊。哎,所以这这就是比较有意思的一点,大家在这里可以看到它这里边不同的并行度,在我们最后执行的过程当当中这样的一个表达,而且在这里面大家还可以看到一点有意思的东西啊,大家看他做了一个什么事情呢。
08:11
他把这个前面我们的那个算子其实有很多个,对不对,他把map filter map是不是相当于合在一起了呀。他为什么要合在一起呢?来这里边先给大家留下这样一个问题,这其实是flink在处理这个执行计划的时候做的一个优化,那那大家自然就会想到,那么后面这个aggregation为什么就没有合在一起呢?哎,这个先给大家留下这样一个问题啊,我们后面再给大家解答,大家先看一下这个执行的结果啊,诶,大家看这个执行执行的结果是什么呀口。大家看一直都是created的一个状态,然后一直在启动,一直没启动完,大家能想到这是为什么吗?啊啊,端口没开是吗?大家看一眼我们这个端口开了吗?大家看我们这儿一直开着呢呀,这开着呢,而且大家记得我们之前如果这个端口没开,它会怎么样啊。
09:14
它对它应该是直接fail啊,我们当时运行的时候,它是直接了,它不会一直在那里等,那这是为什么呢?对大家会想到,这就是我们当时给大家说的,我只有一个可用的插槽slot,结果我要的并行度是几啊。对,这里边是不是最大,所有的这些算子里面最大的并行度是二,那是不是我就相当于至少得有两个slot才能并行执行这个东西啊。呃,那他他就会发现我一直发现我想要申请两个,一直申请不到,那他是不是只好一直在这儿站转圈,一直在这等待资源分配啊,啊所以这就是啊现在的这个状态啊,那大家会看到这个就比较悲催了,那我只能把它取消掉了,怎么取消这样一个状态呢。
10:02
哎,这里边我们可以啊。诶,大家看它已经其实已经fail了啊,已经fail了,那就不用,那就不用取消了,对不对,因为这里边相当于已经过了一段时间。过了一段时间之后,他自己就已经feel了,对吧?啊,所以这里边就就不存在取消的问题了,好,那大家会想到,如果真正我们想让他跑起来的话,应该怎么样呢。我是不是应该还是这个当前我们这个代码,我把这个列类放进去,我这里面的并行度是不是就不能给二啊,我是不是就应该给一,哎,或者说我不给可不可以不给的话,这个默认命行度从哪里去取?诶我们知道在我这个idea里面运行的时候,开发环境它默认的病毒是四。那这里边默认密度应该是几?对,如果这里不给的话,它是不是就又会追溯到我们的默认配置文件里面去啊,那里我们配置的是一,大家还记得吧,parallelism.default,是E,所以这里边就还是一,那我这里边给一个host local host杠杠,PORT7777。
11:15
好诶,大家看这里,其实我是直接可以收plan的啊,直接看它的这个执行计划的,先给大家看一眼,大家看看吧,大家看现在就变得不一样了,变成什么样了。嗯,是不是并行路都是一,他把更多的东西合在一起了呀,甚至连那个S都合在一起了,对不对,然后后边的这个think也都合在一起了,但不同的是在aggregation这个地方还是给它分开了,对吧?啊,还是给他划分开了,好那大家看一眼,我们真正提交这个drop,那大家看这里边的这个图是不是跟刚才那个一样啊啊,所以大家会知道这个图不是我们运行的时候才生成的,对吧?啊,其实是我们提交这个状的时候,他这个东西就已经是生成了的,计划是已经生成了的。
12:05
然后大家看这个是不是就没有一直处在created的状态,大概转了一会儿这个就正常了,对吧,它就真正变成了一个running状态,这里边我们想去测试的话,我们是不是这里要输入一些数据啊,Hello word对吧。大家看一下这里会不会有一些变化啊,当然这里边它的刷新速度可能会有点慢啊。诶,大家看大家是不是能看到这里边,对,它这里边相当于就是我们这里的两个任务对不对,大家看它是分成了两个任务来做的,一个任务就是从S到Fla map到filter的到map,就是这第一个,然后它这里边就发出了两条记录,对不对,为什么他发出了两条记录呢,大家想。我这里面明明只输了一行啊对,这里边是不是你做完这个flat map之后,就相当于是一个哈一个word了,所以它最后发出的是两条,那同样这里发出两条,后边这一个任务是不是就会接到两条,收到两条啊啊,这是这个任务的这个数据情况,那大家可能会想,那这个我们的数出到底在哪看呢?
13:17
我们的输出是要输出到哪里啊,打印到控制台,哎,那这里面的这个就有点儿不好了啊,就是早知道我们当时就把它直接保存到一个文件里面,大家如果想改这个,改到这个保存到文件的话,也可以怎么做呢?可以有另外一个方式啊,大家看到可以什么。大家看可以write as txt,也可以write as csv,对吧?然后你定义一个路径,是不是就可以把我们这个数据直接写入到对应的那个目录里面去了啊,这个大家如果想保存这文件的话,可以用这个方法啊啊这里边我们就不详细来写了,那大家会想到这里如果是控制台输出的话,怎么输出呢?怎么看呢?哎,那我们是不是得去找对应的job manager或者task manager,看它对应的那个控制台输出啊,那我们是看job manager还是task manager。
14:10
应该在哪里有,有些同学说job manager,有些同学说task manager。这里大家要注意这个print这一步,输出这个信息,他应该是干活的人去输出呢,去执行呢,还是做任务调度的那个人去输出呢,大家想一想。大家注意啊,这里其实这个print是不是相当于就是我们任务的一部分啊。大家看这个print作为think的一个一个方法,包装到了我们这一步任务里边,对不对?哎,那么在这一步任务里边有一个print的操作,这个任务应该谁去执行。是不是应该是task manager去执行啊,哎,所以大家就会想到这个print的控制台输出,其实应该找task manager,大家如果不信的话,我们看一眼吧,这manager这不是log吗?那大家看下一个是不是就有一个标准输出啊,这是不是就是控制台,我们看一眼,果然什么东东西都没有,那如果我们找到task manager的话,这里我们可以点进去。
15:18
啊,这里还是log,这里标准控制台是不是就有一个HELLO1WORD1啊啊这这里当然就实现了我们这样的一个测试啊,当然这里我们还可以多去给几个测试数据啊啊hello flink那大家看这里面没有实时的刷新,但看到一个刷新符号对吧,它这不是实时刷新的,我们已经手动点这是不是哈,变成了两次啊啊所以大家看这个过程是完全没有问题的,这就是我们想要的最后的这样的一个效果。
我来说两句