Assume the following configuration is applied on both the sender and the receiver:
@Configuration
@EnableRabbit
public class EventsConfig {
    @Bean
    public Jackson2JsonMessageConverter messageConverter(ObjectMapper objectMapper) {
        return new Jackson2JsonMessageConverter(objectMapper);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter jackson2JsonMessageConverter) {
        final var rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter);
        return rabbitTemplate;
    }
}
Also assume the receiving side has a simple Spring listener, declared as follows:
@RabbitListener(queues = "${amqp.inbox}")
public void listen(org.springframework.amqp.core.Message message) {
    ...
}
An attempt to receive a List<TaskAssignment> collection sent through RabbitTemplate (where TaskAssignment is a simple POJO with two UUIDs) fails on the receiving side with an exception:
accounting-service_1 | 01:26:39.610 WARN [org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler$DefaultExceptionStrategy] - Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'[{"assigneeId":"1033c1ab-f277-4162-a711-722966bf69ec","taskId":"ae408e2d-39e0-4094-bf52-8c171d965be7"}]' MessageProperties [headers={__ContentTypeId__=java.lang.Object, __TypeId__=io.vavr.collection.List$Cons}, messageId=83f6bb30-a2b8-434a-867e-08684241e9b0, type=TASKS_REASSIGNED, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=outbox, receivedRoutingKey=, deliveryTag=3, consumerTag=amq.ctag-PodhhDJx6JtYsjIg6Cof1Q, consumerQueue=accounting-service])
accounting-service_1 | 01:26:39.610 ERROR [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer] - Execution of Rabbit message listener failed, and the error handler threw an exception
accounting-service_1 | org.springframework.amqp.AmqpRejectAndDontRequeueException: Error Handler converted exception to fatal
accounting-service_1 | at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:146)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1460)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1744)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1519)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:968)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:914)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1289)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195)
accounting-service_1 | at java.base/java.lang.Thread.run(Unknown Source)
accounting-service_1 | Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Failed to convert message
accounting-service_1 | at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:146)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1656)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1575)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1563)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1554)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1498)
accounting-service_1 | ... 6 common frames omitted
accounting-service_1 | Caused by: org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
accounting-service_1 | at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.doFromMessage(AbstractJackson2MessageConverter.java:294)
accounting-service_1 | at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:271)
accounting-service_1 | at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:251)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.extractMessage(AbstractAdaptableMessageListener.java:344)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter$MessagingMessageConverterAdapter.extractPayload(MessagingMessageListenerAdapter.java:347)
accounting-service_1 | at org.springframework.amqp.support.converter.MessagingMessageConverter.fromMessage(MessagingMessageConverter.java:132)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:229)
accounting-service_1 | at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:136)
accounting-service_1 | ... 11 common frames omitted
accounting-service_1 | Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `byte` from Object value (token `JsonToken.START_OBJECT`)
accounting-service_1 | at [Source: (String)"[{"assigneeId":"1033c1ab-f277-4162-a711-722966bf69ec","taskId":"ae408e2d-39e0-4094-bf52-8c171d965be7"}]"; line: 1, column: 2] (through reference chain: byte[][0])
accounting-service_1 | at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
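The Vavr module for Jackson is registered in the ObjectMapper, so the root cause is not that the collection is an unsupported or non-serializable type.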
One thing worth noting in the log output is the headers section:
headers={__ContentTypeId__=java.lang.Object, __TypeId__=io.vavr.collection.List$Cons}
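My theory: the listener receives a JSON array, and Jackson needs an explicit TypeReference to know which POJO type to use when deserializing the array's items. That would explain why the failure occurs inside Jackson2JsonMessageConverter: nothing tells it the content type.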
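The question is: how can this information be provided to Jackson2JsonMessageConverter? RabbitTemplate does not seem to offer any way to do it. Explicitly setting __ContentTypeId__ on the sender side does not help either.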
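Alternatively, how can I at least bypass Jackson deserialization on the receiving side and have the @RabbitListener receive just the raw message with a byte[] payload?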
UPD: I made a project that reproduces the problem: https://www.dropbox.com/s/rde9u02mxob189w/proba_amqp.zip?dl=0
Posted on 2021-11-15 14:39:37
You are trying to receive the raw, unconverted Message.
Try listen(List<Foo> foos), so that we can pass the inferred type to the converter.
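To illustrate what "inferred type" means here: a method parameter declared as List<Foo> keeps its generic type in the class metadata, so a framework can read it by reflection and hand it to a converter. The following is a minimal JDK-only sketch of that idea, not Spring AMQP's actual implementation; InferredTypeDemo, TypedListener, Foo and payloadType are hypothetical names used only for this illustration.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

public class InferredTypeDemo {

    // Hypothetical payload type standing in for Foo / TaskAssignment.
    public static class Foo {
    }

    // Hypothetical listener whose parameter declares the element type.
    public static class TypedListener {
        public void listen(List<Foo> foos) {
        }
    }

    /** Returns the generic type of the only parameter of the public method named "listen". */
    public static Type payloadType(Class<?> listenerClass) {
        for (Method m : listenerClass.getMethods()) {
            if (m.getName().equals("listen") && m.getParameterCount() == 1) {
                // Unlike getParameterTypes(), this keeps the type argument (Foo).
                return m.getGenericParameterTypes()[0];
            }
        }
        throw new IllegalArgumentException("no single-argument listen method found");
    }

    public static void main(String[] args) {
        Type type = payloadType(TypedListener.class);
        if (type instanceof ParameterizedType) {
            ParameterizedType parameterized = (ParameterizedType) type;
            System.out.println("raw type:     " + parameterized.getRawType());
            System.out.println("element type: " + parameterized.getActualTypeArguments()[0]);
        }
    }
}
```

A parameter typed as org.springframework.amqp.core.Message carries no such payload information, which is why declaring the payload type in the listener signature matters.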
EDIT
The problem is that your collection object is not compatible with Jackson.
This works:
@PostConstruct
public void init() {
    List<POJO> payload = new ArrayList<>();
    payload.add(new POJO(1,2));
    payload.add(new POJO(3,4));
    //POJO payload = new POJO(1,2);
    template.convertAndSend(
        outbox,
        "",
        payload,
        message -> {
            var props = message.getMessageProperties();
            props.setMessageId(UUID.randomUUID().toString());
            return message;
        }
    );
}

@RabbitListener(queues = "${amqp.inbox}")
public void listen(List<POJO> pojos) {
    System.out.println("Message reached:" + pojos);
}
Message reached:[com.skapral.POJO@7af8fa38, com.skapral.POJO@5569d76c]
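From the author: this answer gave me a hint about the root cause. It turned out that there is a bug in vavr-jackson, the Jackson module that adds support for Vavr collections.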
Posted on 2021-11-11 16:03:52
See if this sample helps you:
https://github.com/spring-projects/spring-amqp-samples/tree/main/spring-rabbit-json
The documentation is here: https://docs.spring.io/spring-amqp/docs/current/reference/html/#json-message-converter
UPDATE
See whether it makes a difference if you use this option on the converter:
/**
* When false (default), fall back to type id headers if the type (or contents of a container
* type) is abstract. Set to true if conversion should always be attempted - perhaps because
* a custom deserializer has been configured on the {@link ObjectMapper}. If the attempt fails,
* fall back to headers.
* @param alwaysAttemptConversion true to attempt.
* @since 2.2.8
*/
public void setAlwaysConvertToInferredType(boolean alwaysAttemptConversion) {
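A minimal sketch of how that option might be switched on, reusing the messageConverter bean from the question's EventsConfig. It assumes Spring AMQP 2.2.8 or later (per the @since tag above); whether it changes the outcome for this particular payload is something to test, not a confirmed fix.

```java
@Bean
public Jackson2JsonMessageConverter messageConverter(ObjectMapper objectMapper) {
    var converter = new Jackson2JsonMessageConverter(objectMapper);
    // Always attempt conversion to the type inferred from the listener method;
    // per the Javadoc above, the type id headers are used only if that attempt fails.
    converter.setAlwaysConvertToInferredType(true);
    return converter;
}
```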
https://stackoverflow.com/questions/69922456