专栏首页IT技术精选文摘缓存架构之史上讲的最明白的RabbitMQ可靠消息传输实战演练

缓存架构之史上讲的最明白的RabbitMQ可靠消息传输实战演练

缓存架构之史上讲的最明白的RabbitMQ可靠消息传输实战演练

一、背景介绍:消息可靠传递的重要性

比如:某个广告主(如:天猫)想在我们的平台(如:今日头条)投放广告,当通过我们的广告系统新建广告的时候,该消息在同步给redis缓存(es)的时候丢失了,而我们又没有发现,造成该广告无法正常显示出来,那这损失就打了,如果1天都没有该广告的投放记录,那就有可能是上百万的损失了,所以消息的可靠传输多我们的广告系统也是很重要的。 其实,生活中这样的场景很场景,再比如:交易系统、订单系统都必须保证消息的可靠传输,否则,损失是巨大的!!!

二、如何保证消息的可靠传递呢?

1、设置交换机、队列和消息都为持久化

**持久化:**保证在服务器重启的时候可以保持不丢失相关信息,重点解决服务器的异常崩溃而导致的消息丢失问题。但是,将所有的消息都设置为持久化,会严重影响RabbitMQ的性能,写入硬盘的速度比写入内存的速度慢的不只一点点。对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐率,在选择是否要将消息持久化时,需要在可靠性和吞吐量之间做一个权衡。 处于某种应用场景,如:大流量的订单交易系统,为了不影响性能,我们可以不设置持久化,但是我们会定时扫描数据库中的未发送成功的消息,进行重试发送,实际应用场景,我们其实有很多解决方案,不要故步自封,换个角度多想想,只有经历多了,才能应用的更加得心应手。

1)交换机的持久化

@BeanDirectExchange advanceExchange() {    return new DirectExchange(exchangeName);
}

注释:查看源码,易知,默认是持久化的

2)队列的持久化

@Beanpublic Queue advanceQueue() {    return new Queue(queueName);
}

注释:查看源码,易知,默认是持久化的

3)消息的持久化

当我们使用RabbitTemplate调用了 convertAndSend(String exchange, String routingKey, final Object object) 方法。默认就是持久化模式

注意:

  • 持久化的消息在到达队列时就被写入到磁盘,并且如果可以,持久化的消息也会在内存中保存一份备份,这样可以提高一定的性能,只有在内存吃紧的时候才会从内存中清楚。
  • 非持久化的消息一般只保存在内存中,在内存吃紧的时候会被换入到磁盘中,以节省内存空间。

2、生产者消息确认机制

当消息发送出去之后,我们如何知道消息有没有正确到达exchange呢?如果在这个过程中,消息丢失了,我们根本不知道发生了什么,也不知道是什么原因导致消息发送失败了 为解决这个问题,主要有如下两种方案:

  • 通过事务机制实现
  • 通过生产者消息确认机制(publisher confirm)实现

但是使用事务机制实现会严重降低RabbitMQ的消息吞吐量,我们采用一种轻量级的方案——生产者消息确认机制

什么是消息确认机制? 简而言之,就是:生产者发送的消息一旦被投递到所有匹配的队列之后,就会发送一个确认消息给生产者,这就使得生产者知晓消息已经正确到达了目的地。 如果消息和队列是持久化存储的,那么确认消息会在消息写入磁盘之后发出。 再补充一个Mandatory参数:当Mandatory参数设为true时,如果目的不可达,会发送消息给生产者,生产者通过一个回调函数来获取该信息。

3、消费者消息确认机制

为了保证消息从队列可靠地到达消费者,RabbitMQ提供了消费者消息确认机制(message acknowledgement)。采用消息确认机制之后,消费者就有足够的时间来处理消息,不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直等待并持有消息,直到消费者确认了该消息。

4、死信队列

DLX,Dead Letter Exchange 的缩写,又死信邮箱、死信交换机。DLX就是一个普通的交换机,和一般的交换机没有任何区别。 当消息在一个队列中变成死信(dead message)时,通过这个交换机将死信发送到死信队列中(指定好相关参数,rabbitmq会自动发送)。

什么是死信呢?什么样的消息会变成死信呢?

  • 消息被拒绝(basic.reject或basic.nack)并且requeue=false.
  • 消息TTL过期
  • 队列达到最大长度(队列满了,无法再添加数据到mq中)

应用场景分析: 在定义业务队列的时候,可以考虑指定一个死信交换机,并绑定一个死信队列,当消息变成死信时,该消息就会被发送到该死信队列上,这样就方便我们查看消息失败的原因了 **如何使用死信交换机呢?

定义业务(普通)队列的时候指定参数:

  • x-dead-letter-exchange: 用来设置死信后发送的交换机
  • x-dead-letter-routing-key:用来设置死信的routingKey
@Beanpublic Queue helloQueue() {    //将普通队列绑定到私信交换机上
    Map<String, Object> args = new HashMap<>(2);
    args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName);
    args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey);
    Queue queue = new Queue(queueName, true, false, false, args);    return queue;
}

三、实战演练

项目代码下载地址:https://gitee.com/jikeh/JiKeHCN-RELEASE.git 项目名:spring-boot-rabbitmq-reliability

1、开启生产者消息确认机制

# 开启发送确认
spring.rabbitmq.publisher-confirms=true# 开启发送失败退回
spring.rabbitmq.publisher-returns=true

2、开启消费者消息确认机制

# 开启ACK
spring.rabbitmq.listener.simple.acknowledge-mode=manual

3、基本配置

@Configurationpublic class RabbitConfig {    public final static String queueName = "hello_queue";    /**
     * 死信队列:
     */
    public final static String deadQueueName = "dead_queue";    public final static String deadRoutingKey = "dead_routing_key";    public final static String deadExchangeName = "dead_exchange";    /**
     * 死信队列 交换机标识符
     */
    public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";    /**
     * 死信队列交换机绑定键标识符
     */
    public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";    @Bean
    public Queue helloQueue() {        //将普通队列绑定到私信交换机上
        Map<String, Object> args = new HashMap<>(2);
        args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName);
        args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey);
        Queue queue = new Queue(queueName, true, false, false, args);        return queue;
    }    /**
     * 死信队列:
     */

    @Bean
    public Queue deadQueue() {
        Queue queue = new Queue(deadQueueName, true);        return queue;
    }    @Bean
    public DirectExchange deadExchange() {        return new DirectExchange(deadExchangeName);
    }    @Bean
    public Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) {        return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey);
    }

}

注释:hell_queue就配置了死信交换机、死信队列

4、生产者核心代码

@Componentpublic class HelloSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{    @Autowired
    private RabbitTemplate rabbitTemplate;    public void send(String exchange, String routingKey) {
        String context = "你好现在是 " + new Date();
        System.out.println("send content = " + context);        this.rabbitTemplate.setMandatory(true);        this.rabbitTemplate.setConfirmCallback(this);        this.rabbitTemplate.setReturnCallback(this);        this.rabbitTemplate.convertAndSend(exchange, routingKey, context);
    }    /**
     * 确认后回调:
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {        if (!ack) {
            System.out.println("send ack fail, cause = " + cause);
        } else {
            System.out.println("send ack success");
        }
    }    /**
     * 失败后return回调:
     *
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);
    }
}

5、消费者核心代码

@Component@RabbitListener(queues = RabbitConfig.queueName)public class HelloReceiver {    @RabbitHandler
    public void process(String hello, Channel channel, Message message) throws IOException {        try {
            Thread.sleep(2000);
            System.out.println("睡眠2s");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }        try {            //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            System.out.println("receiver success = " + hello);
        } catch (Exception e) {
            e.printStackTrace();            //丢弃这条消息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
            System.out.println("receiver fail");
        }

    }

}

6、测试生产者消息确认功能:分为4种场景来测试

//1、exchange, queue 都正确, confirm被回调, ack=true@RequestMapping("/send1")@ResponseBodypublic String send1() {
    helloSender.send(null, RabbitConfig.queueName);    return "success";
}//2、exchange 错误, queue 正确, confirm被回调, ack=false@RequestMapping("/send2")@ResponseBodypublic String send2() {
    helloSender.send("fail-exchange", RabbitConfig.queueName);    return "success";
}//3、exchange 正确, queue 错误, confirm被回调, ack=true; return被回调 replyText:NO_ROUTE@RequestMapping("/send3")@ResponseBodypublic String send3() {
    helloSender.send(null, "fail-queue");    return "success";
}//4、exchange 错误, queue 错误, confirm被回调, ack=false@RequestMapping("/send4")@ResponseBodypublic String send4() {
    helloSender.send("fail-exchange", "fail-queue");    return "success";
}

7、测试消费者消息确认功能

  • 1)当添加这行代码的时候: channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); 测试结果:消息被正常消费,消息从队列中删除
  • 2)当注释掉这行代码的时候: channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); 测试结果:消息会被重复消费,一直保留在队列当中

8、测试死信队列

当执行这行代码的时候: channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); 消息会被加入到死信队列中:

四、拓展

除了我们上面讲的基本可靠性保证外,其实还有很多性能优化方案、可靠性保证方案:集群监控、流控、镜像队列、HAProxy+Keeplived高可靠负载均衡

本文分享自微信公众号 - IT技术精选文摘(ITHK01)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2018-09-25

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Tomcat通过Redis实现session共享的完整部署记录

    对于生产环境有了一定规模的tomcat集群业务来说,要实现session会话共享,比较稳妥的方式就是使用数据库持久化session.为什么要持久化session...

    洗尽了浮华
  • 国内首例空号短信劫持案告破:与运营商“内鬼”勾结

    2018年8月17日,公安部公布9起打击整治网络乱象典型案例,排在第二位的是广西、湖南公安机关侦破的“长沙线尚网络科技有限公司”破坏计算机信息系统案。广西贵港警...

    周俊辉
  • 二次注入代码剖析

    本文针对二次注入进行讲解,并简单的绕过360脚本waf。。。。。 首先来看程序的注册页面代码:

    周俊辉
  • python爬虫系列之数据的存储(一):json库的使用

    在上一篇文章里我们讲了 xpath写法的问题还以爬取我的文章信息写了示例,但是在上一篇中我们只是爬取并打印了信息,并没有对信息进行保存。

    渔父歌
  • Linux运维跳槽必备的40道面试精华题

    过一次年,结婚、存款、父母养老,一系列向钱看的事都在碾压我们本来还挺简单的神经,但难过没有出路,唯有找到好的方法和事业方向,才能实现一步一个脚印的逆袭。

    小小科
  • Dataset 列表:机器学习研究

    In computer vision, face images have been used extensively to develop face recog...

    Petrichor_
  • 广东警方侦破黑客类案件100余宗 涉案金额2500余万元

    南都记者从广东省公安厅获悉,近日,在公安部的指挥协调下,省公安厅组织开展“净网安网”15号打击黑客网络攻击破坏违法犯罪专项行动。行动期间,共侦破黑客类案件100...

    周俊辉
  • Nginx(四)------nginx 负载均衡

      在上一篇博客我们介绍了 Nginx 一个很重要的功能——代理,包括正向代理和反向代理。这两个代理的核心区别是:正向代理代理的是客户端,而反向代理代理的是服务...

    IT可乐
  • 基于MongodbDB的用户认证-运维笔记

    MongoDB默认是不认证的,默认没有账号,只要能连接上服务就可以对数据库进行各种操作,MongoDB认为安全最好的方法就是在一个可信的环境中运行它,保证之后可...

    洗尽了浮华
  • 研究人员发现可公开访问的包含 1100 万条记录的 MongoDB 数据库

    安全研究员Bob Diachenko发现了一个可公开访问的MongoDB数据库,其中包含43.5 GB的数据和10.999.535的Yahoo电子邮件地址。除其...

    周俊辉

扫码关注云+社区

领取腾讯云代金券