00:00
啊,我们已经把这个流处理word count做了一个完整的实现,然后在这个代码里边,其实大家会发现,当前我们这里边默认我这里边执行的时候啊,给大家解释了,默认的并行度是四,按照当前的CPU核心数量指定的,那大家就会想到,诶,那你你这个并行处理它能不能做一些调整呢?对吧?诶你这里边直接就是按照这个核CPU的核心数量,呃,那假如说我不想要呢,假如说我就想要,比方说我当前有四核,但是我就想并行度是二,对吧,我就只要两个线绳去去这个并行处理就可以了,或者有同学想,哎,我尽管是这个核心数量是四,但是我就想要有有八个线绳同时去处理行不行呢?其实是可以的啊,因为大家知道就是关于这个CPU去做多线程处理的时候,你即使是单核心也可以多线程嘛,就是相当于是时间片轮转嘛,对吧,轮流占用CPU的这个处理时间就可以了,那我们这里边在代码里边怎么样去配置,当前一共要有多少个并行的线程呢?
01:08
而这里边大家就自然会想到了啊,是不是可以在环境里边去做配置呢?对,当然是可以的,这里边比方说大家看env,然后直接set,直接就弹出来一个set叫做。Parallelism对吧?Parallelism大家知道parallel是并行的意思,那parallelism就是并行度啊,所以这个就是针对当前的执行环境,直接去设置一个并行度,那那所以这里面大家看传一个这个int嘛,那不就是你想要几就给几吗?啊,我这里比方说突破我自己的CPU限制啊,我直接给一个八行不行呢,当然是可以的,我这里边运行一下,然后给大家再测一测试试看啊,我们现在已经执行起来了,那接下来我们还是啊输入,哎,这边我就直接在这里面输入啊,大家看看现在的效果是什么样呢?Hello word诶大家看突破四的限制了,三和五对不对啊,然后接下来我们继续多输入几个啊,Hello flink。
02:09
哎,现在你看这可以到期了啊,我们继续hellotla啊,那能不能突破呢?现在是一和三啊,然后我们这个how are you,哎,大家看这个四和六也都出现了,这个基本上一到一到八好像还没出现八,呃,我们我们再往后看一看啊,这个随便输数据。看看是不是符合我们的预期啊,这个and you。诶,大家看果然是可以到八,但是你想做测试的话,继续往后测吧。它永远不会超过八,永远都是在一到八之间去做这样的一个轮转转换的啊,那这里面可能就有同学有疑问,就是说,诶,那你这个到底每一个对应的这个输出,它到底是执行在哪个线程上,这个有规律吗?啊,其实有同学已经猜到了啊,就这里边的是有规律的,什么规律呢?按照当前这个word统计,当前我们这个word值它的哈希去做这样的一个分配。
03:11
为什么呢?这里面大家如果有这个细心的同学会发现,你看这个,哈哈,我们出现了很多次对吧,之前前一次我们去做这个处理的时候。呃,就是四核心的时候啊,默认情况下四个线程的时候哈,好像输出的这个前面的这个编号都是二对吧,而现在呢,前面编号都是都是三,而且你会发现这里边的这个就是只要是哈喽后边哈喽做的统计,它永远都在当前的这个编号,当前的这个线程上。哦,所以这相当于是有一个什么要求呢,这就是这主要问题出在前面,我们这里边做了K做分组。那大家想想,这个基于某个K去做分组的时候,实际在这一个并行分布式架构里边啊,它是怎么去做分组的呢?其实非常简单,我们知道本身这里边有有很多个并行的这个任务,对吧?哎,这里边下一步我我都要再去,呃,比方说下面我这里边都要再去做这个sum了。
04:17
那上游我这里边是map之后的这个结果啊,做完map之后的这个结果,这里边并行的,假如说有三个,那接下来如果说后边我都要去做sum,这里边可能也有并行的好几个sum。那那在这种情况下,我到底哪一个这个这个卖不出来的这个数据啊,是到底分发到哪一个项目里边去做处理呢?现在我们要分组,KPI要分组,那KBY怎么分组呢?是不是我就正常来讲,应该是同一个K的话,我就让他分到同一个这个sum任务里边去做统计,对吧?你如果是分散到不同的任务里的话,那后续我不还得再合并吗?那那个就很麻烦嘛,所以说肯定是分到同一个任务里边,这个就是最简单的一种方式。
05:04
所以这就会出现一个什么情况,是不是上游不同的任务就有可能要把自己的数据接下来都发送到同一个下游的那个任务里边去啊,哎,所以这个过程就相当于有一个数据重分区的过程了啊,有点类似于之前大家讲这个Spark的时候,大家熟悉的那个沙,对吧?啊有有点类似,但是不太一样,我们这里边这个重分区,这是基于什么重分区呢?因为这里做的是KBYK拜,它基于的就是当前K的哈希code啊,首先是基于这个哈希code,但并不完全是以这个哈西扣的作为,呃,这个分区号,因为大家知道你这个求哈希的话,那个数大的去啊,你这里边能提供的这个并行任务,那编号可能没有那么大,那怎么办呢?啊,最后肯定有一个类似于取模取鱼的一个过程,对不对啊,类似于这样的一个处理过程,所以最后就会得到一个哎。
06:08
我们当前并行子任务的那个编号,那接下来我至少能保证什么,你同一个K的数据来了之后,那我经过哈希运算,然后再做曲模,是不是总会给它分配到同一个子务里面去啊啊,那至于这个不同的K,不同的word来了之后呢,有可能分发到同一个子任务里边去,也有可能就分到别的地方去了,对吧?那这个就看你那个呃,哈希,然后取鱼之后到底计算是多少。所以大家会看到我们这里边得到这个结果呢,这个啊三这里不光只有这个hello,也有thank对吧?啊也有别的一些这些数据,那那比方说像这个无里边呢,有fine也有U对吧?不同的这个word是可以放在同一个当前的这个,呃,就是大家理解的这个线程里边去做处理的,一个并行任务里面去做处理的,那但是我们会发现同一个word的话,它肯定做统计的时候,总是在同一个同一个这个当前的并行子任务里边做的处理。
07:12
这就是关于这个并行任务,以及啊,大家对于当前输出的这一个,呃,编号啊,啊做了一个扩展的了解啊,这是这部分内容,我先把这个停掉,然后关于并行,这这里边我们所说的这个叫做并行度对吧?呃,Para,那这里边如果说我不想在这个全局去做这个设置行不行呢?啊当然是可以的,就是我们知道如果不在全局去做,去做设置,这里边有默认。在这个开发环境里边,大家注意啊,我现在强调的是在开发环境里边,当前默认的并行度就是你电脑的核心数量啊,那言下之意就是说在生产环境里边不一样,生产环境里边啊,那就是可以有配置文件去配了,跟这个自己开发环境做测试不太一样了啊,然后另外还有一个是link给我们提供了非常丰富灵活的设置并行度的手段。
08:10
比如说什么呢?大家会发现啊,下边我在做定义,当前做每一步操作的时候,那大家看这不就是一步一步操作吗?对吧?哎,Flat map,然后filter,然后map,然后呃,K外的话,大家知道这是做分组,可能不涉及到具体操作,只是一个重分区,对吧,只是一个杀Le类似的工作,那后面some,哎,这这里边的每一步,这不就相当于是一个任务一个操作啊,那大家想你之前我我们在这个。Spark里边的话,所有的东西是划分这个stage去去做这个操作的,那现在我们link里边呢,每一个算子,每一个任务,你可以认为它是独立的,就像刚才我们画这个图一样,哎,你你看到这里边我我们画这个图的时候上游。可以。可以有这个并行的好几个并行的子任务,对吧,然后下游呢,也可以有并行的好几个,而且甚至什么上下游他们并行的这个数量还可以不一样。
09:13
因为他们本身就是不同的任务嘛,对吧,我我在处理的时候,其实就是上游这边处理完了,把这个这个数据给到下一个任务去处理就完事了,所以我并不要求一定上游有三个map任务,下游就一定得有三个sum任务,没有没有这回事儿,我完全可以不一样。所以在flink里边我可以去设置,大家看我直接可以针对每一个算子在后边调一个方法叫set parallelism,直接给一个比方说我设置它是三,然后后面some设置它是。二这个都是完全没有问题的。我们这里边呃,做这个操作的时候得到的这个呃结果啊,在做这个并行计算的时候,你到这一步,那那前面没有设的,那应该是几呢?默认是四嘛,对吧?所以到这一步map这一步的时候呢,就按它的这个并行度是三,三个map任务去做操作,然后到下面sum这一步的时候呢,按照并行度是二两个sum去做操作。
10:13
啊,这个就是在实际我们做任务调度,做资源分配的时候,就会有更多的灵活了啊,啊这个当然就是这种用法,一般用的比较少,因为大家想你如果资源既然有那么多资源的话,并行度有,那么就是我当前有这个四核的话,你为什么不用呢?对吧,你还要人为的给它调小啊,那那这个,所以一般是不这么去单独再去做设置了,但是有一个特例,哪个特例呢?就是像我们最后要输出的时候,这里边print打印输出的时候,大家可能会发现前面我们这里边打印输出的时候啊。前面都有一个这个编号对吧?啊,那假如说我就想要当前前面没有编号,就像前面那个批处理做这个操作的时候一样啊,然后按照顺序一个一个输出就完事了,这样可不可以呢?哎,当然是可以的,只要你在后边单独的定义它的并行度是一就可以了。
11:13
这个在我们当前控制台上可能还表现不是很明显,你大家想想,你如果要是写入文件的话,你想把最后的结果都写入到一个文件里面去,如果说现在好几个线程同时往里写文件,哎呀,这个是不是就有点儿,呃,有点尴尬对吧,有可能会出现冲突,出出现问题,所以我们往往有时候这个写入文件的时候,或者说在其他的一些写入操作的时候,我要求你就排好顺序到这一步就是。按照顺序,你一个一个往里写就完事了啊,这个把它设成一,这是完全可以的。我们把这个再来啊,这这个等下再给大家一起测吧,啊啊,所以这是常见的对于并行度的一个设置的方法。呃,然后这里边还有一个小问题,那就是大家发现后面因为我们讲到要去做这个部署提交嘛,这里边有一个问题,就是说我们当前的这个端口号和主机名是直接写死在里边的,那你如果要是直接去做提交的话,你你代码里面写死local host的,这显然不合不合理,对吧?那你你如果要是执行的时候,当前你应该是读取配置文件呀,或者说你应该从当前的这个任务执行的时候传传进参数来给他做这个执行啊,对吧?我们这里边main方法不是有参数吗?那所以说当前这个显然不应该写死在代码里,那怎么样去做呢?
12:38
啊,其实也非常简单,利用参数对吧,哎,我们这里边。呃,从从外部命令中提取参数。作为。呃,这个就是我我们当前那个NC对吧?So socket文本流的主机名,主机名和端口号,好这里边就有不同的这个提取的方式,大家也可以直接,你看这个本来不就是在as里边吗?对吧,那方法这个A里边,你直接从这里边它是一个string类型的array,你直接把它提取出来做转换也可以,或者说这里边给大家推荐一种方法。
13:26
就弗林里边呢,给大家提供了一个这个叫做吧,呃。Para to啊,这个叫就是parameter two,这个东西是flink给我们单独提供的一个一个工具parameter two,来家看这个啊,这个东西它是在这个link API Java u下边给我们提供的啊,底层是这个Java代码对吧?啊,所以这里边我们是这样把它调用出来,然后本身的这个工具呢,其实我们看到里边你如果点进去之后,呃,你会发现啊,就是当前的这个类里边,它本身就给我们提供了一些静态方法,所以接下来我们就可以直接调用它里边的,呃,这个静态方法叫做什么呢?From a直接从当前的参数里边转换提取包装成一个类,叫做parameter two。
14:22
啊,相当于就是这样的一个过程对吧?啊,那这这里我们就把这个UPS直接传进来,然后接下来我们就可以定义,比方说当前的这个host,大家知道是一个string类型对吧?那我们直接就从当前的parameter two里边去get,大家看这个就像一个map一样啊,就像一个k value对一样,所以我直接get,比方说我们传参的时候,这个就叫host吧,哎,那我就get host,把我传进的那个东西拿出来就完事了,然后后边呢,Port也是一样,这个应该是一个int类型,那我们parater to去get的时候呢,大家看可以有一个get in,然后这里边我们本身就叫它port这样去做啊,那后边这就不要写死了,直接给host还有port就可以了。
15:11
好,然后接下来我们可以去测试运行一下啊啊,那当然这里边如果要去测试运行的话,我们还把这个并行度调了一啊,然后这里边你要运行的话,肯定是要先去配一下了,呃,Configuration对吧,我们要设置当前program的呃参数啊argument,那这里边我们杠杠host local host,我还是传这个是7777把它定义好。然后接下来去做一个运行。好,现在我们这个运行起来,接下来还是在NC这边输入一些数据,哎,大家看到现在输出的还是正常能够运行,而且输出因为我们设置这个打印的这个并行度是一,所以是不是这里边就没有前面那个角标了,看起来就跟之前那个批处理的那个结果差不多了,对吧?啊,那这里边hello flink同样可以有这样的一个效果。
16:08
啊,那有同学可能会想到,诶,你说是这个按照顺序一个一个输出,但这里面看起来好像不对啊,因为大家仔细看的话会发现你看你这个hello word哈在前,Word在后,那后面这个hellolink怎么弗link在前了呢。然后我们后面又输入了一个how are you,那为什么它它这个输出的顺序是啊,How you呢?难道不应该是完全按照顺序的吗?哦,这个大家要注意啊,当前我们做这样的测试的时候呢,是把最后一步就是当前,诶我们这个print啊,最后这一步做了一个并行度的调整,设成了并行度是一,而前边我们做这个计算的时候呢,那并行度还是四啊。那所以大家想到这就涉及到我们这个顺序的问题了啊,这我们说为什么会有乱序的问题,你前面是并行度是四,都有四个并行任务,然后再去做Fla map,再去filter,再去map,再去上,那你想最后传给这个print的时候,是不是有可能顺序都已经乱了呀。
17:10
对吧,也就是说我们这就相当于什么呢?前面比方说啊,我在做这个,比方说做这个map操作的时候。本来哎,这里边应该是how先进来对吧,How are you。但是呢,因为我们是并行的,所以大家可以认为它几乎是同时进入到我们当前的这个系统来,哎,我们当前应该是有四个这个并行的这个子任务啊,我只画了三个,大家知道什么意思就可以,然后后边哎,我又做这个sum,大家知道sum也是有四个。然后后边呢,做这个print print是有一个,那在这个处理的过程当中,大家就会发现了,你这里边读进来几乎是同时进来啊,尽管我们知道应该是耗在前面一点对吧,但是后边又要做传输啊,有可能耗你再去做那个传输的时候,哎,它可能稍微远一点对吧?啊这个R它有可能这个做网络传输的时候比较静一点,哎右这里边可能更静一点,那这就会导致后边我们处理的时候,他们就出现乱序了。
18:13
然后经过some处理之后呢,就变成了,就有可能是啊,第一个先到print这里来,然后后边才是how,后边才是U对吧?啊,所以后边我们这里边的顺序就打乱,这就是我们说的分布式架构带来的数据这个乱序的一个问题。啊,那那这个问题怎么去解决呢?啊,当然就是说,呃,有同学可能想到了,就是这个非常简单,你如果要是全局这里边我们定行都设成一,所有的都是一条线,按照这个一步一步去执行,那是不是最后一定是一个顺序啊啊这个大家下去之后可以测啊,确实是这样啊,但是你如果设成全局射程一的话,那我们这个高并发,就我们要做这个高吞吐量,那就做不了了,你只有一个线程了,那还怎么高高吞吐量啊啊所以呃,后边我们会给大家讲到时间语义啊,以及这个窗口里边处理迟到数据的一些操作。
19:10
那这一部分就给大家先扩展,讲到这里。
我来说两句