

In Apache Kafka, which consumer consumes data from which partition is determined by Kafka's Group Coordinator together with the configured partition assignment strategy. The following is an explanation of this process:
Ultimately, each consumer is assigned a set of partitions and is responsible for consuming data from them. The assignment is dynamic: whenever the membership of the consumer group changes, or new partitions are added to the topic, the assignment strategy recomputes and redistributes the partitions.
Note that Kafka's assignment strategies and consumer-group coordination make consumption and load balancing relatively easy to manage, while still allowing horizontal scaling to match workloads of different sizes.

| Parameter | Description |
|---|---|
| heartbeat.interval.ms | Interval between heartbeats sent from the consumer to the coordinator; default 3 seconds. Must be smaller than session.timeout.ms and should not exceed 1/3 of session.timeout.ms. |
| session.timeout.ms | Timeout for the connection between the consumer and the coordinator; default 45 seconds. If it is exceeded, the consumer is removed and the group rebalances. |
| max.poll.interval.ms | Maximum time a consumer may take to process messages between polls; default 5 minutes. If it is exceeded, the consumer is removed and the group rebalances. |
| partition.assignment.strategy | Partition assignment strategy for the consumer; the default is Range + CooperativeSticky. Kafka can use several assignment strategies at the same time. Available strategies: Range, RoundRobin, Sticky, CooperativeSticky. |
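For reference, here is a minimal Java sketch (my own example, not part of the original demo) of how these four parameters might be set explicitly on a consumer; the values shown are the defaults from the table, and the broker address and group name are placeholders taken from the demo later in this article.

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class GroupConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group02");                 // placeholder group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // The four parameters from the table above, set to their documented defaults
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);     // 3 s, well under 1/3 of session.timeout.ms
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000);       // 45 s
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);    // 5 min
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                "org.apache.kafka.clients.consumer.RangeAssignor,"
              + "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"); // default: Range + CooperativeSticky

        System.out.println(props);
    }
}
```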
The Range partition assignment strategy is one of Kafka's strategies for assigning partitions to consumers. Its basic idea is to give each consumer a contiguous block of partitions, so that each consumer is responsible for a consecutive range. This can reduce network overhead and improve locality, since adjacent partitions are often on the same broker, and it reduces leader switching.
The detailed principle and workflow of the Range strategy are as follows: within each topic, the partitions are sorted by partition number and the consumers by member ID; every consumer receives numPartitions / numConsumers partitions, and the first numPartitions % numConsumers consumers each receive one extra partition.

In summary, the Range strategy achieves load balancing by giving each consumer a contiguous block of partitions. This helps reduce network overhead and improve data locality, since adjacent partitions are often on the same broker, which cuts down leader switching and improves overall performance. Range is part of Kafka's default assignment configuration, but you can choose other strategies to fit specific needs.
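To make the arithmetic concrete, here is a small self-contained sketch (my own illustration, not Kafka's actual RangeAssignor code) that reproduces the per-topic range split seen in the demo below: with 7 partitions and 3 consumers, the first consumer gets one extra partition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RangeAssignSketch {
    /** Split the partitions of a single topic across consumers, Range-style. */
    static Map<String, List<Integer>> rangeAssign(List<String> consumers, int numPartitions) {
        List<String> sorted = new ArrayList<>(consumers);
        sorted.sort(String::compareTo);                    // consumers are ordered (by member id in Kafka)
        int quota = numPartitions / sorted.size();         // base number of partitions per consumer
        int extra = numPartitions % sorted.size();         // the first 'extra' consumers get one more
        Map<String, List<Integer>> assignment = new TreeMap<>();
        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = quota + (i < extra ? 1 : 0);
            List<Integer> range = new ArrayList<>();
            for (int p = 0; p < count; p++) {
                range.add(next++);                         // contiguous block of partition numbers
            }
            assignment.put(sorted.get(i), range);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 7 partitions, 3 consumers -> {c01=[0, 1, 2], c02=[3, 4], c03=[5, 6]}, matching the demo below
        System.out.println(rangeAssign(List.of("c01", "c02", "c03"), 7));
    }
}
```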
1) Preparation
① Change topic groupTest01 to 7 partitions:
[xxx@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic groupTest01 --partitions 7
② Create 3 consumers that form consumer group group02 and consume topic groupTest01 (a minimal sketch of such a consumer follows this list)
③ Observe how the partitions are assigned
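The article does not show the consumer code itself; a minimal sketch of what each of the three consumers in step ② could look like is given below. The class name GroupTestConsumer is hypothetical, the broker address comes from the command above, and no assignment strategy is set explicitly, so the default Range-based assignment applies.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class GroupTestConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group02");          // all three instances share this group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("groupTest01"));                // the 7-partition topic from step ①
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("partition=" + record.partition() + ", value=" + record.value());
                }
            }
        }
    }
}
```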
Start the first consumer and observe the partition assignment:
// consumer01
Successfully synced group in generation Generation{generationId=1, memberId='consumer-group02-1-4b019db6-948a-40d2-9d8d-8bbd79f59b14', protocol='range'}
Notifying assignor about the new Assignment(partitions=[groupTest01-0, groupTest01-1, groupTest01-2, groupTest01-3, groupTest01-4, groupTest01-5, groupTest01-6])
Adding newly assigned partitions: groupTest01-3, groupTest01-2, groupTest01-1, groupTest01-0, groupTest01-6, groupTest01-5, groupTest01-4
Start the second consumer and observe the partition assignment:
// consumer01
Successfully synced group in generation Generation{generationId=2, memberId='consumer-group02-1-4b019db6-948a-40d2-9d8d-8bbd79f59b14', protocol='range'}
Notifying assignor about the new Assignment(partitions=[groupTest01-0, groupTest01-1, groupTest01-2, groupTest01-3])
Adding newly assigned partitions: groupTest01-3, groupTest01-2, groupTest01-1, groupTest01-0
// consumer02
Successfully synced group in generation Generation{generationId=2, memberId='consumer-group02-1-60afd984-6916-4101-8e72-ae52fa8ded6c', protocol='range'}
Notifying assignor about the new Assignment(partitions=[groupTest01-4, groupTest01-5, groupTest01-6])
Adding newly assigned partitions: groupTest01-6, groupTest01-5, groupTest01-4
Start the third consumer and observe the partition assignment:
// consumer01
Successfully synced group in generation Generation{generationId=3, memberId='consumer-group02-1-4b019db6-948a-40d2-9d8d-8bbd79f59b14', protocol='range'}
Notifying assignor about the new Assignment(partitions=[groupTest01-0, groupTest01-1, groupTest01-2])
Adding newly assigned partitions: groupTest01-2, groupTest01-1, groupTest01-0
// consumer02
Successfully synced group in generation Generation{generationId=3, memberId='consumer-group02-1-60afd984-6916-4101-8e72-ae52fa8ded6c', protocol='range'}
Notifying assignor about the new Assignment(partitions=[groupTest01-3, groupTest01-4])
Adding newly assigned partitions: groupTest01-3, groupTest01-4
// consumer03
Successfully synced group in generation Generation{generationId=3, memberId='consumer-group02-1-fd15f50f-0a33-4e3a-8b8c-237252a41f4d', protocol='range'}
Notifying assignor about the new Assignment(partitions=[groupTest01-5, groupTest01-6])
Adding newly assigned partitions: groupTest01-6, groupTest01-5, groupTest01-4
Think about it: if one of the consumers exits, what will the partition assignment look like?
The RoundRobin partition assignment strategy is another commonly used consumer assignment strategy in Kafka, and its principle is very simple: the topic's partitions are dealt out to the consumers in turn, ensuring an even assignment. Each consumer takes the next partition in sequence, and once every consumer has been given one the cycle starts over.
In more detail, RoundRobin assignment works like this: all partitions of the subscribed topics are listed and sorted, the consumers are sorted by member ID, and the partitions are handed out one at a time in a circular fashion, so partition i goes to consumer i % numConsumers.

The advantage of RoundRobin is that it is simple and fair: every consumer gets an equal share of the partitions. However, it may not suit every situation, especially when partition sizes or activity are uneven. In such cases, other strategies such as Range or Sticky may be a better fit, since they take partition characteristics or previous assignments into account to achieve better load balancing.
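As a rough illustration of the idea (my own sketch, not the real RoundRobinAssignor, which also merges the partitions of all subscribed topics), round-robin assignment simply deals partitions out in a circle, so partition i goes to consumer i % numConsumers:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RoundRobinAssignSketch {
    /** Deal partitions 0..numPartitions-1 out to consumers one at a time, in a circle. */
    static Map<String, List<Integer>> roundRobinAssign(List<String> consumers, int numPartitions) {
        List<String> sorted = new ArrayList<>(consumers);
        sorted.sort(String::compareTo);                    // consumers are ordered (by member id in Kafka)
        Map<String, List<Integer>> assignment = new TreeMap<>();
        sorted.forEach(c -> assignment.put(c, new ArrayList<>()));
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(sorted.get(p % sorted.size())).add(p);   // partition p -> consumer p % n
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 7 partitions, 3 consumers -> {c01=[0, 3, 6], c02=[1, 4], c03=[2, 5]}, matching the demo below
        System.out.println(roundRobinAssign(List.of("c01", "c02", "c03"), 7));
    }
}
```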
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
Start the 3 consumers one by one and observe the console output.
Start consumer CustomConsumer01 and observe the console output:
// customConsumer01
Successfully synced group in generation Generation{generationId=1, memberId='consumer-group03-1-2d38c78b-b17d-4d43-93f5-4b676f703177', protocol='roundrobin'}
Notifying assignor about the new Assignment(partitions=[groupTest01-0, groupTest01-1, groupTest01-2, groupTest01-3, groupTest01-4, groupTest01-5, groupTest01-6])
Adding newly assigned partitions: groupTest01-3, groupTest01-2, groupTest01-1, groupTest01-0, groupTest01-6, groupTest01-5, groupTest01-4
Start consumer CustomConsumer02 and observe the console output:
// customConsumer01
Successfully synced group in generation Generation{generationId=2, memberId='consumer-group03-1-2d38c78b-b17d-4d43-93f5-4b676f703177', protocol='roundrobin'}
Notifying assignor about the new Assignment(partitions=[groupTest01-0, groupTest01-2, groupTest01-4, groupTest01-6])
Adding newly assigned partitions: groupTest01-2, groupTest01-0, groupTest01-6, groupTest01-4
// customConsumer02
Successfully synced group in generation Generation{generationId=2, memberId='consumer-group03-1-5771a594-4e99-47e8-9df6-ca820af6698b', protocol='roundrobin'}
Notifying assignor about the new Assignment(partitions=[groupTest01-1, groupTest01-3, groupTest01-5])
Adding newly assigned partitions: groupTest01-3, groupTest01-1, groupTest01-5
Start consumer CustomConsumer03 and observe the console output:
// customConsumer01
Successfully synced group in generation Generation{generationId=3, memberId='consumer-group03-1-2d38c78b-b17d-4d43-93f5-4b676f703177', protocol='roundrobin'}
Notifying assignor about the new Assignment(partitions=[groupTest01-0, groupTest01-3, groupTest01-6])
Adding newly assigned partitions: groupTest01-3, groupTest01-0, groupTest01-6
// customConsumer02
Successfully synced group in generation Generation{generationId=3, memberId='consumer-group03-1-5771a594-4e99-47e8-9df6-ca820af6698b', protocol='roundrobin'}
Notifying assignor about the new Assignment(partitions=[groupTest01-1, groupTest01-4])
Adding newly assigned partitions: groupTest01-1, groupTest01-4
// customConsumer03
Successfully synced group in generation Generation{generationId=3, memberId='consumer-group03-1-d493b011-d6ea-4c36-8ae5-3597db635219', protocol='roundrobin'}
Notifying assignor about the new Assignment(partitions=[groupTest01-2, groupTest01-5])
Adding newly assigned partitions: groupTest01-2, groupTest01-5
Think about it: if one of the consumers exits, what will the partition assignment look like?
The result of Sticky assignment can be thought of as "sticky": before computing a new assignment, the assignor takes the previous assignment into account and changes it as little as possible, which saves a great deal of overhead. Kafka introduced this strategy in version 0.11.x. It first places partitions on the consumers as evenly as possible, and when a consumer within the group fails, it tries to keep the partitions already assigned to the remaining consumers unchanged.
"Sticky"分区分配策略是Kafka中的一种分区分配策略,用于将分区分配给消费者。它的主要目标是尽量减少分区再分配(rebalancing)的频率,以提高消费者组的稳定性。
The Sticky strategy works as follows: on the first assignment it distributes the partitions as evenly as possible; on every later rebalance it keeps each surviving consumer's existing partitions wherever it can and redistributes only the partitions that have lost their owner, while keeping the result balanced.
The main advantage of Sticky is that it reduces the amount of partition reassignment, which lowers instability and the cost of rebalancing. This is especially useful for large Kafka clusters and high-throughput consumer groups. It is not necessarily right for every situation, though, because sometimes a more thorough reshuffle is needed to keep the load distribution fair; the choice of assignment strategy should therefore be weighed against the concrete use case and requirements.
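The sketch below is my own simplified illustration of the core idea, not the real StickyAssignor (which also enforces balance constraints and handles differing subscriptions): when a consumer leaves, the survivors keep the partitions they already own and only the orphaned partitions are redistributed. Fed with the generation-3 assignment from the demo below, it reproduces the generation-4 result, although the tie-breaking order is not guaranteed.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class StickyReassignSketch {
    /** Keep survivors' existing partitions; hand each orphaned partition to the least-loaded survivor. */
    static Map<String, List<Integer>> stickyReassign(Map<String, List<Integer>> previous,
                                                     List<String> survivors,
                                                     int numPartitions) {
        Map<String, List<Integer>> assignment = new TreeMap<>();
        for (String c : survivors) {
            assignment.put(c, new ArrayList<>(previous.getOrDefault(c, List.of())));
        }
        for (int p = 0; p < numPartitions; p++) {
            final int partition = p;
            boolean stillOwned = assignment.values().stream()
                    .anyMatch(owned -> owned.contains(partition));
            if (!stillOwned) {
                // orphaned partition: give it to whichever survivor currently has the fewest partitions
                assignment.values().stream()
                        .min(Comparator.comparingInt(List::size))
                        .ifPresent(target -> target.add(partition));
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Generation-3 assignment from the demo below, before CustomConsumer01 is killed
        Map<String, List<Integer>> previous = Map.of(
                "customConsumer01", List.of(0, 1, 2),
                "customConsumer02", List.of(4, 5),
                "customConsumer03", List.of(3, 6));
        // customConsumer01 dies: 4,5 and 3,6 stay put, only 0,1,2 are redistributed
        System.out.println(stickyReassign(previous, List.of("customConsumer02", "customConsumer03"), 7));
    }
}
```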
// Change the partition assignment strategy
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());
Start consumer CustomConsumer01 and observe the console output:
// customConsumer01
Successfully synced group in generation Generation{generationId=1, memberId='consumer-group06-1-19e1e6a4-e2ca-467d-909f-3769dc527d34', protocol='sticky'}
Notifying assignor about the new Assignment(partitions=[groupTest01-0, groupTest01-1, groupTest01-2, groupTest01-3, groupTest01-4, groupTest01-5, groupTest01-6])
Adding newly assigned partitions: groupTest01-3, groupTest01-2, groupTest01-1, groupTest01-0, groupTest01-6, groupTest01-5, groupTest01-4
Start consumer CustomConsumer02 and observe the console output:
// customConsumer01
Successfully synced group in generation Generation{generationId=2, memberId='consumer-group06-1-19e1e6a4-e2ca-467d-909f-3769dc527d34', protocol='sticky'}
Notifying assignor about the new Assignment(partitions=[groupTest01-0, groupTest01-1, groupTest01-2, groupTest01-3])
Adding newly assigned partitions: groupTest01-3, groupTest01-2, groupTest01-1, groupTest01-0
// customConsumer02
Successfully synced group in generation Generation{generationId=2, memberId='consumer-group06-1-6ea3622c-bbe8-4e13-8803-d5431a224671', protocol='sticky'}
Notifying assignor about the new Assignment(partitions=[groupTest01-4, groupTest01-5, groupTest01-6])
Adding newly assigned partitions: groupTest01-6, groupTest01-5, groupTest01-4
Start consumer CustomConsumer03 and observe the console output:
// customConsumer01
Successfully synced group in generation Generation{generationId=3, memberId='consumer-group06-1-19e1e6a4-e2ca-467d-909f-3769dc527d34', protocol='sticky'}
Notifying assignor about the new Assignment(partitions=[groupTest01-0, groupTest01-1, groupTest01-2])
Adding newly assigned partitions: groupTest01-2, groupTest01-1, groupTest01-0
// customConsumer02
Successfully synced group in generation Generation{generationId=3, memberId='consumer-group06-1-6ea3622c-bbe8-4e13-8803-d5431a224671', protocol='sticky'}
Notifying assignor about the new Assignment(partitions=[groupTest01-4, groupTest01-5])
Adding newly assigned partitions: groupTest01-5, groupTest01-4
// customConsumer03
Successfully synced group in generation Generation{generationId=3, memberId='consumer-group06-1-eb0ac20c-5d94-43a7-b7c1-96a561513995', protocol='sticky'}
Notifying assignor about the new Assignment(partitions=[groupTest01-3, groupTest01-6])
Adding newly assigned partitions: groupTest01-3, groupTest01-6
3) Kill consumer CustomConsumer01, then observe the console output:
// customConsumer02
Successfully synced group in generation Generation{generationId=4, memberId='consumer-group06-1-6ea3622c-bbe8-4e13-8803-d5431a224671', protocol='sticky'}
Notifying assignor about the new Assignment(partitions=[groupTest01-4, groupTest01-5, groupTest01-0, groupTest01-2])
Adding newly assigned partitions: groupTest01-2, groupTest01-0, groupTest01-5, groupTest01-4
// customConsumer03
Successfully synced group in generation Generation{generationId=4, memberId='consumer-group06-1-eb0ac20c-5d94-43a7-b7c1-96a561513995', protocol='sticky'}
Notifying assignor about the new Assignment(partitions=[groupTest01-3, groupTest01-6, groupTest01-1])
Adding newly assigned partitions: groupTest01-3, groupTest01-1, groupTest01-6