00:00
到目前为止,我们已经对当中的转有了充分的了解。如果总结一下的话,就会发现所谓的转换算子其实本质上就是针对data streamam进行一个方法调用,在代码里面的话,就是通过data streamam方法调用之后,经过转换处理又得到了一个新的data streamam这样的一个处理过程。那如果说提炼出来,我们画出最后的作业图,到作业真正提交运行的过程当中,对应的每一个转换算子都应该是一个任务,一个task。我们前面提到的基本转换还是聚合转换,或者是我们用。复函数类更加复杂的定义出来的udf,所有的这些转换操作都可以用这种方式。那除此之外呢,Datatream其实还可以调用一些其他的方法,比如说我们可以调用让两条流进行合并这样的一个合流操作,那另外呢,我们还可以让当前流里边的数据,当前流可能并没有完整的去做转换操作,而是针对当前流里的数据去做一个重新分配,重新分区的操作。这就是我们接下来要。
01:21
介绍的物理分区啊,那关于河流的操作呢,我们会放在后边去进行详细的讲解,接下来我们主要要看的是。分区操作。Physicalition,那所谓的分区就是我们所说的要将里边的数据进行重新分配。因为当前flink的集群在运行执行作业的时候是并行执行的,每一个任务它都可以有多个并行的子任务,所以在物理上我们应该有不同的lo。对应的分配去执行当前的子任务的实力,那所以数据来了之后呢,就应该有不同的分配方式,当前数据流里的每一个数据到来之后,应该都是交由一个分区子任务,一个slot上去运行去处理就可以了。
02:17
所以这里面每一个数据到底分配到哪里,就涉及到了一个物理上的分区的问题。那这些操作其实我们也并不陌生了,我们前面在介绍到聚合算子的时候,一开始就提到。在弗link里边,想要做聚合必须先,那这个操作呢,本质上来讲就是基于当前数据里边,我们首先要某提取某个字段或者进行某种组合,得到一个K当前的键按照K的哈希值做一个分配操作,那经过K外之后,当前数据流里的所有数据相当于就进行了重新的分区,重新的分配。
03:01
K的这种分区操作呢,我们说它是一种逻辑上的分区,因为并不是说一个K就一定会分到唯一的一个分区里边去,就不同的K1定会分到不同的分区里边去,并不是按照K去唯一决定当前的分区的,呃,因为我们说到它是要取哈希值,而不同的K求出来的哈希值不同,如果我们的K很多的话,很显然当前的分区数可能远远不够承受这么多的K,一定要在同一个分区里边要包含了多个K,如果出现这种情况,那相当于我们当前按照K来做的划分,其实只是一个逻辑上的分配,并不对应物理上到底在哪一个分区里面去执行。所以除了KY之外呢,Flink其实还提供了一些更加底层的比较硬核的物理上的分区操作,这就是我我们接下来要介绍的物理分区这样的算子或者说操作,那严格意义上来讲,这部分叫做算子可能并不是特别的合理,因为前面我们考虑到的算子operator,那它都应该是对当前的数据流data stream要有一个转换计算的过程。
04:17
那现在的物理分区呢,本质上跟K其实是类似的,对于当前的数据并没有任何的转换计算,只是把里边的数据进行了重新分配啊,所以接下来我们可以看一看当前物理分区到底是怎么样去做的啊,那它跟KY的最大不同其实就是我们可以真正的控制分区策略。假如说我们知道了当前。并行子任务有四个的话,那我们根据制定不同的分区策略,就可以明确的告诉当前的数据,你到底应该去哪一个分区。而不是像之前KY那样,我们只知道它的K是什么,但并不知道它具体到底要分到哪个分区去。这就是关于物理分区的一个基本的概念。
05:08
啊,那它的意义其实主要就在于我们要处理一些比较特殊的优化的场景,比如说。我们数据的应用场景里。由于数据量非常的大,这个时候呢,如果当前我们做K进行分组聚合的话,很有可能就会出现key是很多,但是呢,数据分配并不均匀,某一个key就像我们之前讲到的,用户点击访问的数据,可能某一个用户爱丽丝。它的点击次数就会非常非常多,在当前这段时间内有大量的Alice相关的数据,那很显然,如果说我们按照K去做对应的分区的话。就会导致当前处理数据的并行子任务某一个分区。
06:00
数据量非常的大,它的负担非常的重,而其他的分区有可能数据就非常的少,这就是我们所说的负载不够均衡,忙的忙死,闲的闲死。这也就是我们所说的数据倾斜的场景,那怎么样避免这样一种数据倾斜的状况呢?啊,那系统可能没有办法去自动调整,那这个时候呢,就需要我们进行重新做一个负载均衡了。当然在生产实际项目当中,常见的做法是把K再做一个重新的优化,让我们分配的时候尽量的不同的K能够分配,平均的分配到不同的分区里面去。那另外还有一种方式呢,就是我们可以调用一些物理分区的算子,直接指定当前的数据应该分配到哪里去。那其实关于物理分区的这个操作,我们在平常没有做任何调用的时候,它其实已经在发生了,因为我们想到在当前flink程序里边是可以针对每一个算子进行并行度的设置的。
07:05
那假如说我们前边一步操作,比如说map。后边又有一部flat。对于前后发生的两步操作而言,很很显然,数据流的数据应该在map这部操作处理完成之后,传递到下游的flat map这一个操作的子任务上去,诶,那这里他们的并行度不同的话,当前map第一个分区处理完的数据到底分发到哪里去呢?这里就涉及到了不同的策略。在flink默认的处理场景下,如果前后任务并行度不同,那么接下来他要做的操作其实就是要做一个平均分配。也就是说当前的数据。按照顺序1234。第一个数据处理完了之后,分发到下游第一个分区上去,第二个数据分发到第二个分区,第三个分发到第三个分区,第四个分发到第四个分区。
08:11
我们说这就是一种平均分配数据,是用了一种轮询的方式啊,那当然了,我们如果不做任何指定的话,弗利默认在底层会这样去调用,那后面我们会发现,其实直接在代码当中也可以调用相应的方法,直接指定数据应该怎么发送,接下来我们就来做一个详细的了解,在代码当中可以做一个测试。那首先我们还是在当前的包下边去新建一个Java,我们当前要测试的还是跟transform相关。现在是做一个分区。Test。首先我们还是先把整个测试的框架搭建起来。主方法里,首先我们需要引入当前的执行环境,Get execution environment。
09:09
叫做env,不是一般性的把当前全局的并行度设为一。接下来要做的就是读取数据了啊,读取数据的话,我们因为想要看到它向下游分配数据的这个过程,那应该要多一点测试数据,我们可以借用一下之前reduce这个代码里边我们的测试数据比较多,可以把这一部分直接copy过来。现在我们已经得到了一个基本的数据流,里边的数据类型,就是现在我们有八条数据。得到了一个。接下来呢,我们基于strip可以调用一些。物理分区的转换方法,那首先这里边我们看到它是一个逻辑上的分区,我们得到的是一个k strip,然后基于它主要做的操作是要进行一些聚合计算,而现在如果想要做物理分区的话,能调用什么样的方法呢?
10:07
最为简单的物理分区的方式就是杀否。啊,其实我们知道在Spark里边也有对应的shuffle的定义,那其实所谓的shuffle那就是洗牌,就是把所有的数据全部随机的打散,然后随机的分配到下游的并行子任务里面去,也就是说如果当前我们上游的子任务并行度是一。下游的子任务并行度是四的话。那当前我们能看到的信息应该就是。没完全没有任何规律,就是当前来了一个数据,它有可能分配到下游四个并行子任务里的任意一个,再来一个数据,又是任意的分配,没有任何的规律可循。当然了,整体来讲,随机分区就是我们所说的这个杀父操作,它是服从均匀分布的,如果说我们的数据量足够大的话,最后其实就是一个均匀分配的过程。
11:07
最后每一个分区处理的数据量应该是基本相同的啊,这就是所谓的沙,那我们可以直接的把它做一个测试。这里面既然我们全局的并行度设了一,想要看到结果的话,应该给杀否之后的。下一步计算,比如说我们就直接打印输出了print,那应该要增加一个对应的并行度设置,我们可以设置它的并行度为四。就按前面我们说的好,那最后可以来一个env execute。接下来直接运行一下,看看效果怎么样。我们可以看到。这里直接将八条数据全部输出了,它的输出规律,21431242,好像并没有什么规律,前四个看起来好像是基本上平均分布的,但是后四个会发现二多多了一次,三少了一次,诶,所以整体来看它没有任何的规律可循,我们现在的数据只有八个,数据量比较少,所以它并不是严格意义上的每个分区出现两次啊,那如果说数据量足够多的话,最后会发现它是服从均匀分布的。
12:26
这就是所谓的杀手。除了沙之外,沙否可以说是最为简单,也是我们能够想到的最为直接的把数据分配的更加均匀的一种重分区方式了。这是随机分区杀。那除了它之外,接下来我们还可以有其他的分区方式。第二种分区方式也非常的经典,就是所谓的轮巡分区。那轮均分区呢,跟沙比起来,沙否是直接随机了,每来一个数据。
13:01
都不知道接下来到底分配到哪里去,只不过是它们出现的概率可能是完全一样的,而轮巡分区呢?呃,就是我们所说的。按照次序,第一个分配到第一个分区,第二个分配到第二个分区依次来,所以如果说之前的沙是一个洗牌的话,那现在轮巡分区就像是一个发牌啊,那所以这个轮巡分区它的这种操作呢,也是假如说我们当前没有任何的重分区操作定义的话,直接出现并行度调整,前面是一,后边是四,出现这种情况的时候。Flink底层做的其实就是这样的一个轮询分区啊,那如果说我们想要显示的指定的话,应该调用什么呢?要调用的是reb,就是我们所说的重平衡再平衡,这样就是要做轮巡式的发牌,做重分去操作啊,那后面我们同样可以加一个print,然后set paraly4。
14:02
上面这一部分我们可以直接住掉,看一下轮巡分区的输出结果有什么不同。这里我们可以看到就非常的明显了,输出的结果是1342 1342,完全按照某种顺序依次输出,如果我们看得更加仔细的话,会发现这里我们的print打印的时候,它是四个线程并行打印的,因为我们有四个分区嘛。而这里边打印输出的时候,直接显示在控制台上,因为是并行的,所以他们的顺序其实是完全打散的,并不是按照所有数据的输入顺序一条一条输出的啊,那这里其实我们的第一条数据是Mary maryry1条数据呢,其实在这里第一秒的Mary数据,它是由第四个分区,最后一个分区直接打印输出的。
15:02
而第二条Bob cut,第二秒的这个数据,这是第一个分区,下一条数据,第三秒的Alice的访问数据,那是在这里由第二个分区来输出的,哦,那下一个3.3秒的爆破访问数据是由第三个分区来输出的。所以我们会发现这里边的轮询顺序其实还是1234,只不过是最后我们的并行打印,导致看到在控制台上面的显示并不是按照顺序了。那本质上我们这里的顺序应该是。4123啊,我们真正处理数据的顺序应该是这样的一个过程,然后接下来又是四,我们看到这里是3.2秒的下一个数据。按照输入3.2秒,下一个爱丽丝的数据是我们的第五条数据,然后再下一个,那又是一是Bob的3.5秒的访问数据,再下一个是23.8秒的访问数据,最后一个是4.2秒的访问数据。
16:06
41234123,这就是我们整个进行轮询的过程。当然了,其实如果我们把这个reb去掉的话,直接去print之后,然后去set para4,那我们知道这里有一个并行度的调整S算子,它的并行度是1PRINT,这里相当于是一个输出算子了,它的并行度是四,那这个过程。Flink底层自动的就会使用reb方式去做一个轮的重分区,这里我们可以直接运行一下,看一看,跟之前的效果一样不一样。啊,我们会发现好像输出的结果有所不同了,但如果仔细去观察的话,就会发现。只是每条数据具体输出到的分区可能有所不同,这可能每次我们运行代码,最后它分分到哪一个分区可能都有所不同,就第一个选择的分区到底是哪里,这个是根据当前的系统状态有关的,而我们看到当前的第一条数据,Mary的点击,它是第三个分区。
17:12
啊,然后三后面就应该是四了,所以第二条数据BOB2秒钟的点击是第四个分区输出的,那接下来三四下一个是一,显然第三秒钟Alice的访问数据,我们的第三条数据,那就是第一个分区输出,那3412,第四条数据3.3秒的报不访问数据,由第二个分区输出。所以现在的轮询顺序变成了3412,接下来又是三四。13.8秒的数据,24.2秒的数据。所以我们会发现,本质上来讲,默认情况下,如果前后的任务并行度不同,Flink底层调用的就是re balance的分区方法。
18:00
就是关于轮询分区。
我来说两句