我最近开始使用spring webflux和Rabbitmq以及cassandra reactive存储库。我注意到的是,即使在cassandra中保存某些元素也无法成功保存该消息。我传播了在保存过程中抛出的异常,但即使消息是从队列中删除的。我想知道我应该怎么做才能让Rabbitmq知道这条消息应该被认为是失败的(我想拒绝消息以将其发送到死信队列)
@RabbitListener(queues = Constants.SOME_QUEUE, returnExceptions = "true")
public void receiveMessage(final List<ItemList> itemList) {
log.info("Received message from queue: {}", Constants.SOME_QUEUE);
itemService.saveAll(itemList)
.subscribe(
item-> log.info("Saving item with {}", item.getId()),
error -> {
log.error("Error during saving item", error);
throw new AmqpRejectAndDontRequeueException(error.getMessage());
},
() -> log.info(Constants.SOME_QUEUE+
" queue - {} items saved", itemList.size())
);
}
发布于 2018-05-28 14:51:19
Reactive是非阻塞的;一旦侦听器线程返回到容器,消息就会被确认。您需要以某种方式阻止侦听器线程(例如,使用Future<?>
),并在cassandra操作完成时唤醒它,如果成功,则正常退出,或者在失败时抛出异常,以便重新传递消息。
发布于 2018-05-29 07:58:29
我通过向rabbitmq发送明确的确认/拒绝消息解决了我的问题。这导致我不得不写更多的代码,但至少现在它可以工作了,我完全控制了发生的事情。
https://stackoverflow.com/questions/50566329
复制相似问题