首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >在Spring中通过RabbitMQ发送接收收集

在Spring中通过RabbitMQ发送接收收集
EN

Stack Overflow用户
提问于 2021-11-11 02:15:21
回答 2查看 643关注 0票数 1

假设配置同时应用于发送方和接收方:

代码语言:javascript
运行
复制
@Configuration
@EnableRabbit
public class EventsConfig {
    @Bean
    public Jackson2JsonMessageConverter messageConverter(ObjectMapper objectMapper) {
        return new Jackson2JsonMessageConverter(objectMapper);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter jackson2JsonMessageConverter) {
        final var rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter);
        return rabbitTemplate;
    }
}

还假设接收端有一个简单的Spring侦听器,声明如下:

代码语言:javascript
运行
复制
    @RabbitListener(queues = "${amqp.inbox}")
    public void listen(org.springframework.amqp.core.Message message) {
        ...
    }

尝试接收由兔子模板发送的List<TaskAssignment>集合(其中TaskAssignment是一个具有两个UUID的简单POJO )的尝试在接收端结束,但有例外:

代码语言:javascript
运行
复制
accounting-service_1  | 01:26:39.610 WARN  [org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler$DefaultExceptionStrategy] - Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'[{"assigneeId":"1033c1ab-f277-4162-a711-722966bf69ec","taskId":"ae408e2d-39e0-4094-bf52-8c171d965be7"}]' MessageProperties [headers={__ContentTypeId__=java.lang.Object, __TypeId__=io.vavr.collection.List$Cons}, messageId=83f6bb30-a2b8-434a-867e-08684241e9b0, type=TASKS_REASSIGNED, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=outbox, receivedRoutingKey=, deliveryTag=3, consumerTag=amq.ctag-PodhhDJx6JtYsjIg6Cof1Q, consumerQueue=accounting-service])
accounting-service_1  | 01:26:39.610 ERROR [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer] - Execution of Rabbit message listener failed, and the error handler threw an exception
accounting-service_1  | org.springframework.amqp.AmqpRejectAndDontRequeueException: Error Handler converted exception to fatal
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:146)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1460)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1744)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1519)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:968)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:914)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1289)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195)
accounting-service_1  |     at java.base/java.lang.Thread.run(Unknown Source)
accounting-service_1  | Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Failed to convert message
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:146)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1656)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1575)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1563)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1554)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1498)
accounting-service_1  |     ... 6 common frames omitted
accounting-service_1  | Caused by: org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
accounting-service_1  |     at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.doFromMessage(AbstractJackson2MessageConverter.java:294)
accounting-service_1  |     at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:271)
accounting-service_1  |     at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:251)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.extractMessage(AbstractAdaptableMessageListener.java:344)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter$MessagingMessageConverterAdapter.extractPayload(MessagingMessageListenerAdapter.java:347)
accounting-service_1  |     at org.springframework.amqp.support.converter.MessagingMessageConverter.fromMessage(MessagingMessageConverter.java:132)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:229)
accounting-service_1  |     at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:136)
accounting-service_1  |     ... 11 common frames omitted
accounting-service_1  | Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `byte` from Object value (token `JsonToken.START_OBJECT`)
accounting-service_1  |  at [Source: (String)"[{"assigneeId":"1033c1ab-f277-4162-a711-722966bf69ec","taskId":"ae408e2d-39e0-4094-bf52-8c171d965be7"}]"; line: 1, column: 2] (through reference chain: byte[][0])
accounting-service_1  |     at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)

杰克逊的Vavr模块安装在ObjectMapper中,因此根本原因不是集合错误和不可序列化。

关于堆栈跟踪,值得注意的是头部分:

代码语言:javascript
运行
复制
headers={__ContentTypeId__=java.lang.Object, __TypeId__=io.vavr.collection.List$Cons}

我的理论是:由于侦听器已经接收到JSON数组,而Jackson需要提供显式的TypeReference来推断一个POJO,因此要对数组的项进行反序列化,这就是Jackson2MessageConverter中出现失败的原因。没有人给它内容类型。

问题是--如何向Jackson2MessageConverter提供这样的信息?RabbitTemplate似乎没有给出任何方法。在发送方显式设置__ContentTypeId__也于事无补。

或者,如何至少克服杰克逊在接收端反序列化的问题,并在@RabbitListener中只听取带有byte[]有效负载的原始消息?

UPD:制作了一个项目,在这里复制问题:https://www.dropbox.com/s/rde9u02mxob189w/proba_amqp.zip?dl=0

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2021-11-15 14:39:37

您正在尝试接收未转换的原始Message

尝试listen(List<Foo> foos),这样我们就可以将推断的类型传递给转换器。

编辑

问题是您的集合对象与Jackson不兼容。

这行得通

代码语言:javascript
运行
复制
    @PostConstruct
    public void init() {
        List<POJO> payload = new ArrayList<>();
        payload.add(new POJO(1,2));
        payload.add(new POJO(3,4));
        //POJO payload = new POJO(1,2);
        template.convertAndSend(
            outbox,
            "",
            payload,
            message -> {
                var props = message.getMessageProperties();
                props.setMessageId(UUID.randomUUID().toString());
                return message;
            }
        );
    }

    @RabbitListener(queues = "${amqp.inbox}")
    public void listen(List<POJO> pojos) {
        System.out.println("Message reached:" + pojos);
    }
代码语言:javascript
运行
复制
Message reached:[com.skapral.POJO@7af8fa38, com.skapral.POJO@5569d76c]

来自作者:这个答案给了我一个提示的根本原因。结果发现,vavr-jackson - Jackson模块中存在一个用于支持Vavr集合的bug。

https://github.com/vavr-io/vavr-jackson/issues/189

票数 1
EN

Stack Overflow用户

发布于 2021-11-11 16:03:52

看看这个样本是否对你有帮助:

https://github.com/spring-projects/spring-amqp-samples/tree/main/spring-rabbit-json

文档在这里:https://docs.spring.io/spring-amqp/docs/current/reference/html/#json-message-converter

更新

如果在转换器上使用此选项,请查看有什么不同:

代码语言:javascript
运行
复制
/**
 * When false (default), fall back to type id headers if the type (or contents of a container
 * type) is abstract. Set to true if conversion should always be attempted - perhaps because
 * a custom deserializer has been configured on the {@link ObjectMapper}. If the attempt fails,
 * fall back to headers.
 * @param alwaysAttemptConversion true to attempt.
 * @since 2.2.8
 */
public void setAlwaysConvertToInferredType(boolean alwaysAttemptConversion) {
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69922456

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档