本文主要解析一下spring for kafka对原生的kafka client consumer的封装与集成。
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java
protected KafkaConsumer<K, V> createKafkaConsumer(String clientIdSuffix) {
if (!this.configs.containsKey(ConsumerConfig.CLIENT_ID_CONFIG) || clientIdSuffix == null) {
return createKafkaConsumer();
}
else {
Map<String, Object> modifiedClientIdConfigs = new HashMap<>(this.configs);
modifiedClientIdConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG,
modifiedClientIdConfigs.get(ConsumerConfig.CLIENT_ID_CONFIG) + clientIdSuffix);
return createKafkaConsumer(modifiedClientIdConfigs);
}
}
protected KafkaConsumer<K, V> createKafkaConsumer(Map<String, Object> configs) {
return new KafkaConsumer<K, V>(configs, this.keyDeserializer, this.valueDeserializer);
}
即KafkaListener注解标准的方法
)ListenerConsumer是重点,里头还有包括offset的提交,这里改天再详解一下。