在spring文档中,我们可以看到默认容器工厂可以使用kafkaListenerContainerFactory的bean名称,除非通过配置提供了显式的缺省值。
我想问一下,是否可以更改配置以使用我的自定义容器工厂bean (例如。customKafkaListenerContainerFactory)不是kafkaListenerContainerFactory?
如果我们键入->代码示例
@KafkaListener(id = "cat", topics = "myTopic")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}那么默认的containerFactory bean是customKafkaListenerContainerFactory而不是kafkaListenerContainerFactory
更准确地说,是 ->,如果我不提供任何containerFactory属性,则使用customKafkaListenerContainerFactory而不是kafkaListenerContainerFactory
发布于 2020-01-28 19:25:43
是的,可以通过在containerFactory注释中使用@KafkaListener属性,可以设置定制的kafka容器工厂bean。
用于创建负责服务此端点的消息侦听器容器的KafkaListenerContainerFactory的bean名称。
@KafkaListener(id = "cat", topics = "myTopic", containerFactory="customKafkaListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}也可以重写Config类中的默认kafkaListenerContainerFactory。正如@Gary所说的那样,它将使用相同的bean名来代替Boot,而Boot是以bean的存在为条件的
@Configuration
@EnableKafka
public class Config {
@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
ConcurrentKafkaListenerContainerFactory<Integer, String>
kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory,consumerFactory());
// set custom properties
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
...
return props;
}
}https://stackoverflow.com/questions/59955687
复制相似问题