00:00
另外呢,还有一个比较特殊一点的重分区方式呢,叫做广播。这种方式某种意义上来讲,不应该叫做重分区啊,因为所谓的广播,它其实就是把当前的一个数据。分发到了下游所有的并行子任务上去啊,就正常来讲,我们流处理是当前任务处理完一个数据之后,只会分配分发到下游的一个并行子任务上去做一个处理,而广播出去之后的数据呢,它是要做。他是要广播到所有定性子任务上啊,那这样的话就相当于会重复处理了。这种方式往往是在一些特定的需求下,可能会有用途啊,比如说这里我们直接基于前面的stream去做一个广播,广播要调用的就是broadcast这样一个方法,调用broadcast方法,然后接下来print做一个打印,如果说我们还是当前的并行度设成四的话,接下来可以看一看。
01:08
我们可以把上面的这一部分先做一个注释。接下来直接运行一下。我们可以看到当前输出的效果呢,就变成了每一条数据,Mary第一条数据来了之后,12344个分区全输出了一遍,那同样Bob第二条数据来了之后,也是12344个分区全部都输出了一遍。并行输出啊,所以我们会发现,相当于每一条数据都被处理了四次,这就是所谓的广播分区。那另外还有一个比较特殊的分区方式。也可以介绍一下,就是所谓的。全局分区。全区分区呢,调用的是。Global。
02:00
Logo方法,所谓的全局分区啊,那其实是一个非常极端的方式啊,全局的意思就是说把所有的数据,全局的数据全部都分配到一个分区里面去。啊,那假如说我们下游算子这个print并行度还是四。它会分配到哪里去呢?呃,我们把上面这个注掉,只看全局分区,然后测试一下,看看效果是什么样的。我们可以看到它会把所有的数据全部分配到第一个并行子任务里面去。所以这就相当于强行的让下游。任务它的并行度其实设几都没用了,Global之后相当于并行度就变成一了,所以除非是我们真的需要把所有的数据都合并在一起去做处理啊,那这个时候往往是已经前面已经做过分区的一些统计计算,得到的是一些少量的结果,这个时候放在全局一个分区去做计算是可以的。
03:01
那一般情况下使用全局分区一定要非常的谨慎,否则的话,如果我们直接使用全局分区的话,有可能会对当前的并行系统造成很大的处理压力,因为相当于病情就没有用了。除此之外,前面我们介绍的都是直接在基于data stream去调用一个方法,那除了这些之外呢,Flink还给了我们更加灵活的进行分区的策略,那就是所谓的自定义重分区策略。这是最后一种。自定义重分区。这种方式它的一般化也更加的强大,简单来讲就是我们可以直接自己定义一个分区器,然后这个分区器呢,就是直接可以指定这个数据长什么样,我就把它分配到哪个分区里边。哎,所以有了这样一个。自定义的分区器的话,理论上我们就可以完美的去控制所有数据分配到哪个分区了啊,接下来我们可以简单的做一个实现,当然这种实现的话,我们用可能就用。
04:13
比较麻烦了,所以我们还是直接使用整数,比方说还是这个12345678,诶那现在呢,不涉及到重新定义s function啊,不涉及到s function需要调整进行度,我们就直接en去from elements,把12345678传入就可以了。八个数。然后接下来我们可以。盗用一个。自定义的分区方法。这个方法就叫做。字,字面意义上非常的明确了,就是用户自定义的。分区方法,那里边要传什么东西呢?我们会发现啊,这里边有一些方法已经被弃用了,简单的调用都被弃用了,推荐的方法是什么呢?是传入一个petitioner分区器,另外还有一个参数传入一个select。
05:05
He select我们非常熟悉,这在做KBY的时候要传的不就是它吗?是选择当前数据里边的某个字段作为当前的K啊,那现在我们当前就只有一个数,那这个字段怎么样去定义呢?啊,不管怎么样吧,我们先把当前的。Positioner先定义出来,然后接下来后边还需要有一个你有一个k select,诶,那这里我们拿到的其实就是一个in体值,那最后我们想要得到的一个对应的K,那当然也只能是in体制了,那所以。我们也可以做一些转换,现在显然是没有必要去转换的,里边必须要实现的一个方法就叫做get,跟T里要实现的是一样,我们现在只有一个整数,那就直接把自身返回就可以了。然后接下来当前的这个Peter。
06:00
我们定义的这个分区器,它里面必须要实现的就是一个petition方法,哎,那这里面很显然我们应该。传入的这里边的这个类型应该跟后边定义的K的类型应该一样。因为我们明显就是要基于当前提取出来的K去做一个自定义的分区啊,那这里面我们定义了英体制,上面也就应该定义in体制才对。下面我们实现一个protect方法,那比如说这里边我们的方法就可以非常简单了,我们可以直接还是类似的规则,就按照奇偶性直接去做一个分区啊,那所以这里边可以直接按照当前K对二进行取余之后的结果,直接return一个int,那这个int就代表了最终物理分区的索引号。有了这个操作之后,那下面我们就可以直接把它做一个print。啊,当然了,最后我们还可以设置一下当前的并行度是四,那我们看一看得到的结果是什么样的。
07:04
运行一下。这里可以看得非常明显,因为我们已经指定了当前的分区策略,那很很显然,即使并行度是四,这里边只有机有两种情况,所以最终相当于只有第一个和第二个分区会输出对应的信息。我们所有的数据,奇数都以第二个分区来输出,而偶数都以第一个分区来输出。所以其实我们会发现这里打印输出的是1212,其实本质上这里边返回它的分区号,其实偶数应该是零,而奇数应该是一,所以这这也是为什么前面我们进行。自定义s function的时候,我们这里边其实是发送到零号和一号并行分区啊,当前的这个分区号是从零开始的。
08:03
这里只是打印让我们看得更加的更加的清晰,更加的明确而已。这就是对于。自定义分区的一种使用方式。一般这种方式呢,其实在实际项目过程当中,都是要对数据进行调整,进行优化处理的时候会比较有用。那对于常规场景下,如果数据不出现数据倾斜,或者是一些分配严重不均匀,负载严重不均衡的场景的话,我们其实没有必要去手动调用这些方法。
我来说两句