前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >分布式--RabbitMQ集成SpringBoot、消息可靠性、死信队列、延迟交换机、集群

分布式--RabbitMQ集成SpringBoot、消息可靠性、死信队列、延迟交换机、集群

作者头像
aruba
发布2022-09-19 15:39:04
5000
发布2022-09-19 15:39:04
举报
文章被收录于专栏:android技术android技术

接着上篇分布式--RabbitMQ入门

一、SpringBoot中使用RabbitMQ

1. 导入依赖
代码语言:javascript
复制
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
2. yml配置
代码语言:javascript
复制
spring:
  rabbitmq:
    host: 192.168.42.4
    port: 5672
    username: aruba
    password: aruba
    virtual-host: /
    listener:
      direct:
        acknowledge-mode: manual # 手动ack
      simple:
        prefetch: 1 # 流控
        concurrency: 10 # 多线程监控
3. 配置交换机和队列
代码语言:javascript
复制
@Configuration
public class RabbitMQConfig {

    public static final String EXCHANGE_NAME = "MY-MQ-EX";
    public static final String QUEUE_NAME = "MY-MQ-QUEUE";
    public static final String ROUTING_KEY = "key.#";

    /**
     * 注入交换机
     *
     * @return
     */
    @Bean
    public Exchange exchangeProvider() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
    }

    /**
     * 注入队列
     *
     * @return
     */
    @Bean
    public Queue queueProvider() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    /**
     * 注入交换机队列绑定关系
     *
     * @return
     */
    @Bean
    public Binding bootBinding(Exchange exchangeProvider, Queue queueProvider) {
        return BindingBuilder.bind(queueProvider).to(exchangeProvider).with(ROUTING_KEY).noargs();
    }
}
4. 发送消息

SpringBoot中使用RabbitTemplate自动注入,即可发送消息,并对方法都进行了封装

代码语言:javascript
复制
@SpringBootTest
class SpringbootRabbitmqApplicationTests {
    @Autowired
    public RabbitTemplate rabbitTemplate;

    @Test
    void send() {
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "key.send", "发送消息");
    }

    /**
     * 携带信息的消息
     */
    @Test
    void sendWithProps() {
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,
                "key.send", "发送消息", new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString());
                        return message;
                    }
                });
    }
    
}
5. 订阅消息

在方法上使用@RabbitListener注解,即可指定订阅队列。

入参添加Channel,就可以和之前一样发送ack

将消息封装成了Message,可以获取其携带信息。

代码语言:javascript
复制
@Component
public class MQListener {

    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
    public void consume(String msg, Channel channel, Message message) throws IOException {
        System.out.println("队列的消息为:" + msg);
        String correlationId = message.getMessageProperties().getCorrelationId();
        System.out.println("唯一标识为:" + correlationId);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

}

运行结果:

二、消息可靠性

由于RabbitMQ在发送消息和订阅消息时,都是通过网络传输,其间必然会出现由网络问题产生的消息丢失情况,要保证消息的可靠性从下面四点出发:

  • 保证消息发送到交换机
  • 保证消息路由到队列
  • 保证队列中消息的持久化
  • 保证消费者正常消费消息
1. Client-API方式
1.1 保证消息发送到交换机

Publisher Confirms就是为了保证消息发送到交换机的机制,一般使用异步的方式:

代码语言:javascript
复制
        //4. 开启confirm
        channel.confirmSelect();
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息成功发送到交换机");
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("没有送达交换机");
            }
        });
1.2 保证消息路由到队列

addReturnListener方法可以确认消息是否路由到了队列,如果回调了说明没有路由到队列

发送消息时,指定mandatory参数为true

代码语言:javascript
复制
        //5. 设置return回调,确认消息是否路由到了队列
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("交换机没有路由到队列");
            }
        });

        //参数: 交换机 routing-Key mandatory 消息其他参数  消息
        channel.basicPublish("", QUEUE_NAME, true, null, message.getBytes());
1.3 保证队列中消息的持久化

首先保证队列的持久化,再保证消息的持久化

代码语言:javascript
复制
        //3. 构建队列  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        //6. 发送消息
        String message = "hello confirm";
        AMQP.BasicProperties porps = new AMQP.BasicProperties().builder()
                .deliveryMode(2) //2:消息持久化 1: 不持久化
                .build();
        //参数: 交换机 routing-Key mandatory 消息其他参数  消息
        channel.basicPublish("", QUEUE_NAME, true, porps, message.getBytes());
1.4 保证消费者正常消费消息

保证消费者正常消费消息只需要手动ack即可,生产者完整代码:

代码语言:javascript
复制
public class Publisher {

    private static final String QUEUE_NAME = "confirm";

    @Test
    public void publisher() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 创建信道
        Channel channel = connection.createChannel();

        //3. 构建队列  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        //4. 开启confirm
        channel.confirmSelect();
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息成功发送到交换机");
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("没有送达交换机");
            }
        });

        //5. 设置return回调,确认消息是否路由到了队列
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("交换机没有路由到队列");
            }
        });

        //6. 发送消息
        String message = "hello confirm";
        AMQP.BasicProperties porps = new AMQP.BasicProperties().builder()
                .deliveryMode(2) //2:消息持久化 1: 不持久化
                .build();
        //参数: 交换机 routing-Key mandatory 消息其他参数  消息
        channel.basicPublish("", QUEUE_NAME, true, porps, message.getBytes());
    }

}
2. SpringBoot方式
2.1 配置Confirm

yml中开启confirm

代码语言:javascript
复制
spring:
  rabbitmq:
    publisher-confirm-type: correlated

RabbitTemplate设置回调:

代码语言:javascript
复制
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    System.out.println("消息成功送达到交换机");
                } else {
                    System.out.println("消息没有送达到交换机");
                }
            }
        });
2.2 配置Return

yml中开启return

代码语言:javascript
复制
spring:
  rabbitmq:
    publisher-returns: true

RabbitTemplate设置回调:

代码语言:javascript
复制
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                System.out.println(String.format("交换机:%s 路由消息失败", returned.getExchange()));
            }
        });
2.3 消息持久化

设置Message的携带信息:

代码语言:javascript
复制
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,
                "key.send", "发送消息", new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        return message;
                    }
                });

完整代码:

代码语言:javascript
复制
    /**
     * 携带信息的消息
     */
    @Test
    void sendWithProps() {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    System.out.println("消息成功送达到交换机");
                } else {
                    System.out.println("消息没有送达到交换机");
                }
            }
        });
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                System.out.println(String.format("交换机:%s 路由消息失败", returned.getExchange()));
            }
        });
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,
                "key.send", "发送消息", new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString());
                        return message;
                    }
                });
    }

三、死信队列

死信队列是存放本来应该死亡的消息的队列,用于对这些消息的特殊处理(如:重新入队、持久化到数据库),具体有以下几种消息会被存放进死信队列:

  • 消费者拒绝的消息,并requeue设置为false(不重新入队列)
  • 消息的生存时间到了,还在队列中的信息
  • 队列设置了整体的消息生存时间,到了生存时间的消息
  • 到达队列中消息最大数,再路由过来的消息
1. 构建交换机

死信队列需要一个死信交换机,并把正常消息的队列绑定死信交换机:

代码语言:javascript
复制
@Configuration
public class DeadLetterConfig {

    public static final String NORMAL_EXCHANGE_NAME = "normal-ex";
    public static final String NORMAL_QUEUE_NAME = "normal-queue";
    public static final String NORMAL_ROUTING_KEY = "normal.#";

    public static final String DEAD_EXCHANGE_NAME = "dead-ex";
    public static final String DEAD_QUEUE_NAME = "dead-queue";
    public static final String DEAD_ROUTING_KEY = "dead.#";

    @Bean
    public Exchange normalExchange() {
        return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE_NAME).build();
    }

    @Bean
    public Queue normalQueue() {
        // 绑定死信交换机
        return QueueBuilder.durable(NORMAL_QUEUE_NAME)
                .deadLetterExchange(DEAD_EXCHANGE_NAME)
                .deadLetterRoutingKey("dead.msg") //准备入死信队列的消息重新设置routin-key
                .build();
    }

    @Bean
    public Binding normalBinding(Exchange normalExchange, Queue normalQueue) {
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY).noargs();
    }

    @Bean
    public Exchange deadExchange() {
        return ExchangeBuilder.topicExchange(DEAD_EXCHANGE_NAME).build();
    }

    @Bean
    public Queue deadQueue() {
        return QueueBuilder.durable(DEAD_QUEUE_NAME).build();
    }

    @Bean
    public Binding deadBinding(Exchange deadExchange, Queue deadQueue) {
        return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
    }

}
2. 死信队列的实现方式
2.1 拒绝消息入死信队列

对正常队列消息进行监听,来做相应的处理,首先是拒绝消息,并且要把requeue设为false

代码语言:javascript
复制
@Component
public class DeadListener {

    @RabbitListener(queues = DeadLetterConfig.NORMAL_QUEUE_NAME)
    public void normalListener(Message msg, Channel channel) throws IOException {
        System.out.println("接收到正常队列消息:" + new String(msg.getBody()));
        channel.basicReject(msg.getMessageProperties().getDeliveryTag(), false);
//        channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, false);
    }

}

尝试发送一个消息:

代码语言:javascript
复制
    @Test
    public void sendNormal() {
        rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE_NAME, "normal.msg", "哈喽");
    }

运行结果:

2.2 消息生存时间

发送消息时,通过消息的额外参数MessagePropertiessetExpiration方法设置过期时间:

代码语言:javascript
复制
    @Test
    public void sendExpire() {
        rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE_NAME,
                "normal.msg", "哈喽",
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        // 该消息10s后过期
                        message.getMessageProperties().setExpiration("10000");
                        return message;
                    }
                });
    }

记得把上面消息的监听注释掉,否则会消费消息

运行结果:

2.3 队列消息的整体生存时间

管理页面把之前的正常队列删除,在重新创建时,为正常队列设置ttl

设置ttl

代码语言:javascript
复制
    @Bean
    public Queue normalQueue() {
        // 绑定死信交换机
        return QueueBuilder.durable(NORMAL_QUEUE_NAME)
                .ttl(5000) // 整体消息过期时间为5s
                .deadLetterExchange(DEAD_EXCHANGE_NAME)
                .deadLetterRoutingKey("dead.msg") // 准备入死信队列的消息重新设置routin-key
                .build();
    }

发送正常消息,运行结果:

2.4 达到队列最大数

同样先删除正常队列,后调用maxLength为队列设置最大消息数:

代码语言:javascript
复制
    @Bean
    public Queue normalQueue() {
        // 绑定死信交换机
        return QueueBuilder.durable(NORMAL_QUEUE_NAME)
//                .ttl(5000) // 整体消息过期时间为5s
                .maxLength(1) // 设置消息最大数
                .deadLetterExchange(DEAD_EXCHANGE_NAME)
                .deadLetterRoutingKey("dead.msg") // 准备入死信队列的消息重新设置routin-key
                .build();
    }

发送两次正常消息,运行结果:

四、延迟交换机

死信队列的问题:由于死信队列只会监听队列头的过期时间,一旦队列头的消息过期时间比后面排队的消息过期时间长,那么后面消息的过期时间并不会生效,而是等待队列头的过期时间到了后,才一并进入死信队列

删除正常队列,恢复配置:

代码语言:javascript
复制
    @Bean
    public Queue normalQueue() {
        // 绑定死信交换机
        return QueueBuilder.durable(NORMAL_QUEUE_NAME)
//                .ttl(5000) // 整体消息过期时间为5s
//                .maxLength(1) // 设置消息最大数
                .deadLetterExchange(DEAD_EXCHANGE_NAME)
                .deadLetterRoutingKey("dead.msg") // 准备入死信队列的消息重新设置routin-key
                .build();
    }

发送两次消息,第一次过期时间为30s,第二次为2s:

代码语言:javascript
复制
    @Test
    public void sendExpire30() {
        rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE_NAME,
                "normal.msg", "哈喽",
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setExpiration("30000");
                        return message;
                    }
                });
    }

    @Test
    public void sendExpire2() {
        rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE_NAME,
                "normal.msg", "哈喽",
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setExpiration("2000");
                        return message;
                    }
                });
    }

结果,过了几秒后,队列中还是两个消息:

解决方法:根据时间创建多个队列或者使用延迟交换机

延迟交换机是一个插件,默认并不带,原理就是将消息暂时放在交换机中,由交换机根据消息过期时间的先后来路由到队列,缺点:由于消息在交换机中,重启会导致消息的丢失

1. 插件下载和使用

根据自己的RabbitMQ版本进行下载:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/

代码语言:javascript
复制
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.17/rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez
mv rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez /usr/local/rabbitmq/rabbitmq_server-3.8.35/plugins

启动插件:

代码语言:javascript
复制
cd /usr/local/rabbitmq/rabbitmq_server-3.8.35/sbin
./rabbitmq-plugins enable rabbitmq_delayed_message_exchange

重启服务或系统后,多了一个x-delayed-message的交换机类型:

2. 配置延迟交换机

使用CustomExchange构造x-delayed-message类型交换机,并使用其他参数x-delayed-type指定使用哪种原型交换机类型,这边使用的是topic

代码语言:javascript
复制
@Configuration
public class DelayExchangeConfig {

    public static final String EXCHANGE_NAME = "delay-exchange";
    public static final String DELAY_QUEUE = "delay_queue";
    public static final String DELAY_ROUTIN_KEY = "delay.#";

    @Bean
    public Exchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        // 使用哪种原型交换机类型
        args.put("x-delayed-type", "topic");
        Exchange exchange = new CustomExchange(EXCHANGE_NAME, "x-delayed-message", true, false, args);
        return exchange;
    }

    @Bean
    public Queue delayQueue() {
        return QueueBuilder.durable(DELAY_QUEUE).build();
    }


    @Bean
    public Binding delayBinding(Queue delayQueue, Exchange delayExchange) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTIN_KEY).noargs();
    }
    
}
3. 发送消息

MessageProperties使用setDelay方法为消息设置延迟:

代码语言:javascript
复制
    @Test
    public void sendDelay30() {
        rabbitTemplate.convertAndSend(DelayExchangeConfig.EXCHANGE_NAME,
                "delay.msg", "哈喽",
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setDelay(30000);
                        return message;
                    }
                });
    }

    @Test
    public void sendDelay5() {
        rabbitTemplate.convertAndSend(DelayExchangeConfig.EXCHANGE_NAME,
                "delay.msg", "哈喽",
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setDelay(5000);
                        return message;
                    }
                });
    }

消息在交换机进行等待后,首先入队列的为5秒延迟的,后面入队列的为30秒延迟的:

五、集群

1. 配置主机名

RabbitMQ集群的搭建要配置主机名:HOSTNAME,先修改network配置文件

代码语言:javascript
复制
vi /etc/sysconfig/network

追加HOSTNAME:

代码语言:javascript
复制
HOSTNAME=rabbit1

再修改hosts文件:

代码语言:javascript
复制
vi /etc/hosts

追加内容:

代码语言:javascript
复制
192.168.42.4 rabbit1

重启系统后,RabbitMQ先前配置的管理账号会丢失,需要重新配置

2. 克隆虚拟机
2.1 从机主机名配置

克隆后,对从机进行主机名的配置,network配置文件:

hosts文件,中需要添加集群主节点的ip和hostname:

2.2 建立集群关联

启动RabbitMQ服务后,管理界面的节点会带上主机名:

接下来,配置从机加入到主节点集群中,执行以下命令即可:

代码语言:javascript
复制
cd /usr/local/rabbitmq/rabbitmq_server-3.8.35/sbin/ 
./rabbitmqctl stop_app
./rabbitmqctl reset 
./rabbitmqctl join_cluster rabbit@rabbit1
./rabbitmqctl start_app

加入成功后,管理界面中就会出现多个节点:

3. 配置镜像模式

目前集群是普通模式,队列中的消息只会存在于一个节点上,而不会同步到其他队列,一旦该节点宕机,其他节点将无法访问消息。

镜像模式是指,集群中所有节点都有一份单独的拷贝,即使单一节点宕机,其他节点中依然存在消息的拷贝,这样才能实现高可用

在管理界面进行配置镜像策略:

新建一个队列,并查看详情:

项目地址:

https://gitee.com/aruba/rabbit-mqstudy.git

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-07-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、SpringBoot中使用RabbitMQ
    • 1. 导入依赖
      • 2. yml配置
        • 3. 配置交换机和队列
          • 4. 发送消息
            • 5. 订阅消息
            • 二、消息可靠性
              • 1. Client-API方式
                • 1.1 保证消息发送到交换机
                • 1.2 保证消息路由到队列
                • 1.3 保证队列中消息的持久化
                • 1.4 保证消费者正常消费消息
              • 2. SpringBoot方式
                • 2.1 配置Confirm
                • 2.2 配置Return
                • 2.3 消息持久化
            • 三、死信队列
              • 1. 构建交换机
                • 2. 死信队列的实现方式
                  • 2.1 拒绝消息入死信队列
                  • 2.2 消息生存时间
                  • 2.3 队列消息的整体生存时间
                  • 2.4 达到队列最大数
              • 四、延迟交换机
                • 1. 插件下载和使用
                  • 2. 配置延迟交换机
                    • 3. 发送消息
                    • 五、集群
                      • 1. 配置主机名
                        • 2. 克隆虚拟机
                          • 2.1 从机主机名配置
                          • 2.2 建立集群关联
                        • 3. 配置镜像模式
                          • 项目地址:
                          领券
                          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档