通过使用这个tutorial,我能够创建一个简单的生产者-消费者示例。在我的示例中,只有一个主题,而我正在收听这个主题。因此,ReceiverConfig
中的代码是有意义的。特别是关于GROUP_ID_CONFIG
的一点,即我创建了主题topic_name
,然后在这个配置中进行了配置。现在我的问题是,如果我有一个以上的主题怎么办。假设我有topic_1
、topic_2
等等?我应该为每个单独的主题创建ReceiverConfig
吗?
@EnableKafka
@Configuration
public class ReceiverConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(GROUP_ID_CONFIG, "topic_name");
props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
发布于 2018-06-10 07:23:48
简短的答案是否定的,您不需要为每个主题创建多个配置。
在进一步讨论之前,我认为指定groupId
是消费者进程所属的组和消费者进程要消费的topic
是两码事。
通过下面的句子,您将告诉消费者它属于topic_name组,仅此而已。
props.put(GROUP_ID_CONFIG, "topic_name");
如果您希望使用者从多个主题读取数据,有一个方法可以将集合作为参数接收,这样您就可以指定所有主题来读取数据,而不必为每个主题创建新的配置。
请查看此示例,您将看到我提到的方法
// Subscribe to the topic.
consumer.subscribe(Collections.singletonList(TOPIC));
https://stackoverflow.com/questions/50777283
复制相似问题