首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何为Kafka + Spring配置`spring.json.trusted.packages`?

如何为Kafka + Spring配置`spring.json.trusted.packages`?
EN

Stack Overflow用户
提问于 2022-03-25 22:01:44
回答 1查看 420关注 0票数 0

有人知道如何为卡夫卡流配置spring.json.trusted.packages吗?我使用以下配置:

代码语言:javascript
运行
复制
@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfig(KafkaProperties kafkaProperties) {
        return new KafkaStreamsConfiguration(Map.of(
                StreamsConfig.APPLICATION_ID_CONFIG, kafkaProperties.getStreams().getApplicationId(),
                StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getStreams().getBootstrapServers(),
                StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName(),
                StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName(),
                StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName()
        ));
    }

我的application.yml如下:

代码语言:javascript
运行
复制
spring:
  kafka:
    streams:
      bootstrap-servers: XXX.XXX.XXX.XXX:9092
      application-id: spring-kafka-test
      properties:
        processing.guarantee: exactly_once
        spring.json.trusted.packages: com.mypackage.streams.entity.kafka.*

但是,当我将发布到Kafka中的实体的包更改时,我会收到以下异常:

代码语言:javascript
运行
复制
Caused by: java.lang.IllegalArgumentException: The class 'com.mypackage.streams.entity.kafka.raw.Entity' is not in the trusted packages: [java.util, java.lang, com.mypackage.streams.entity.kafka, com.mypackage.streams.entity.kafka.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
    at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:129)
    at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:103)
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:569)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58)
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    ... 9 more

看起来添加到appplication.yml中的属性不起作用。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-03-28 09:54:59

此刻我找到了一个解决办法。这有点麻烦,但我没有发现任何更优雅的东西,至少现在起作用了。

我修改了我的applicationi.yml,并使用以下配置添加了consumerproducer部分:

代码语言:javascript
运行
复制
spring:
  kafka:
    consumer:
      properties:
        spring.json.trusted.packages: com.mypackage.streams.entity.kafka.*
    producer:
      properties:
        spring.json.add.type.headers: false
    streams:
      bootstrap-servers: XXX.XXX.XXX.XXX:9092
      application-id: spring-kafka-test
      properties:
        processing.guarantee: exactly_once

然后,我手动将配置好的属性添加到生产者和使用者:

代码语言:javascript
运行
复制
    @Bean
    public JsonSerde<Entity> entityJsonSerde(
            ObjectMapper objectMapper,
            KafkaProperties kafkaProperties) {

        JsonSerde<FlatTransaction> serde = new JsonSerde<>(Entity.class, objectMapper);
        serde.deserializer().configure(kafkaProperties.buildConsumerProperties(), false);
        serde.serializer().configure(kafkaProperties.buildProducerProperties(), false);
        return serde;
    }

现在,配置好的producerconsumer部分中的所有配置都应用于我的serde对象。异常已经消失,应用程序按预期工作。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71623576

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档