首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何使用Spring提供的Kafka apis在一个消费组中创建多个消费者

如何使用Spring提供的Kafka apis在一个消费组中创建多个消费者
EN

Stack Overflow用户
提问于 2020-07-01 17:17:33
回答 3查看 1.2K关注 0票数 2

我试图在一个消费者组中创建多个消费者,以便进行并行处理,因为我们有大量的消息流入。我使用的是spring boot和KafkTemplate。如何在spring boot应用程序的单个实例中创建属于单个消费者组的多个消费者?使用@KafkaListener注释多个方法会创建多个消费者吗?

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2020-07-01 19:06:10

您必须使用ConcurrentMessageListenerContainer。它委托一个或多个KafkaMessageListenerContainer实例来提供多线程消费。

代码语言:javascript
运行
复制
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(10);
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
}

factory.setConcurrency( 10 )创建10个KafkaMessageListenerContainer实例。每个实例都会获得一定数量的分区。这取决于您在创建topic时配置的分区数量。

一些准备步骤:

代码语言:javascript
运行
复制
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

private final static String BOOTSTRAP_ADDRESS = "localhost:9092";
private final static String CONSUMER_GROUP = "consumer-group-1";
private final static String TOPIC = "test-topic";

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_ADDRESS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}


@KafkaListener(topics = TOPIC, containerFactory = "kafkaListenerContainerFactory")
public void listen(@Payload String message) {
    logger.info(message);
}

public void start() {
    try {
        Thread.sleep(5000L);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    
    for (int i = 0; i < 10; i++) {
        kafkaTemplate.send(TOPIC, i, String.valueOf(i), "Message " + i);
    }
    
    logger.info("All message are sent");
}

如果运行上面的方法,您可以看到每个KafkaMessageListenerContainer实例都在处理放入该实例所服务的分区中的消息。添加Thread.sleep()以等待消费者被初始化。

代码语言:javascript
运行
复制
2020-07-01 15:48:34.800  INFO 201566 --- [ntainer#0-4-C-1] r.s.c.KafkaConsumersDemo                 : Message 5
2020-07-01 15:48:34.801  INFO 201566 --- [ntainer#0-6-C-1] r.s.c.KafkaConsumersDemo                 : Message 7
2020-07-01 15:48:34.800  INFO 201566 --- [ntainer#0-7-C-1] r.s.c.KafkaConsumersDemo                 : Message 8
2020-07-01 15:48:34.800  INFO 201566 --- [ntainer#0-9-C-1] r.s.c.KafkaConsumersDemo                 : Message 1
2020-07-01 15:48:34.800  INFO 201566 --- [ntainer#0-0-C-1] r.s.c.KafkaConsumersDemo                 : Message 0
2020-07-01 15:48:34.800  INFO 201566 --- [ntainer#0-8-C-1] r.s.c.KafkaConsumersDemo                 : Message 9
2020-07-01 15:48:34.800  INFO 201566 --- [ntainer#0-3-C-1] r.s.c.KafkaConsumersDemo                 : Message 4
2020-07-01 15:48:34.801  INFO 201566 --- [ntainer#0-2-C-1] r.s.c.KafkaConsumersDemo                 : Message 3
2020-07-01 15:48:34.801  INFO 201566 --- [ntainer#0-1-C-1] r.s.c.KafkaConsumersDemo                 : Message 2
2020-07-01 15:48:34.800  INFO 201566 --- [ntainer#0-5-C-1] r.s.c.KafkaConsumersDemo                 : Message 6
票数 2
EN

Stack Overflow用户

发布于 2020-07-01 21:32:42

是的,@KafkaListener将为您创建多个消费者。

这样,您就可以将它们全部配置为使用相同的主题并属于相同的组。Kafka协调器将把分区分发给你的消费者。

不过,如果在本主题中只有一个分区,则不会发生并发:单个分区在单个线程中处理。

另一种选择实际上是配置一个concurrency,并且将根据concurrency <-> partition状态再次创建几个使用者。

票数 2
EN

Stack Overflow用户

发布于 2020-07-06 14:11:08

正如@Salavat Yalalo建议的那样,我将我的卡夫卡容器工厂设为ConcurrentKafkaListenerContainerFactory。在@KafkaListenere方法中,我添加了一个名为concurrency的选项,它接受一个整数作为字符串,指示要跨越的消费者数量,如下所示

代码语言:javascript
运行
复制
@KafakListener(concurrency ="4", containerFactory="concurrentKafkaListenerContainerFactory(bean name of the factory)",..other optional values)
public void topicConsumer(Message<MyObject> myObject){
//.....
}

当运行时,我看到在一个消费者组中创建了4个消费者。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62673586

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档