专栏首页开发杂记RabbitMQ延迟消息学习

RabbitMQ延迟消息学习

准备做一个禁言自动解除的功能,立马想到了订单的超时自动解除,刚好最近在看RabbitMQ的实现,于是想用它实现,查询了相关文档发现确实可以实现,动手编写了这篇短文。

准备工作

1、Erlang安装请参考windows下安装Erlang 2、mq安装晴参考RabbitMQ安装 3、延迟消息插件安装rabbitmq_delayed_message_exchange

    #插件下载地址(选择与mq版本匹配的插件版本)
    http://www.rabbitmq.com/community-plugins.html
    #安装命令如下(在安装目录sbin下执行如下命令)
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange

创建项目

我选择的是在springboot中集成RabbitMQ,配置相对简单很多。

项目创建好后,在application.properties中加入RabbitMQ参数:

#RabbitMQ config
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#Custom config
rabbitmq.exchange=test_exchange
rabbitmq.queue=test_queue_1

定义ConnectionFactory和RabbitTemplate

    package com.xsh.mq.config;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMqConfig {
    private String host;
    private int port;
    private String userName;
    private String password;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(host,port);
        cachingConnectionFactory.setUsername(userName);
        cachingConnectionFactory.setPassword(password);
        cachingConnectionFactory.setVirtualHost("/");
        cachingConnectionFactory.setPublisherConfirms(true);
        return cachingConnectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        return rabbitTemplate;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}

Exchange和Queue配置

    package com.xsh.mq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * 配置队列
 */
@Configuration
public class QueueConfig {

    @Value("${rabbitmq.exchange}")
    private String exchangeName;

    @Value("${rabbitmq.queue}")
    private String queueName;

    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
       //使用的是CustomExchange,不是DirectExchange,另外CustomExchange的类型必须是x-delayed-message
        return new CustomExchange(exchangeName, "x-delayed-message",true, false,args);
    }

    @Bean
    public Queue queue() {
        Queue queue = new Queue(queueName, true);
        return queue;
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(delayExchange()).with(queueName).noargs();
    }
}

消息发送

    package com.xsh.mq.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class MessageServiceImpl {

    /**
     * 日志
     */
    private static final Logger logger = LoggerFactory.getLogger(MessageServiceImpl.class);
    /**
     * rabbitMQ模板
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Value("${rabbitmq.exchange}")
    private String exchangeName;

    /**
     * 发送消息
     * @param queueName 队列名称
     * @param msg 消息内容
     * @param delay 延迟时长 默认3秒
     */
    public void sendMsg(String queueName,String msg,Integer delay) {
        if(null == delay){
            delay = 3000;
        }
        logger.info("》》》》发送消息");
        Integer finalDelay = delay;
        rabbitTemplate.convertAndSend(exchangeName, queueName, msg, message -> {
            //必须添加header x-delay
            message.getMessageProperties().setHeader("x-delay", finalDelay);
            return message;
        });
    }
}

这里发送消息我定义了一个延迟参数,传入的延迟是多少,消息就延迟多少,方便消息延迟不一样

消费消息

    package com.xsh.mq.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MessageReceiver {
    /**
     * 日志
     */
    private static final Logger logger = LoggerFactory.getLogger(MessageReceiver.class);

    @RabbitListener(queues = "${rabbitmq.queue}")
    public void receive(String msg) {
        logger.info("收到消息:{}", msg);
    }
}

测试发送接收

先运行springboot项目,然后编写单元测试用例

      package com.xsh.mq;

  import com.xsh.mq.service.MessageServiceImpl;
  import org.junit.Test;
  import org.junit.runner.RunWith;
  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.beans.factory.annotation.Value;
  import org.springframework.boot.test.context.SpringBootTest;
  import org.springframework.test.context.junit4.SpringRunner;

  @RunWith(SpringRunner.class)
  @SpringBootTest
  public class MqApplicationTests {

      @Test
      public void contextLoads() {
      }

      @Autowired
      private MessageServiceImpl messageService;

      @Value("${rabbitmq.queue}")
      private String queueName;

      @Test
      public void send() {
          messageService.sendMsg(queueName, "delayMsg2", 1000 * 60 * 2);
          messageService.sendMsg(queueName, "delayMsg1", 1000 * 60);
          messageService.sendMsg(queueName, "delayMsg3", 1000 * 60*3);
      }

  }

这里我发送了三条延迟消息,控制台结果如图:

消费者接收到的消息为:

从执行结果来看,demo基本实现,RabbitMQ其他细节还有待继续看。 参考文章:Scheduling Messages with RabbitMQ

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • java web项目中引入spring

    Step2:下载spring的jar包http://repo.spring.io/libs-release-local/org/springframework/...

    河岸飞流
  • js日期格式化

    河岸飞流
  • Array类型

    Array也是ECMAScript中常用类型之一,其特点是数组中的每一项都可以保存任何类型的数据,数组的大小可以动态调整。

    河岸飞流
  • 轻松上手SpringBoot Security + JWT Hello World示例

    在本教程中,我们将开发一个Spring Boot应用程序,该应用程序使用JWT身份验证来保护公开的REST API。在此示例中,我们将使用硬编码的用户和密码进行...

    sanshengshui
  • Redis整合Spring项目搭建实例

    本文介绍了如何使用注解的方式,将Redis缓存整合到你的Spring项目。 首先我们将使用jedis驱动,进而开始配置我们的Gradle。 group 'com...

    CSDN技术头条
  • Spring Boot MongoDB 实例

    节选自《Netkiller Java 手札》 11.12.3. Spring boot mongodb import org.springframework...

    netkiller old
  • 基于Springboot+jpa+thymeleaf+rabbit+SpringBoot mail 的简单项目

    Springboot+jpa+thymeleaf+rabbit 紧跟要求, 不需要使用Mybatis, 也不需要使用Eureka!!! 使用Rabbi...

    时间静止不是简史
  • Spring boot with Service

    本文节选自《Netkiller java 手札》 5.7. Service 5.7.1. Application @ComponentScan({ "web",...

    netkiller old
  • springboot使用rabbitMQ(带回调)

    配置文件2:RabbitConstants(主要用于用户名、密码等值从配置文件获取,也可以用@Value方式)

    小尘哥
  • Spring Cloud 入门教程2、服务消费者(Ribbon)

    Ribbon是Netflix开源的实现了负载均衡等功能的RPC客户端。 支持HTTP、TCP、UDP协议,且有一定的容错、缓存等机制。

    KenTalk

扫码关注云+社区

领取腾讯云代金券