In Spring Kafka, JsonDeserializer by default checks whether the class being deserialized is in the trusted.packages list, to prevent deserialization attacks. If you want to turn this check off, set trusted.packages to *, which means all packages are trusted.
Here is an example of how to configure JsonDeserializer so that the trusted.packages check is turned off:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, MyCustomObject> consumerFactory() {
        // MyCustomObject stands for the application's own message type.
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupId");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // Deserializer instances passed to the factory take precedence over the
        // *_DESERIALIZER_CLASS_CONFIG properties, so the ErrorHandlingDeserializer
        // wrapper and the trusted-packages setting are applied to the instance itself
        // rather than through props (where they would be silently ignored).
        JsonDeserializer<MyCustomObject> valueDeserializer =
                new JsonDeserializer<MyCustomObject>(MyCustomObject.class)
                        .trustedPackages("*"); // turn off the trusted.packages check

        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
                new ErrorHandlingDeserializer<>(valueDeserializer));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, MyCustomObject> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, MyCustomObject> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
If you prefer to set these properties in a configuration file, you can do so in application.properties:
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroupId
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
If you use an application.yml file, configure it like this:
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: myGroupId
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring:
          deserializer:
            value:
              delegate:
                class: org.springframework.kafka.support.serializer.JsonDeserializer
          json:
            trusted:
              packages: "*"
With any of the configurations above, the JsonDeserializer trusted.packages check is turned off and all packages are trusted. Keep in mind that this also removes the protection the check gives you: JsonDeserializer resolves the target class from the record's type header, and with * any class on the classpath may be named there. If your consumer only expects a few message types, listing their packages instead of * keeps the check in place.
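For comparison, here is the same consumer factory with the check left on. This is an added example, not code from the original article; it assumes that MyCustomObject lives in the package com.example.events, and it keeps the ErrorHandlingDeserializer wrapper while pinning the target type so the record's type header is not consulted at all.

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RestrictedKafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, MyCustomObject> restrictedConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupId");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // Keep the ErrorHandlingDeserializer wrapper: a record that fails to deserialize
        // is passed to the container's error handler instead of stalling the consumer.
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());

        // Instead of "*", trust only the package that holds the expected message type.
        // com.example.events is an assumed package name for MyCustomObject.
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.events");

        // Always deserialize into the expected type and ignore the __TypeId__ header,
        // so the producer side cannot pick the target class at all.
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, MyCustomObject.class.getName());
        props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);

        // No deserializer instances are passed here, so the class-name properties above
        // are what the factory uses; the delegate JsonDeserializer is configured from them.
        return new DefaultKafkaConsumerFactory<>(props);
    }
}
```

The equivalent property keys for application.properties are spring.json.trusted.packages, spring.json.value.default.type and spring.json.use.type.headers under spring.kafka.consumer.properties.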