Can I use Spring to handle deserialization exceptions?

Stack Overflow user
Asked on 2021-09-04 12:59:02
1 answer · 1.4K views · 0 followers · 0 votes

I want the batch listener to commit the offsets up to the failed record, log the failed record, and then fetch a new batch starting at the first offset after the failed record.

My current approach handles exceptions thrown in the listener code by throwing a BatchListenerFailedException, which the RecoveringBatchErrorHandler handles exactly as I intend. However, I would like to handle all exceptions this way; that is, both exceptions thrown by the listener and any exceptions resulting from a deserialization failure. I am using the BatchMessagingMessageConverter. I understand that I could use the ErrorHandlingDeserializer if the deserialization exception occurred in a Kafka deserializer; however, in my configuration the deserialization exception occurs in the MessagingMessageConverter, which I believe happens after the Kafka client's BytesDeserializer has successfully deserialized my message. How can I best achieve my goal?

Here is my container factory configuration:

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
         ConsumerFactory<Object, Object> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setBatchListener(true);
    RecoveringBatchErrorHandler errorHandler = new RecoveringBatchErrorHandler(
            new FixedBackOff(FixedBackOff.DEFAULT_INTERVAL, 2)
    );
    factory.setBatchErrorHandler(errorHandler);
    BatchMessagingMessageConverter messageConverter = new BatchMessagingMessageConverter(new BytesJsonMessageConverter());
    factory.setMessageConverter(messageConverter);
    factory.setConcurrency(1);
    return factory;
}

@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "pojo-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
    props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, BytesDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, BytesDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return new DefaultKafkaConsumerFactory<>(props);
}

Here is my listener:

@KafkaListener(id = "myKafkaListener", idIsGroup = false, autoStartup = "true", topics = {"pojo-topic"}, containerFactory = "kafkaListenerContainerFactory")
public void receive(List<Message<Pojo>> messages) {
    System.out.println("received " + messages.size() + " messages");
    int i = 0;
    try {
        //exceptions thrown here are handled as I intend
        for (var mm : messages) {
            var m = mm.getPayload();
            System.out.println("received: " + m + " at offset " + mm.getHeaders().get(KafkaHeaders.OFFSET, Long.class));
            i++;
        }
    } catch (Exception e) {
        throw new BatchListenerFailedException("listener threw exception when processing batch", e, i);
    }
}

UPDATE

Here is the stack trace when I send a plain string (just "A") instead of a JSON object and deserialization fails:

org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'A': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"A"; line: 1, column: 2]
    at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:79) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.ContainerAwareBatchErrorHandler.handle(ContainerAwareBatchErrorHandler.java:56) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:2015) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1859) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:1725) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1704) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1274) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1266) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) ~[spring-kafka-2.7.6.jar:2.7.6]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'A': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"A"; line: 1, column: 2]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2376) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2008) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:1978) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessage(KafkaMessageListenerContainer.java:1930) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1842) ~[spring-kafka-2.7.6.jar:2.7.6]
    ... 8 common frames omitted
Caused by: org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'A': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"A"; line: 1, column: 2]
    at org.springframework.kafka.support.converter.JsonMessageConverter.extractAndConvertValue(JsonMessageConverter.java:122) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.support.converter.MessagingMessageConverter.toMessage(MessagingMessageConverter.java:174) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:322) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:153) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:1988) ~[spring-kafka-2.7.6.jar:2.7.6]
    ... 11 common frames omitted
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'A': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"A"; line: 1, column: 2]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337) ~[jackson-core-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:720) ~[jackson-core-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3593) ~[jackson-core-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2688) ~[jackson-core-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:870) ~[jackson-core-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:762) ~[jackson-core-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4684) ~[jackson-databind-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4586) ~[jackson-databind-2.12.4.jar:2.12.4]
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3643) ~[jackson-databind-2.12.4.jar:2.12.4]
    at org.springframework.kafka.support.converter.JsonMessageConverter.extractAndConvertValue(JsonMessageConverter.java:119) ~[spring-kafka-2.7.6.jar:2.7.6]
    ... 16 common frames omitted

1 Answer

Stack Overflow user

Accepted answer

Answered on 2021-09-08 16:10:52

Here are two solutions. The first uses an ErrorHandlingDeserializer with a JsonDeserializer delegate. The second is a work-around that uses a custom ByteArrayJsonMessageConverter; I have opened an issue to provide a more seamless solution in the batch listener adapter.

Example 1, using the deserializer:

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.demo.Foo
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer

spring.kafka.listener.type=batch
@SpringBootApplication
public class So69055510Application {

    public static void main(String[] args) {
        SpringApplication.run(So69055510Application.class, args);
    }

    @Bean
    NewTopic topic() {
        return TopicBuilder.name("so69055510").partitions(1).replicas(1).build();
    }

    @Bean
    NewTopic dlt() {
        return TopicBuilder.name("so69055510.DLT").partitions(1).replicas(1).build();
    }

    @KafkaListener(id = "so69055510", topics = "so69055510")
    void listen(List<Foo> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
        for (int i = 0; i < in.size(); i++) {
            Foo foo = in.get(i);
            if (foo == null
                    && headers.get(i).get(ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {

                throw new BatchListenerFailedException("deserialization error",
                        new DeserializationException("Batch listener", null, false, null), i);
            }
            System.out.println(foo);
        }
    }

    @KafkaListener(id = "so69055510.DLT", topics = "so69055510.DLT",
            properties = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG +
                ":org.apache.kafka.common.serialization.StringDeserializer")
    void listenDlt(String in) {
        System.out.println("DLT: " + in);
    }

    @Bean
    BatchErrorHandler eh(ProducerFactory<String, byte[]> pf) {
        KafkaTemplate<String, byte[]> template = new KafkaTemplate<>(pf,
                Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
        RecoveringBatchErrorHandler eh = new RecoveringBatchErrorHandler(new DeadLetterPublishingRecoverer(template));
        eh.setLogLevel(Level.DEBUG);
        return eh;
    }

    @Bean
    ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so69055510", "{\"bar\":\"baz\"}");
            template.send("so69055510", "JUNK");
            template.send("so69055510", "{\"bar\":\"qux\"}");
        };
    }

}
Foo [bar=baz]
DLT: JUNK
Foo [bar=qux]
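
For reference, the same Example 1 settings can also be expressed programmatically, in the style of the consumerFactory() bean from the question, instead of via Spring Boot properties. This is only a sketch under the assumption that the target type is the Foo class used in the answer; the String key deserializer is likewise an assumption:

// Hypothetical sketch: Example 1's properties applied to a programmatic
// consumer factory. Assumes String keys and a Foo value type.
@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "pojo-group");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // ErrorHandlingDeserializer wraps the real deserializer; on failure it returns
    // a null value and puts the DeserializationException in a record header
    // instead of letting the poll fail.
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
    props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Foo.class.getName());
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    return new DefaultKafkaConsumerFactory<>(props);
}

With this in place, records should already reach the listener as Foo instances (or null when deserialization failed), so the BytesJsonMessageConverter from the original container factory should no longer be needed.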

Example 2, using a custom message converter. Note that for this to work, you need some way to signal a deserialization failure in the domain object:

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
@SpringBootApplication
public class So69055510Application {

    public static void main(String[] args) {
        SpringApplication.run(So69055510Application.class, args);
    }

    @Bean
    NewTopic topic() {
        return TopicBuilder.name("so69055510").partitions(1).replicas(1).build();
    }

    @Bean
    NewTopic dlt() {
        return TopicBuilder.name("so69055510.DLT").partitions(1).replicas(1).build();
    }

    @KafkaListener(id = "so69055510", topics = "so69055510")
    void listen(List<Foo> in) {
        for (int i = 0; i < in.size(); i++) {
            Foo foo = in.get(i);
            if (foo.getBar().equals("thisIsABadOne")) {
                throw new BatchListenerFailedException("deserialization error",
                        new DeserializationException("Batch listener", null, false, null), i);
            }
            System.out.println(foo);
        }
    }

    @KafkaListener(id = "so69055510.DLT", topics = "so69055510.DLT")
    void listenDlt(String in) {
        System.out.println("DLT: " + in);
    }

    @Bean
    ByteArrayJsonMessageConverter converter() {
        return new ByteArrayJsonMessageConverter() {

            @Override
            public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
                    Consumer<?, ?> consumer, Type type) {

                try {
                    return super.toMessage(record, acknowledgment, consumer, Foo.class); // <<<<<< type
                }
                catch (ConversionException ex) {
                    return MessageBuilder.withPayload(new Foo("thisIsABadOne"))
                            .build();
                }
            }

        };
    }

    @Bean
    BatchErrorHandler eh(KafkaTemplate<String, byte[]> template) {
        RecoveringBatchErrorHandler eh = new RecoveringBatchErrorHandler(new DeadLetterPublishingRecoverer(template));
        eh.setLogLevel(Level.DEBUG);
        return eh;
    }

    @Bean
    ApplicationRunner runner(KafkaTemplate<String, byte[]> template) {
        return args -> {
            template.send("so69055510", "{\"bar\":\"baz\"}".getBytes());
            template.send("so69055510", "JUNK".getBytes());
            template.send("so69055510", "{\"bar\":\"qux\"}".getBytes());
        };
    }

}
Foo [bar=baz]
DLT: JUNK
Foo [bar=qux]
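
Neither example shows the Foo domain class. Judging from the output lines (Foo [bar=baz]), the foo.getBar() call, and the new Foo("thisIsABadOne") marker in Example 2, a minimal hypothetical version would be a plain POJO like this:

// Hypothetical Foo POJO inferred from the examples; the real class is not shown
// in the answer. Jackson needs the no-arg constructor and the setter (or
// equivalent annotations) to bind {"bar":"..."}.
public class Foo {

    private String bar;

    public Foo() {
    }

    public Foo(String bar) {
        this.bar = bar;
    }

    public String getBar() {
        return this.bar;
    }

    public void setBar(String bar) {
        this.bar = bar;
    }

    @Override
    public String toString() {
        return "Foo [bar=" + this.bar + "]";
    }

}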
2 votes
Original content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/69055510