Does anyone know how to configure spring.json.trusted.packages for Kafka Streams? I am using the following configuration:
@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {

    // Overrides the default Kafka Streams configuration bean.
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfig(KafkaProperties kafkaProperties) {
        return new KafkaStreamsConfiguration(Map.of(
                StreamsConfig.APPLICATION_ID_CONFIG, kafkaProperties.getStreams().getApplicationId(),
                StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getStreams().getBootstrapServers(),
                StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName(),
                StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName(),
                StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName()
        ));
    }
}
My application.yml looks like this:
spring:
  kafka:
    streams:
      bootstrap-servers: XXX.XXX.XXX.XXX:9092
      application-id: spring-kafka-test
      properties:
        processing.guarantee: exactly_once
        spring.json.trusted.packages: com.mypackage.streams.entity.kafka.*
However, when I change the package of the entity that is published to Kafka, I get the following exception:
Caused by: java.lang.IllegalArgumentException: The class 'com.mypackage.streams.entity.kafka.raw.Entity' is not in the trusted packages: [java.util, java.lang, com.mypackage.streams.entity.kafka, com.mypackage.streams.entity.kafka.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:129)
at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:103)
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:569)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58)
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
... 9 more
It looks like the property added to application.yml has no effect.
Posted on 2022-03-28 09:54:59
For now I have found a workaround. It is a bit clumsy, but I have not found anything more elegant, and at least it works.
I modified my application.yml and added consumer and producer sections with the following configuration:
spring:
  kafka:
    consumer:
      properties:
        spring.json.trusted.packages: com.mypackage.streams.entity.kafka.*
    producer:
      properties:
        spring.json.add.type.headers: false
    streams:
      bootstrap-servers: XXX.XXX.XXX.XXX:9092
      application-id: spring-kafka-test
      properties:
        processing.guarantee: exactly_once
Then I manually passed the configured producer and consumer properties to the serde:
@Bean
public JsonSerde<Entity> entityJsonSerde(
        ObjectMapper objectMapper,
        KafkaProperties kafkaProperties) {
    JsonSerde<Entity> serde = new JsonSerde<>(Entity.class, objectMapper);
    // Apply the spring.kafka.consumer.* properties to the deserializer (isKey = false).
    serde.deserializer().configure(kafkaProperties.buildConsumerProperties(), false);
    // Apply the spring.kafka.producer.* properties to the serializer (isKey = false).
    serde.serializer().configure(kafkaProperties.buildProducerProperties(), false);
    return serde;
}
Now all the settings from the consumer and producer sections are applied to my serde object. The exception is gone and the application works as expected.
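A side note on the exception in the question: its message lists com.mypackage.streams.entity.kafka.* among the trusted packages, yet a class in com.mypackage.streams.entity.kafka.raw is still rejected. That would be consistent with the package name being compared by exact string equality instead of as a wildcard pattern. I have not verified this against the spring-kafka source, so treat it as an assumption. The sketch below is plain Java and does not use spring-kafka; the class TrustedPackageCheck and its methods are hypothetical names that only illustrate the assumed check.

```java
import java.util.Set;

public class TrustedPackageCheck {

    // Hypothetical helper: returns the package part of a fully qualified class name.
    public static String packageOf(String className) {
        int lastDot = className.lastIndexOf('.');
        return lastDot < 0 ? "" : className.substring(0, lastDot);
    }

    // Assumed behaviour (not taken from spring-kafka): a class is trusted only if
    // "*" is listed or its package appears verbatim in the trusted set.
    public static boolean isTrustedExact(Set<String> trusted, String className) {
        return trusted.contains("*") || trusted.contains(packageOf(className));
    }

    public static void main(String[] args) {
        String entity = "com.mypackage.streams.entity.kafka.raw.Entity";

        // The trusted list exactly as printed in the exception message.
        Set<String> fromException = Set.of(
                "java.util",
                "java.lang",
                "com.mypackage.streams.entity.kafka",
                "com.mypackage.streams.entity.kafka.*");

        // The sub-package listed explicitly instead of via a trailing ".*".
        Set<String> explicit = Set.of("com.mypackage.streams.entity.kafka.raw");

        System.out.println(isTrustedExact(fromException, entity)); // false
        System.out.println(isTrustedExact(explicit, entity));      // true
    }
}
```

If that assumption holds for your spring-kafka version, listing each sub-package explicitly, or using * alone when every producer is trusted (as the exception message itself suggests), may be worth trying. It might also explain why the workaround above helps: with spring.json.add.type.headers set to false the producer no longer sends a type header, so the trusted-package check may simply never be reached.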
https://stackoverflow.com/questions/71623576