前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >说说Flink DataStream的八种物理分区逻辑

说说Flink DataStream的八种物理分区逻辑

作者头像
王知无-import_bigdata
发布2019-12-26 13:09:06
2.5K1
发布2019-12-26 13:09:06
举报

By 大数据技术与架构

场景描述:Spark的RDD有分区的概念,Flink的DataStream同样也有,只不过没有RDD那么显式而已。Flink通过流分区器StreamPartitioner来控制DataStream中的元素往下游的流向。

Spark的RDD有分区的概念,Flink的DataStream同样也有,只不过没有RDD那么显式而已。Flink通过流分区器StreamPartitioner来控制DataStream中的元素往下游的流向,以StreamPartitioner抽象类为中心的类图如下所示。

在Flink的Web UI界面中,各算子之间的分区器类型会在箭头上标注出来,如下所示。

StreamPartitioner继承自ChannelSelector接口。这里的Channel概念与Netty不同,只是Flink对于数据写入目的地的简单抽象,我们可以直接认为它就是下游算子的并发实例(即物理分区)。所有StreamPartitioner的子类都要实现selectChannel()方法,用来选择分区号。下面分别来看看Flink提供的8种StreamPartitioner的源码。

GlobalPartitioner
代码语言:javascript
复制
    // dataStream.global()
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        return 0;
    }

GlobalPartitioner只会将数据输出到下游算子的第一个实例,简单暴力。

ShufflePartitioner
代码语言:javascript
复制
    private Random random = new Random();
    // dataStream.shuffle()
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        return random.nextInt(numberOfChannels);
    }

ShufflePartitioner会将数据随机输出到下游算子的并发实例。由于java.util.Random生成的随机数符合均匀分布,故能够近似保证平均。

RebalancePartitioner
代码语言:javascript
复制
    private int nextChannelToSendTo;

    @Override
    public void setup(int numberOfChannels) {
        super.setup(numberOfChannels);
        nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
    }
    // dataStream.rebalance()
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }

RebalancePartitioner会先随机选择一个下游算子的实例,然后用轮询(round-robin)的方式从该实例开始循环输出。该方式能保证完全的下游负载均衡,所以常用来处理有倾斜的原数据流。

KeyGroupStreamPartitioner
代码语言:javascript
复制
    private final KeySelector<T, K> keySelector;
    private int maxParallelism;
    // dataStream.keyBy()
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        K key;
        try {
            key = keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {
            throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
        }
        return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
    }

    public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
        return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
    }

    public static int assignToKeyGroup(Object key, int maxParallelism) {
        return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
    }

    public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
        return MathUtils.murmurHash(keyHash) % maxParallelism;
    }

    public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

这就是keyBy()算子底层所采用的StreamPartitioner,可见是先在key值的基础上经过了两重哈希得到key对应的哈希值,第一重是Java自带的hashCode(),第二重则是MurmurHash。然后将哈希值乘以算子并行度,并除以最大并行度,得到最终的分区ID。

BroadcastPartitioner
代码语言:javascript
复制
    // dataStream.broadcast()
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        throw new UnsupportedOperationException("Broadcast partitioner does not support select channels.");
    }

    @Override
    public boolean isBroadcast() {
        return true;
    }

BroadcastPartitioner是广播流专用的分区器。由于广播流发挥作用必须靠DataStream.connect()方法与正常的数据流连接起来,所以实际上不需要BroadcastPartitioner来选择分区(广播数据总会投递给下游算子的所有并发),selectChannel()方法也就不必实现了。细节请参见Flink中BroadcastStream相关的源码,这里就不再列举了。

RescalePartitioner
代码语言:javascript
复制
    private int nextChannelToSendTo = -1;
    // dataStream.rescale()
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        if (++nextChannelToSendTo >= numberOfChannels) {
            nextChannelToSendTo = 0;
        }
        return nextChannelToSendTo;
    }

这个看起来也太简单了,并且与RebalancePartitioner的逻辑是相同的?实际上并不是。我们看看StreamingJobGraphGenerator类,它负责把Flink执行计划中的StreamGraph(逻辑执行计划)转换为JobGraph(优化的逻辑执行计划)。其connect()方法中有如下代码。

代码语言:javascript
复制
        if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) {
            jobEdge = downStreamVertex.connectNewDataSetAsInput(
                headVertex,
                DistributionPattern.POINTWISE,
                resultPartitionType);
        } else {
            jobEdge = downStreamVertex.connectNewDataSetAsInput(
                    headVertex,
                    DistributionPattern.ALL_TO_ALL,
                    resultPartitionType);

粗略地讲,如果分区逻辑是RescalePartitioner或ForwardPartitioner(下面会说),那么采用POINTWISE模式来连接上下游的顶点,对于其他分区逻辑,都用ALL_TO_ALL模式来连接。看下面两张图会比较容易理解。

也就是说,POINTWISE模式的RescalePartitioner在中间结果传送给下游节点时,会根据并行度的比值来轮询分配给下游算子实例的子集,对TaskManager来说本地性会比较好。而ALL_TO_ALL模式的RebalancePartitioner是真正的全局轮询分配,更加均衡,但是就会不可避免地在节点之间交换数据,如果数据量大的话,造成的网络流量会很可观。

ForwardPartitioner
代码语言:javascript
复制
   // dataStream.forward()
   @Override
   public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
       return 0;
   }

与GlobalPartitioner的实现相同。但通过上面对POINTWISE和ALL_TO_ALL连接模式的讲解,我们能够知道,它会将数据输出到本地运行的下游算子的第一个实例,而非全局。在上下游算子的并行度相同的情况下,默认就会采用ForwardPartitioner。反之,若上下游算子的并行度不同,默认会采用前述的RebalancePartitioner。

CustomPartitionerWrapper
代码语言:javascript
复制
    Partitioner<K> partitioner;
    KeySelector<T, K> keySelector;
    // dataStream.partitionCustom()
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        K key;
        try {
            key = keySelector.getKey(record.getInstance().getValue());
        } catch (Exception e) {
            throw new RuntimeException("Could not extract key from " + record.getInstance(), e);
        }

        return partitioner.partition(key, numberOfChannels);
    }

这就是自定义的分区逻辑了,我们可以通过继承Partitioner接口自己实现,并传入partitionCustom()方法。举个简单的栗子,以key的长度做分区:

代码语言:javascript
复制
    sourceStream.partitionCustom(new Partitioner<String>() {
      @Override
      public int partition(String key, int numPartitions) {
        return key.length() % numPartitions;
      }
    }, 0);
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-12-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术与架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • GlobalPartitioner
  • ShufflePartitioner
  • RebalancePartitioner
  • KeyGroupStreamPartitioner
  • BroadcastPartitioner
  • RescalePartitioner
  • ForwardPartitioner
  • CustomPartitionerWrapper
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档