嗨,我有一个卡夫卡消费者(使用春季卡夫卡依赖),听多个主题。假设我有三个主题,分别是topicA、topicB和topicC。在我的应用程序中,我在一个使用者中使用了所有三个主题,如下所示。
@KafkaListener(topics = "topicA,topicB,topicC", groupId = "myGroup", concurrency="3")
我的主题有分区,而这些分区的数量与每个分区是不同的。假设我的topicA有3个分区。topicB有6个分区,topicC有9个分区。如何在@KafkaListener
中确定“并发”选项的数字。(我很困惑,因为topicB和topicC分别包含6个和9个分区。那么,应该将并发更改为6或9吗?或者我是否应该将其更改为18,这是来自3个主题的分区总数)
我知道,在消费者端,Kafka总是将单个分区的数据提供给一个使用者线程,并且使用者(在消费者组中)中的并行度被正在使用的分区数量所限制。我的主要目标是通过在@kafkalistener
中使用并发选项来并行消费。
发布于 2022-09-14 12:57:16
如果使用默认分区转让人将并发设置为18,如果并发性大于分区数,则将有空闲的使用者。来自不同主题的分区与分区的分布方式无关。
您可以使用自定义分区转让人(在使用者配置中)以不同方式分配分区。
请参阅https://kafka.apache.org/documentation/#consumerconfigs_partition.assignment.strategy
还请参阅关于RoundRobinAssignor
的讨论,https://docs.spring.io/spring-kafka/docs/current/reference/html/#using-ConcurrentMessageListenerContainer
或者,只需在方法中添加3个单独的@KafkaListener
注释,每个主题一个,具有不同的并发。
https://stackoverflow.com/questions/73713298
复制相似问题