前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka分区与消费者的关系

Kafka分区与消费者的关系

作者头像
java架构师
发布2019-02-22 15:13:53
1K0
发布2019-02-22 15:13:53
举报
文章被收录于专栏:Java架构师进阶

1. 前言

我们知道,生产者发送消息到主题,消费者订阅主题(以消费者组的名义订阅),而主题下是分区,消息是存储在分区中的,所以事实上生产者发送消息到分区,消费者则从分区读取消息,那么,这里问题来了,生产者将消息投递到哪个分区?消费者组中的消费者实例之间是怎么分配分区的呢?接下来,就围绕着这两个问题一探究竟。

2. 主题的分区数设置

如果想学习Java工程化、高性能及分布式、深入浅出。微服务、Spring,MyBatis,Netty源码分析的朋友可以加我的Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给大家。

在server.properties配置文件中可以指定一个全局的分区数设置,这是对每个主题下的分区数的默认设置,默认是1。

当然每个主题也可以自己设置分区数量,如果创建主题的时候没有指定分区数量,则会使用server.properties中的设置。

bin/kafka-topics.sh--zookeeperlocalhost:2181--create--topicmy-topic--partitions2--replication-factor1

在创建主题的时候,可以使用--partitions选项指定主题的分区数量

[root@localhostkafka_2.11-2.0.0]#bin/kafka-topics.sh--describe--zookeeperlocalhost:2181--topicabcTopic:abcPartitionCount:2ReplicationFactor:1Configs: Topic:abcPartition:0Leader:0Replicas:0Isr:0 Topic:abcPartition:1Leader:0Replicas:0Isr:0

3. 生产者与分区

首先提出一个问题:生产者将消息投递到分区有没有规律?如果有,那么它是如何决定一条消息该投递到哪个分区的呢?

3.1. 默认的分区策略

The default partitioning strategy:

If a partition is specified in the record, use it

If no partition is specified but a key is present choose a partition based on a hash of the key

If no partition or key is present choose a partition in a round-robin fashion

org.apache.kafka.clients.producer.internals.DefaultPartitioner

默认的分区策略是:

如果在发消息的时候指定了分区,则消息投递到指定的分区

如果没有指定分区,但是消息的key不为空,则基于key的哈希值来选择一个分区

如果既没有指定分区,且消息的key也是空,则用轮询的方式选择一个分区

如果想学习Java工程化、高性能及分布式、深入浅出。微服务、Spring,MyBatis,Netty源码分析的朋友可以加我的Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给大家。

/** * Compute the partition for the given record. * *@paramtopic The topic name *@paramkey The key to partition on (or null if no key) *@paramkeyBytes serialized key to partition on (or null if no key) *@paramvalue The value to partition on or null *@paramvalueBytes serialized value to partition on or null *@paramcluster The current cluster metadata */publicintpartition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List partitions = cluster.partitionsForTopic(topic);intnumPartitions = partitions.size();if(keyBytes ==null) {intnextValue = nextValue(topic);List availablePartitions = cluster.availablePartitionsForTopic(topic);if(availablePartitions.size() >0) {intpart = Utils.toPositive(nextValue) % availablePartitions.size();returnavailablePartitions.get(part).partition(); }else{// no partitions are available, give a non-available partitionreturnUtils.toPositive(nextValue) % numPartitions; } }else{// hash the keyBytes to choose a partitionreturnUtils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; }}

通过源代码可以更加作证这一点

4. 分区与消费者

消费者以组的名义订阅主题,主题有多个分区,消费者组中有多个消费者实例,那么消费者实例和分区之前的对应关系是怎样的呢?

换句话说,就是组中的每一个消费者负责那些分区,这个分配关系是如何确定的呢?

同一时刻,一条消息只能被组中的一个消费者实例消费

消费者组订阅这个主题,意味着主题下的所有分区都会被组中的消费者消费到,如果按照从属关系来说的话就是,主题下的每个分区只从属于组中的一个消费者,不可能出现组中的两个消费者负责同一个分区。

那么,问题来了。如果分区数大于或者等于组中的消费者实例数,那自然没有什么问题,无非一个消费者会负责多个分区,(PS:当然,最理想的情况是二者数量相等,这样就相当于一个消费者负责一个分区);但是,如果消费者实例的数量大于分区数,那么按照默认的策略(PS:之所以强调默认策略是因为你也可以自定义策略),有一些消费者是多余的,一直接不到消息而处于空闲状态。

话又说回来,假设多个消费者负责同一个分区,那么会有什么问题呢?

我们知道,Kafka它在设计的时候就是要保证分区下消息的顺序,也就是说消息在一个分区中的顺序是怎样的,那么消费者在消费的时候看到的就是什么样的顺序,那么要做到这一点就首先要保证消息是由消费者主动拉取的(pull),其次还要保证一个分区只能由一个消费者负责。倘若,两个消费者负责同一个分区,那么就意味着两个消费者同时读取分区的消息,由于消费者自己可以控制读取消息的offset,就有可能C1才读到2,而C1读到1,C1还没处理完,C2已经读到3了,则会造成很多浪费,因为这就相当于多线程读取同一个消息,会造成消息处理的重复,且不能保证消息的顺序,这就跟主动推送(push)无异。

4.1. 消费者分区分配策略

如果想学习Java工程化、高性能及分布式、深入浅出。微服务、Spring,MyBatis,Netty源码分析的朋友可以加我的Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给大家。

org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor

如果是自定义分配策略的话可以继承AbstractPartitionAssignor这个类,它默认有3个实现

4.1.1. range

range策略对应的实现类是org.apache.kafka.clients.consumer.RangeAssignor

这是默认的分配策略

可以通过消费者配置中partition.assignment.strategy参数来指定分配策略,它的值是类的全路径,是一个数组

/** * The range assignor worksona per-topic basis.Foreachtopic, we lay out the available partitionsinnumericorder*andthe consumersinlexicographicorder. Wethendivide the numberofpartitionsbythe total numberof* consumerstodetermine the numberofpartitionstoassigntoeachconsumer.Ifit doesnotevenly * divide,thenthe first few consumers will have one extra partition. * *Forexample, suppose there are two consumers C0andC1, two topics t0andt1,andeachtopic has3partitions, * resultinginpartitions t0p0, t0p1, t0p2, t1p0, t1p1,andt1p2. * * The assignment will be: * C0: [t0p0, t0p1, t1p0, t1p1] * C1: [t0p2, t1p2] */

range策略是基于每个主题的

对于每个主题,我们以数字顺序排列可用分区,以字典顺序排列消费者。然后,将分区数量除以消费者总数,以确定分配给每个消费者的分区数量。如果没有平均划分(PS:除不尽),那么最初的几个消费者将有一个额外的分区。

简而言之,就是,

1、range分配策略针对的是主题(PS:也就是说,这里所说的分区指的某个主题的分区,消费者值的是订阅这个主题的消费者组中的消费者实例)

2、首先,将分区按数字顺序排行序,消费者按消费者名称的字典序排好序

3、然后,用分区总数除以消费者总数。如果能够除尽,则皆大欢喜,平均分配;若除不尽,则位于排序前面的消费者将多负责一个分区

例如,假设有两个消费者C0和C1,两个主题t0和t1,并且每个主题有3个分区,分区的情况是这样的:t0p0,t0p1,t0p2,t1p0,t1p1,t1p2

那么,基于以上信息,最终消费者分配分区的情况是这样的:

C0: [t0p0, t0p1, t1p0, t1p1]

C1: [t0p2, t1p2]

为什么是这样的结果呢?

因为,对于主题t0,分配的结果是C0负责P0和P1,C1负责P2;对于主题t2,也是如此,综合起来就是这个结果

上面的过程用图形表示的话大概是这样的:

阅读代码,更有助于理解:

如果想学习Java工程化、高性能及分布式、深入浅出。微服务、Spring,MyBatis,Netty源码分析的朋友可以加我的Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给大家。

publicMap> assign(Map partitionsPerTopic,Map subscriptions) {// 主题与消费者的映射 Map> consumersPerTopic = consumersPerTopic(subscriptions);Map> assignment =newHashMap<>();for(StringmemberId : subscriptions.keySet()) assignment.put(memberId,newArrayList());for(Map.Entry> topicEntry : consumersPerTopic.entrySet()) {Stringtopic = topicEntry.getKey();// 主题List consumersForTopic = topicEntry.getValue();// 消费者列表// partitionsPerTopic表示主题和分区数的映射// 获取主题下有多少个分区Integer numPartitionsForTopic = partitionsPerTopic.get(topic);if(numPartitionsForTopic ==null)continue;// 消费者按字典序排序Collections.sort(consumersForTopic);// 分区数量除以消费者数量intnumPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();// 取模,余数就是额外的分区intconsumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();List partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);for(inti =0, n = consumersForTopic.size(); i < n; i++) {intstart = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);intlength = numPartitionsPerConsumer + (i +1> consumersWithExtraPartition ?0:1);// 分配分区assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length)); } }returnassignment;}

4.1.2. roundrobin(轮询)

roundronbin分配策略的具体实现是org.apache.kafka.clients.consumer.RoundRobinAssignor

/** * The round robin assignor lays out all the available partitions and all the available consumers. It * then proceeds to do a round robin assignment from partition to consumer. If the subscriptions of all consumer * instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts * will be within a delta of exactly one across all consumers.) * * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions, * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2. * * The assignment will be: * C0: [t0p0, t0p2, t1p1] * C1: [t0p1, t1p0, t1p2] * * When subscriptions differ across consumer instances, the assignment process still considers each * consumer instance in round robin fashion but skips over an instance if it is not subscribed to * the topic. Unlike the case when subscriptions are identical, this can result in imbalanced * assignments. For example, we have three consumers C0, C1, C2, and three topics t0, t1, t2, * with 1, 2, and 3 partitions, respectively. Therefore, the partitions are t0p0, t1p0, t1p1, t2p0, * t2p1, t2p2. C0 is subscribed to t0; C1 is subscribed to t0, t1; and C2 is subscribed to t0, t1, t2. * * Tha assignment will be: * C0: [t0p0] * C1: [t1p0] * C2: [t1p1, t2p0, t2p1, t2p2] */

轮询分配策略是基于所有可用的消费者和所有可用的分区的

与前面的range策略最大的不同就是它不再局限于某个主题

如果所有的消费者实例的订阅都是相同的,那么这样最好了,可用统一分配,均衡分配

例如,假设有两个消费者C0和C1,两个主题t0和t1,每个主题有3个分区,分别是t0p0,t0p1,t0p2,t1p0,t1p1,t1p2

那么,最终分配的结果是这样的:

C0: [t0p0, t0p2, t1p1]

C1: [t0p1, t1p0, t1p2]

用图形表示大概是这样的:

假设,组中每个消费者订阅的主题不一样,分配过程仍然以轮询的方式考虑每个消费者实例,但是如果没有订阅主题,则跳过实例。当然,这样的话分配肯定不均衡。

什么意思呢?也就是说,消费者组是一个逻辑概念,同组意味着同一时刻分区只能被一个消费者实例消费,换句话说,同组意味着一个分区只能分配给组中的一个消费者。事实上,同组也可以不同订阅,这就是说虽然属于同一个组,但是它们订阅的主题可以是不一样的。

例如,假设有3个主题t0,t1,t2;其中,t0有1个分区p0,t1有2个分区p0和p1,t2有3个分区p0,p1和p2;有3个消费者C0,C1和C2;C0订阅t0,C1订阅t0和t1,C2订阅t0,t1和t2。那么,按照轮询分配的话,C0应该负责

首先,肯定是轮询的方式,其次,比如说有主题t0,t1,t2,它们分别有1,2,3个分区,也就是t0有1个分区,t1有2个分区,t2有3个分区;有3个消费者分别从属于3个组,C0订阅t0,C1订阅t0和t1,C2订阅t0,t1,t2;那么,按照轮询分配的话,C0应该负责t0p0,C1应该负责t1p0,其余均由C2负责。

上述过程用图形表示大概是这样的:

为什么最后的结果是

C0: [t0p0]

C1: [t1p0]

C2: [t1p1, t2p0, t2p1, t2p2]

这是因为,按照轮询t0p1由C0负责,t1p0由C1负责,由于同组,C2只能负责t1p1,由于只有C2订阅了t2,所以t2所有分区由C2负责,综合起来就是这个结果

细想一下可以发现,这种情况下跟range分配的结果是一样的

5. 测试代码

<?xml version="1.0"encoding="UTF-8"?>4.0.0com.cjs.examplekafka-demo0.0.1-SNAPSHOTjarkafka-demoorg.springframework.bootspring-boot-starter-parent2.0.5.RELEASE<!-- lookup parent from repository -->UTF-8UTF-81.8org.springframework.bootspring-boot-starter-weborg.springframework.kafkaspring-kafkaorg.springframework.bootspring-boot-starter-testtestorg.springframework.bootspring-boot-maven-plugin

package com.cjs.kafka.producer;importorg.apache.kafka.clients.producer.*;importjava.util.Properties;publicclass HelloProducer {publicstaticvoidmain(String[] args) { Properties props =newProperties(); props.put("bootstrap.servers","192.168.1.133:9092"); props.put("acks","all"); props.put("retries",0); props.put("batch.size",16384); props.put("linger.ms",1); props.put("buffer.memory",33554432); props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); Producer producer =newKafkaProducer(props);for(inti =0; i <100; i++) { producer.send(newProducerRecord("abc", Integer.toString(i), Integer.toString(i)),newCallback() { @OverridepublicvoidonCompletion(RecordMetadata recordMetadata, Exception e) {if(null != e) { e.printStackTrace(); }else{ System.out.println("callback: "+ recordMetadata.topic() +" "+ recordMetadata.partition() +" "+ recordMetadata.offset()); } } }); } producer.close(); }}

如果想学习Java工程化、高性能及分布式、深入浅出。微服务、Spring,MyBatis,Netty源码分析的朋友可以加我的Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给大家。

package com.cjs.kafka.consumer;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importjava.util.Arrays;importjava.util.Properties;publicclass HelloConsumer {publicstaticvoidmain(String[] args) { Properties props =newProperties(); props.put("bootstrap.servers","192.168.1.133:9092"); props.put("group.id","test"); props.put("enable.auto.commit","true"); props.put("auto.commit.interval.ms","1000");// props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer consumer =newKafkaConsumer(props); consumer.subscribe(Arrays.asList("foo","bar","abc"));while(true) { ConsumerRecords records = consumer.poll(100);for(ConsumerRecord record : records) { System.out.printf("partition = %s, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); } } }}

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019.01.12 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云直播
云直播(Cloud Streaming Services,CSS)为您提供极速、稳定、专业的云端直播处理服务,根据业务的不同直播场景需求,云直播提供了标准直播、快直播、云导播台三种服务,分别针对大规模实时观看、超低延时直播、便捷云端导播的场景,配合腾讯云视立方·直播 SDK,为您提供一站式的音视频直播解决方案。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档