前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >rabbitMq实现延时队列

rabbitMq实现延时队列

作者头像
chinotan
发布2022-01-04 16:24:03
1.4K0
发布2022-01-04 16:24:03
举报

rabbitMq是受欢迎的消息中间件之一,相比其他的消息中间件,具有高并发的特性(天生具备高并发高可用的erlang语言编写),除此之外,还可以持久化,保证消息不易丢失,高可用,实现集群部署,提供灵活的路由和可靠性,可视化管理等等的优点。

相比于其他的消息队列,rabbitmq最大的特色就是加入了exchange(交换器)这个东西,AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储。同理,消费者也是如此。Exchange 就类似于一个交换机,转发各个消息分发到相应的队列中。

RabbitMQ常用的Exchange Type有三种:fanout、direct、topic。

fanout:把所有发送到该Exchange的消息投递到所有与它绑定的队列中。

direct:把消息投递到那些binding key与routing key完全匹配的队列中。

topic:将消息路由到binding key与routing key模式匹配的队列中。

言归正传,延时队列如何通过rabbitmq来实现呢?

分析:首先rabbitmq自己是不具备延时的功能的,除了使用官方提供的插件之外,我们还可以通过ttl(设置超时时间的方式)+ DLX(一个死信队列)的方式来实现 + Router(转发队列)

其中,ttl可以设置在消息上,也可以设置在队列上,设置在消息上可以提供更大的灵活性,但是如果同时设置超时时间的话,就取最小的超时时间为准。

此外,死信队列是一个普通的队列,它没有消费者,用来存储有超时时间信息的消息,并且可以设置当消息超时(ttl),转发到另一个指定队列(此处设置转发到router, 当发送消息之后(发送时,带上要延时的队列名称),等待消息超时,将消息转发到指定的Router队列。

最后,转发队列,用来接收死信队列超时消息,在接收到之后,消费者将消息解析,获取queueName,body,再向所获取的queueName队列发送一条消息,内容为body.

下面是代码:

生产者:

package cn.chinotan.service.delayQueueRabbitMQ;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @program: test
 * @description: 生产者
 * @author: xingcheng
 * @create: 2018-08-12 12:33
 **/
@Service
public class Producr {
    private static final Logger LOGGER = LoggerFactory.getLogger(Producr.class);

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send(String msg, long time, String delayQueueName) {
        //rabbit默认为毫秒级
        long times = time * 1000;
        MessagePostProcessor processor = new MessagePostProcessor() {

            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration(String.valueOf(times));
                return message;
            }
        };
        // 拼装msg
        msg = StringUtils.join(msg, ":", delayQueueName);
        amqpTemplate.convertAndSend(MqConstant.MY_EXCHANGE, MqConstant.DEAD_LETTER_QUEUE, msg, processor);
    }
}

消费队列1:

package cn.chinotan.service.delayQueueRabbitMQ;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @program: test
 * @description: queueOne消费者
 * @author: xingcheng
 * @create: 2018-08-12 12:35
 **/
@Service
public class MyQueueOneConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(MyQueueOneConsumer.class);

    @RabbitListener(queues=MqConstant.MY_QUEUE_ONE)
    @RabbitHandler
    public void process(String content) {
        LOGGER.info("延迟时间到,queueOne开始执行 {}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
    }
}

消费队列2:

package cn.chinotan.service.delayQueueRabbitMQ;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @program: test
 * @description: queueTwo队列
 * @author: xingcheng
 * @create: 2018-08-12 12:35
 **/
@Service
public class MyQueueTwoConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(MyQueueTwoConsumer.class);
    @Autowired
    private Producr producr;

    @RabbitListener(queues=MqConstant.MY_QUEUE_TWO)
    @RabbitHandler
    public void process(String content) {
        LOGGER.info("延迟时间到,queueTwo开始执行 {}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
    }
}

转发队列:

package cn.chinotan.service.delayQueueRabbitMQ;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @program: test
 * @description: 转发队列
 * @author: xingcheng
 * @create: 2018-08-12 12:35
 **/
@Service
public class TradeProcess {
    private static final Logger LOGGER = LoggerFactory.getLogger(TradeProcess.class);

    @Autowired
    private AmqpTemplate amqpTemplate;
    
    @RabbitListener(queues=MqConstant.MY_TRANS_QUEUE)
    @RabbitHandler
    public void process(String content) {
        String msg = content.split(":")[0];
        String delayQueueName = content.split(":")[1];
        amqpTemplate.convertAndSend(MqConstant.MY_EXCHANGE, delayQueueName, msg);
        LOGGER.info("进行转发 {}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
    }
}

队列配置:

package cn.chinotan.service.delayQueueRabbitMQ;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @program: test
 * @description: 延时队列rabbitMQ配置
 * @author: xingcheng
 * @create: 2018-08-12 12:27
 **/
@Configuration
public class RabbitConfig {

    @Bean
    public DirectExchange myExchange() {
        return new DirectExchange(MqConstant.MY_EXCHANGE, true, false);
    }

    @Bean
    public Queue myQueueOne() {
        return new Queue(MqConstant.MY_QUEUE_ONE, true, false, false);
    }

    @Bean
    public Queue myQueueTwo() {
        return new Queue(MqConstant.MY_QUEUE_TWO, true, false, false);
    }
    @Bean
    public Queue myTransQueue() {
        return new Queue(MqConstant.MY_TRANS_QUEUE, true, false, false);
    }

    @Bean
    public Queue deadLetterQueue() {
        Map<String, Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange", MqConstant.MY_EXCHANGE);
        map.put("x-dead-letter-routing-key", MqConstant.MY_TRANS_QUEUE);
        Queue queue = new Queue(MqConstant.DEAD_LETTER_QUEUE, true, false, false, map);
        System.out.println("arguments :" + queue.getArguments());
        return queue;
    }

    @Bean
    public Binding queueOneBinding() {
        return BindingBuilder.bind(myQueueOne()).to(myExchange()).with(MqConstant.MY_QUEUE_ONE);
    }

    @Bean
    public Binding queueTwoBinding() {
        return BindingBuilder.bind(myQueueTwo()).to(myExchange()).with(MqConstant.MY_QUEUE_TWO);
    }

    @Bean
    public Binding queueDeadBinding() {
        return BindingBuilder.bind(deadLetterQueue()).to(myExchange()).with(MqConstant.DEAD_LETTER_QUEUE);
    }

    @Bean
    public Binding queueTransBinding() {
        return BindingBuilder.bind(myTransQueue()).to(myExchange()).with(MqConstant.MY_TRANS_QUEUE);
    }
}

队列常量配置:

package cn.chinotan.service.delayQueueRabbitMQ;

/**
 * @program: test
 * @description: rabbitMq常量
 * @author: xingcheng
 * @create: 2018-08-12 12:30
 **/
public class MqConstant {
    
    public static final String MY_EXCHANGE = "my_exchange";
    
    public static final String MY_QUEUE_ONE = "my_queue_one";

    public static final String MY_QUEUE_TWO = "my_queue_two";
    
    public static final String DEAD_LETTER_QUEUE = "dead_letter_queue";
    
    public static final String MY_TRANS_QUEUE = "my_trans_queue";
    
}

测试延时controller:

package cn.chinotan.controller;

import cn.chinotan.service.delayQueueRabbitMQ.MqConstant;
import cn.chinotan.service.delayQueueRabbitMQ.Producr;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @program: test
 * @description: 延时队列启动类
 * @author: xingcheng
 * @create: 2018-08-12 15:41
 **/
@RestController
@RequestMapping("/delayQueue")
public class DelayQueueController {

    private static final Logger LOGGER = LoggerFactory.getLogger(DelayQueueController.class);

    @Autowired
    private Producr producr;

    @GetMapping("/send/{time}")
    public String send(@PathVariable("time") int time){
        LOGGER.info("{}秒后, 发送延迟消息,当前时间{}", time, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        producr.send("我是延时消息...", time, MqConstant.MY_QUEUE_TWO);
        return "ok";
    }
    
}

测试结果:

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018/08/12 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
消息队列 TDMQ
消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档