专栏首页大数据成神之路Kafka消费者分区分配策略及自定义分配策略

Kafka消费者分区分配策略及自定义分配策略

kafka消费者如何分配分区以及分配分区策略和源码解释

我们知道kafka的主题中数据数据是按照分区的概念来的,一个主题可能分配了多个分区,每个分区配置了复制系数,为了可用性,在多个broker中进行复制,一个分区在多个broker中选举出一个副本首领,消费者只访问这个分区副本首领,这些在本章节不重要,本章节阐述一个消费者如何选定一个主题中多个分区中的一个分区,和kafka的分区分配策略核心源码解析。

kafka中分区策略核心实现有两种 一种是range范围策略,一种是roudRobin轮询策略,在构建KafkaConsumer类的时候配置,看一下策略的关系就能自行配置, 配置key为partition.assignment.strategy的具体实现,看下图:

首先我们需要有多种假设来举例

假设我们创建了一个主题,并且8个分区p0-p8,我们有3个消费者c0-c2

先来说说第一种策略, range策略

上面已经做好了一些假设

根据range策略,分区按照顺序平铺,消费者按照顺序平铺

分区数量除以消费者数量,这里是分区数量8除以消费者数量3 等于 2 (N),再分区数量8对消费数量3取余得到2 ( M ),kafka的range算法是前 M个消费能得到N+1个分区,剩余的消费者分配到N个分区

具体算法:假设区分数量为pCout,消费者数量为cCount

n = pCout / cCount 8 / 3 = 2

m = pCount % cCount 8 % 3 = 2

前m(2)个消费者得到n+1(2+1)个分区,剩余的消费者分配到N(2)个分区,最终结果如下图

range策略是kafka默认的一个分区分配的策略可以看看ConsumerConfig类的static块,默认配置的RangeAssignor

想看一下分配分区的策略的入口可以参考KafkaConsumer类中的pollOnce方法进去,里面调用的ensurePartitionAssignment方法,不过这里debug进去看还是挺复杂的,有兴趣的可以参考,篇幅讲的不是这些重点,具体入口可以看下图

下面看一看range策略中核心源码的实现,具体查看RangeAssignor类

@Override    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,                                                    Map<String, List<String>> subscriptions) {        //获取每个主题消费者们    Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);        Map<String, List<TopicPartition>> assignment = new HashMap<>();        for (String memberId : subscriptions.keySet())            assignment.put(memberId, new ArrayList<TopicPartition>());         for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {          //主题            String topic = topicEntry.getKey();            //这个主题的消费们            List<String> consumersForTopic = topicEntry.getValue();            //主题的分区数量            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);            if (numPartitionsForTopic == null)                continue;            //对主题的消费者进行排序            Collections.sort(consumersForTopic);            //主题数量除以主题消费者数量            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();            //主题数量对消费者数量进行取余            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();            //封装主题和分区信息            List<TopicPartition> partitions = partitions(topic, numPartitionsForTopic);            //下面就开始为每一个消费者分配分区,看到这里是不是会发现 消费者分区再均衡,每次添加消费者或者添加分区都会发生再均衡            //事件,不过这里不是重点            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {              //消费者分区起始位置                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);                //分配的分区数量, 从我们上面的假设的分区数量和消费者数量可以得出这里的值                // int length = 2 + (i + 1 > 2 ? 0 : 1);                //因为有的无法整除和取余的,所以前面的2个消费者这里会获得3 的结果, 最后一个消费者这里只能得到2                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);                //为每个消费者分配分区信息                assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));            }        }        return assignment;    }

下面讲一讲kafka自带的第二种消费者分配分区的策略

轮询策略

还是按照上面的假设8个分区3个消费者

8个分区按照顺序平铺

构造消费者环 c0,c1,c2,c0,c1,c2.......

轮询分配过程是 p0 分配给了 c0, p1 分配给了 c1, p2分配给了 c2, p3分配给了c0, p4分配给了 c1, p5分配给了c2, 一次类推,所有分区轮询分配给一个消费者环,大概草图如下

上面草图 多多理解 , 核心源码如下

@Override    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,                                                    Map<String, List<String>> subscriptions) {        Map<String, List<TopicPartition>> assignment = new HashMap<>();        for (String memberId : subscriptions.keySet())            assignment.put(memberId, new ArrayList<TopicPartition>());        //讲消费者集合进行排序,构建一个消费者环, 内部通过索引位置+1对总数取余的方式实现的环        CircularIterator<String> assigner = new CircularIterator<>(Utils.sorted(subscriptions.keySet()));        //对所有主题和分区进行排序, 假设集合中有多个主题/分区-分区,最终排序结果为        // t1/p0-p1-p2,t2/p0-p1,t3/p0-p1-p2        for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {          //当前主题            final String topic = partition.topic();            //这里循环遍历看看消费者有没有订阅改topic,否则一直next到下一个消费者,主要的作用是跳过            //没有订阅该主题的消费者            while (!subscriptions.get(assigner.peek()).contains(topic))                assigner.next();            //未当前消费者添加分区信息            assignment.get(assigner.next()).add(partition);        }        return assignment;    }

通过上面的的案例我们是不是可以通过继承AbstractPartitionAssignor抽象类,实现它的assign方法,来自定义消费者分区分配策略,因为这里我们得到了一个所有相关主题和主题分区数量,所有主题对应的消费者,那么就可以在这里根据自己实际场景自定义一些分配策略。

本文分享自微信公众号 - 大数据技术与架构(import_bigdata)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-07-29

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Flink从入门到放弃-Flink分布式缓存

    在用户函数中访问缓存文件或者目录(这里是一个map函数)。这个函数必须继承RichFunction,因为它需要使用RuntimeContext读取数据:

    大数据技术与架构
  • 7-Flink的分布式缓存

    Flink提供了一个分布式缓存,类似于hadoop,可以使用户在并行函数中很方便的读取本地文件,并把它放在taskmanager节点中,防止task重复拉取。

    大数据技术与架构
  • Flink异步之矛-锋利的Async I/O

    在Flink 流处理过程中,经常需要和外部系统进行交互,用维度表补全事实表中的字段。

    大数据技术与架构
  • Hadoop数据分析平台实战——260用户数据ETL离线数据分析平台实战——260用户数据ETL

    离线数据分析平台实战——260用户数据ETL ETL目标 解析我们收集的日志数据,将解析后的数据保存到hbase中。 这里选择hbase来存储数据的主要原因就...

    Albert陈凯
  • 你真的懂Java中的String、StringBuilder和StringBuffer吗?

    相信String这个类是Java中使用得最频繁的类之一,并且又是各大公司面试喜欢问到的地方,今天就来和大家一起学习一下String、StringBui...

    java思维导图
  • 探秘Java中的String、StringBuilder以及StringBuffer

      相信String这个类是Java中使用得最频繁的类之一,并且又是各大公司面试喜欢问到的地方,今天就来和大家一起学习一下String、StringBuilde...

    Java团长
  • 你真的懂Java中的String、StringBuilder和StringBuffer吗?

    相信String这个类是Java中使用得最频繁的类之一,并且又是各大公司面试喜欢问到的地方,今天就来和大家一起学习一下String、StringBui...

    黄泽杰
  • 微信支付宝一码付

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 ...

    逝兮诚
  • 聊聊rocketmq的HAClient

    rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/...

    codecraft
  • 聊聊rocketmq的HAClient

    rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/...

    codecraft

扫码关注云+社区

领取腾讯云代金券