Error SerializationException - Spring Boot Kafka consumer

Stack Overflow user
Asked on 2021-01-26 05:05:03
2 answers · 3.3K views · 0 followers · 0 votes

When I run my consumer, I get the following error:

2021-01-25 17:59:36.120 ERROR 1147 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception

java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
        at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:145) ~[spring-kafka-2.6.5.jar!/:2.6.5]
        at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113) ~[spring-kafka-2.6.5.jar!/:2.6.5]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1427) [spring-kafka-2.6.5.jar!/:2.6.5]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1124) [spring-kafka-2.6.5.jar!/:2.6.5]
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [na:1.8.0_201]
        at java.util.concurrent.FutureTask.run(Unknown Source) [na:1.8.0_201]
        at java.lang.Thread.run(Unknown Source) [na:1.8.0_201]
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition TestTopic at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[0, 0, 0, 0, 12, 0, 0, 16, 49, 50, 51, 52, 53, 54, 55, 56, 0, 8, 83, 66, 73, 70, 0, 2, 68, 0, -102, -103, -103, -103, -103, -103, -71, 63, 0, 20, 50, 48, 50, 48, 45, 48, 49, 45, 50, 48, 0, 20, 50, 48, 50, 48, 45, 48, 51, 45, 48, 49, 0, 20, 50, 48, 50, 48, 45, 48, 49, 45, 50, 48, 0, 2, 53]] from topic [TestTopic]
Caused by: java.io.CharConversionException: Invalid UTF-32 character 0xbff0010 (above 0x0010ffff) at char #1, byte #7)

I created the Topic_TEST class from the Avro files.

My consumer configuration:

@Autowired
PropertyConfig propertyConfig;

private final static String TRUSTSTORE_JKS = "truststore.jks";
private final static String SASL_PROTOCOL = "SASL_SSL";
private final static String SCRAM_SHA_256 = "SCRAM-SHA-256";
private final String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
private final String consJaasCfg = String.format(jaasTemplate, "test", "test123");

private static final String TRUSTED_PACKAGE = "com.consumer.test.domain";

@Bean
public ConsumerFactory<String, Topic_Test> DtoConsumerTest() {
    final Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propertyConfig.getBootstrapServer());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, propertyConfig.getGroupId());

    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);

    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

    if (propertyConfig.getFlag()) {
        props.put(JsonDeserializer.TRUSTED_PACKAGES, TRUSTED_PACKAGE);
        props.put("sasl.mechanism", SCRAM_SHA_256);
        props.put("sasl.jaas.config", consJaasCfg);
        props.put("security.protocol", SASL_PROTOCOL);
        props.put("ssl.truststore.location", TRUSTSTORE_JKS);
        props.put("ssl.truststore.password", propertyConfig.getPasswordTrustore());
        props.put("ssl.endpoint.identification.algorithm", "");

        props.put("schema.registry.url", "127.0.0.1:9092");
    }

    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
            new JsonDeserializer<>(Topic_Test.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Topic_Test> TopicTestListener() {
    ConcurrentKafkaListenerContainerFactory<String, Topic_Test> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(DtoConsumerTest());
    return factory;
}

I send the messages with a producer configured as follows:

Properties properties = new Properties();
// normal producer
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("acks", "all");
properties.setProperty("retries", "10");
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "TEST-GROUP");

properties.put("sasl.mechanism", SCRAM_SHA_256);
properties.put("sasl.jaas.config", consJaasCfg);
properties.put("security.protocol", SASL_PROTOCOL);
properties.put("ssl.truststore.location", TRUSTSTORE_JKS);
properties.put("ssl.truststore.password", "test");
properties.put("ssl.endpoint.identification.algorithm", "");

properties.put(ProducerConfig.RETRIES_CONFIG, 2);  // increase to 10 from default of 0

// avro part
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);

properties.put("schema.registry.url", "http://127.0.0.1:9092");

Producer<String, Topic_TEST> producer = new KafkaProducer<String, Topic_TEST>(properties);
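
For context, a minimal usage sketch of how a record would be sent with the producer above; this is not part of the original question. The topic name "TestTopic" is taken from the error log, and buildTopicTest() is a hypothetical helper standing in for however the Topic_TEST instance is actually built:

// Hedged sketch: sending one Avro record with the producer configured above.
// "TestTopic" comes from the error log; buildTopicTest() is a hypothetical helper.
import org.apache.kafka.clients.producer.ProducerRecord;

Topic_TEST value = buildTopicTest();  // hypothetical: builds an instance of the generated Avro class

ProducerRecord<String, Topic_TEST> record = new ProducerRecord<>("TestTopic", "key-1", value);
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // serialization and broker errors surface asynchronously in this callback
        exception.printStackTrace();
    } else {
        System.out.printf("Sent to %s-%d@%d%n", metadata.topic(), metadata.partition(), metadata.offset());
    }
});
producer.flush();
producer.close();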

I don't know what exactly the problem is; I used the same keys and values in the Kafka configuration. Do I need a working schema registry?

Edit: I added the error handling logic (ErrorHandlingDeserializer) and now get:

2021-01-26 17:15:56.344 ERROR 11876 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Error handler threw an exception

java.lang.ArrayIndexOutOfBoundsException: 1
        at com.test.config.ConsumerConfigTest$1.handle(ConsumerConfigTest.java:109) ~[classes!/:0.0.1-SNAPSHOT]
        at com.test.config.ConsumerConfigTest$1.handle(ConsumerConfigTest.java:86) ~[classes!/:0.0.1-SNAPSHOT]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2102) [spring-kafka-2.6.5.jar!/:2.6.5]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1997) [spring-kafka-2.6.5.jar!/:2.6.5]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1924) [spring-kafka-2.6.5.jar!/:2.6.5]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1812) [spring-kafka-2.6.5.jar!/:2.6.5]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1531) [spring-kafka-2.6.5.jar!/:2.6.5]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1178) [spring-kafka-2.6.5.jar!/:2.6.5]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075) [spring-kafka-2.6.5.jar!/:2.6.5]
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [na:1.8.0_201]
        at java.util.concurrent.FutureTask.run(Unknown Source) [na:1.8.0_201]
        at java.lang.Thread.run(Unknown Source) [na:1.8.0_201]
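
For reference, a minimal sketch (not the asker's actual configuration) of what wiring ErrorHandlingDeserializer around the Confluent Avro deserializer can look like, which is what the first error message suggests. The bootstrap server, group id, and Schema Registry URL below are assumptions, and the generated class Topic_Test is taken from the question:

// Hedged sketch, assuming spring-kafka 2.6.x and the Confluent Avro deserializer.
// Bootstrap server, group id and Schema Registry URL are placeholders.
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;

@Configuration
public class ErrorTolerantConsumerConfig {

    @Bean
    public ConsumerFactory<String, Topic_Test> errorTolerantConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumption
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "TEST-GROUP");                // assumption
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // ErrorHandlingDeserializer delegates to the real deserializer; if the delegate
        // throws, the failure is handed to the container's error handler instead of
        // repeatedly crashing the poll loop.
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        props.put("schema.registry.url", "http://127.0.0.1:8081");              // assumption: the registry, not the broker port

        return new DefaultKafkaConsumerFactory<>(props);
    }
}

With this in place, a record that cannot be deserialized is passed to the container's error handler (SeekToCurrentErrorHandler by default in spring-kafka 2.6) as a DeserializationException rather than failing the consumer thread.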
Stack Overflow user
Posted on 2021-01-26 09:31:47

You appear to be using both a JsonDeserializer and a KafkaAvroSerializer (the latter is not a valid consumer config value).

These formats are not compatible; your consumer format needs to match your producer format.

So, try this

return new DefaultKafkaConsumerFactory<>(props);

after fixing

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class.getName());
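
As a follow-up, a hedged usage sketch of how the corrected factory would be consumed; this is not part of the original answer. The topic name "TestTopic" comes from the error log, "TopicTestListener" is the container-factory bean name from the question, and the listener method itself is illustrative only:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;

// Hedged sketch of a listener wired to the corrected factory.
@KafkaListener(topics = "TestTopic", containerFactory = "TopicTestListener")
public void onTopicTest(ConsumerRecord<String, Topic_Test> record) {
    // With KafkaAvroDeserializer and SPECIFIC_AVRO_READER_CONFIG=true, the value
    // arrives as the generated Avro class (Topic_Test), not as JSON.
    System.out.printf("key=%s value=%s%n", record.key(), record.value());
}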
Votes: 0
Original content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/65892401
