前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ怎么保证可靠性

RabbitMQ怎么保证可靠性

作者头像
小王不头秃
发布2024-06-19 17:47:43
1920
发布2024-06-19 17:47:43
举报

前言

RabbitMQ相信大家都非常熟悉了,今天咱们来聊聊怎么保证RabbitMQ的可靠性。

那什么时候会出现问题呢?

第一种是生产端出现的问题。我们向队列中发送消息的时候,消息不一定可以发送到MQ中,这个时候如果我们不做任何处理,这样消息丢失了。

第二种则是RabbitMQ出现的问题。也就是说现在生产端的成功将消息发送到了RabbitMQ,但由于MQ并没有做持久化,这样宕机重启之后消息可能就丢失了。

第三种则是消费端的问题。消费端处理消息时如果出现异常,默认的解决方式是在重复消费多次,当次数超过阈值时直接删除消息,这也导致消息丢失。

接下来咱们就看看怎么应对以上三种问题。

生产端问题

解决方案

这里我们需要清楚发送的一个大体流程。

生产端发送消息到MQ之后,会收到一个结果,这个结果有acknack两种。

其中ack代表消息成功到达了交换机,但并不意味者消息到达了队列。不过ack的情况下消息未送达队列,会有相应的错误信息提醒。

nack就代表消息并未送达交换机

那么,怎么才能知道消息发送情况呢?

可以设置callback来获取消息发送结果。

代码

局部callback设置如下

代码语言:javascript
复制
    @GetMapping("testmq")
    public Result testmq(){
        String orderId = String.valueOf(UUID.randomUUID());
        String messageData = "下订单!";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String,Object> map=new HashMap<>();
        map.put("orderId",orderId);
        map.put("messageData",messageData);
        map.put("createTime",createTime);

//        设置发送的callback
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        correlationData.getFuture().addCallback(result -> {
            // 判断结果
            if (result.isAck()) {
                log.info("发送成功");
            } else {
                log.error("消息未达到交换机,发送失败");
            }
        }, ex -> {
            log.error("出现异常,发送失败");
        });

        rabbitTemplate.convertAndSend(RabbitMQConfig.NORMALEXCHANGE, RabbitMQConfig.TESTROUTING, map, message -> {
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            return message;
        }, correlationData);
        return Result.succ("ok");
    }

验证

消息发送成功

交换机名称有误

队列路由出错

虽然没有错误,但给了我们warning。

RabbitMQ问题

这里就比较简单了,那就是做下持久化就可以了

首先是交换机,队列和消息的持久化

交换机

代码语言:javascript
复制
@Bean
    DirectExchange normalExchange() {
        /**
         * durable 是否持久化
         * autoDelete 没有queue绑定时是否自动删除
         */
        return new DirectExchange(NORMALEXCHANGE, true, false);
    }

队列

代码语言:javascript
复制
@Bean
    public Queue cleanDQueue() {
        return QueueBuilder.durable(CLEANQUEUE)
                .build();
    }

消息的持久化

代码语言:javascript
复制
rabbitTemplate.convertAndSend(RabbitMQConfig.NORMALEXCHANGE, RabbitMQConfig.TESTROUTING, map, message -> {
			// 设置消息持久化
           message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            return message;
        }, correlationData);

消费端问题

解决方案

消费端出现错误时,会进行重试,当重试次数超过阈值之后有三种解决方案,如下

  • RejectAndDontRequeueRecoverer:超过阈值,直接丢失消息。
  • ImmediateRequeueMessageRecoverer:超过阈值,返回nack,然后消息重新入队。
  • RepublishMessageRecoverer:超过阈值直接将消费失败的消息投递到指定交换机。

这里我们以RepublishMessageRecoverer为例做下演示。

代码

首先需要声明消费消息失败后传递的交换机和队列

代码语言:javascript
复制
   @Bean
    DirectExchange normalExchange() {
        /**
         * durable 是否持久化
         * autoDelete 没有queue绑定时是否自动删除
         */
        return new DirectExchange(NORMALEXCHANGE, true, false);
    }
//  用于处理消费失败消息的队列
    @Bean
    public Queue republishQueue() {
        return QueueBuilder.durable(REPULISHQUEUE)
                .build();
    }
//  绑定失败消费消息队列
    @Bean
    Binding bindingRepublish() {
        return BindingBuilder.bind(republishQueue()).to(normalExchange()).with(REPULISHROUTING);
    }

然后配置下RepublishMessageRecoverer策略,随便找个config注入下bean就可以。

代码语言:javascript
复制
 //  设置RepublishMessageRecoverer,消费失败的消息转移到另一队列中,交给管理员手动处理
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
        /**
         * NORMALEXCHANGE   接收消费失败消息的交换机
         * REPULISHROUTING  接收消费失败消息的路由key
         */
        return new RepublishMessageRecoverer(rabbitTemplate, NORMALEXCHANGE, REPULISHROUTING);
    }

验证

咱们看看如果消费出错会咋样

我们可以看到被消费的队列中信息被删除了。

然后我们设置的转入队列中的消息数加一,这时候我们可以接收下该队列中的信息,存储到数据库中,方便维护人员手动进行处理。

总结

从生产端、RabbitMQ以及消费端三方面介绍了一下怎么保证RabbitMQ的可靠性,另外还有关于死信队列和延迟队列的内容在这篇博客中,大家有兴趣可以看一下。

RabbitMQ的死信队列和延迟队列

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-06-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 生产端问题
    • 解决方案
      • 代码
        • 验证
        • RabbitMQ问题
        • 消费端问题
          • 解决方案
            • 代码
              • 验证
              • 总结
              相关产品与服务
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档