ActiveMQ笔记(6):消息延时投递

在开发业务系统时,某些业务场景需要消息定时发送或延时发送(类似:飞信的短信定时发送需求),这时候就需要用到activemq的消息延时投递,详细的文档可参考官网说明,本文只介绍二种常用的用法:

注:本文采用spring的JmsTemplate来发送消息

步骤1、首先要修改activemq.xml配置文件,启用延时投递

1 <broker xmlns="http://activemq.apache.org/schema/core" ... schedulerSupport="true" >
2     ...
3   </broker>

即:在broker节点加上schedulerSupport="true",然后重启activemq即可

步骤2、定义一个MessagePostProcessor的实现类

import javax.jms.JMSException;
import javax.jms.Message;

import lombok.Data;
import org.apache.activemq.ScheduledMessage;
import org.apache.commons.lang3.StringUtils;
import org.springframework.jms.core.MessagePostProcessor;

/**
 * MQ延时投递处理器(注:ActiveMQ的配置文件中,要配置schedulerSupport="true",否则不起作用)
 * by: 杨俊明 2016-06-16
 */
@Data
public class ScheduleMessagePostProcessor implements MessagePostProcessor {

    private long delay = 0l;

    private String corn = null;

    public ScheduleMessagePostProcessor(long delay) {
        this.delay = delay;
    }

    public ScheduleMessagePostProcessor(String cron) {
        this.corn = cron;
    }

    public Message postProcessMessage(Message message) throws JMSException {
        if (delay > 0) {
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
        }
        if (!StringUtils.isEmpty(corn)) {
            message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, corn);
        }
        return message;
    }

}

步骤3、jmsTemplate发送示例

        Object message1 = "corn消息内容:" + DateUtil.formatDate(new Date());
        //分 时 天 月 星期几
        jmsTemplate.convertAndSend(message1, new ScheduleMessagePostProcessor("40 22 * * *"));
        logger.info("消息1:[" + message1 + "] 延时发送成功!");

        jmsTemplate.convertAndSend(message1, new ScheduleMessagePostProcessor("50 22 * * *"));
        logger.info("消息1:[" + message1 + "] 延时发送成功!");


        Object message2 = "message:" + DateUtil.formatDate(new Date());
        jmsTemplate.convertAndSend(message2, new ScheduleMessagePostProcessor(30 * 1000));//延时30秒
        jmsTemplate.convertAndSend(message2, new ScheduleMessagePostProcessor(3600 * 24 * 1000));//延时24小时
        logger.info("消息2:[" + message2 + "] 延时发送成功!");

上面的代码演示了二种延时的用法:延时N毫秒、按corn表达式延时(注:此corn表达式并非Quartz框架中的corn表达式,而是linux中corntab中的表达 式,基本顺序是"分(0-59) 时(0-23) 日(1-31) 月(1-12) 星期几(1-7) ")

发送成功后,可以登录activemq的webconsole查看消息的属性:

在scheduled面板中,可以看到延时的消息

注:在开启消息持久化存储的前提下,就算把相应的queue在webconsole面板中删除(即删除队列),只要投递的时间尚未到,该消息也不会删除,仍然能正常延时投递。

此外,在queues面板中,如何查看某条具体的消息,也可以通过属性发现这条消息是延时消息,参考下图:

参考文章: 1、Delay and Schedule Message Delivery

2、喂鸡百科上的Corn表达式解释 (中文)

3、喂鸡百科上的Corn表达式解释 (英文)

4、 kahaDB官方文档

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏腾讯云Elasticsearch Service

Elasticsearch底层系列之Shard Allocation机制

    Elasticsearch由一些Elasticsearch进程(Node)组成集群,用来存放索引(Index)。为了存放数据量很大的索引,Elastic...

2.2K30
来自专栏Seebug漏洞平台

使用 XML 内部实体绕过 Chrome 和 IE 的 XSS 过滤器

来源:BypassingXSSFiltersusingXMLInternalEntities 原作者:DavidLitchfield (david@davidl...

415100
来自专栏张善友的专栏

实现WebSocket和WAMP协议的开源库WampSharp

Websocket Application Messaging Protocol 协议:https://github.com/wamp-proto/wamp-p...

23070
来自专栏时序数据库专栏

Elasticsearch集群Shard Allocation机制

    Elasticsearch由一些Elasticsearch进程(Node)组成集群,用来存放索引(Index)。为了存放数据量很大的索引,Elastic...

21900
来自专栏草根专栏

使用Identity Server 4建立Authorization Server (1)

本文内容基本完全来自于Identity Server 4官方文档: https://identityserver4.readthedocs.io/ 官方文档很详...

533100
来自专栏高性能服务器开发

+从零实现一款12306刷票软件1.2

当然,这里需要说明一下的就是,由于全国的火车站点信息文件比较大,我们程序解析起来时间较长,加上火车站编码信息并不是经常变动,所以,我们我们没必要每次都下载这个s...

22220
来自专栏魏艾斯博客www.vpsss.net

如何创建.htaccess 文件

关于.htaccess 文件,一般用于虚拟主机中,使用 VPS 建站的可以忽略了。对于使用虚拟主机建站的朋友来说.htaccess 文件可以用作伪静态化设置和 ...

47280
来自专栏大魏分享(微信公众号:david-share)

用Ansible自动供应vmware虚拟机--构建数据中心一体化运维平台第二篇

1.1 简述 一直以来,打开邮箱被ticket糊一脸的事情时有发生。我一直在想,能不能以一种简单的方案(不花老板的钱)来供应(provisioning)虚拟机呢...

73720
来自专栏较真的前端

同样是客户端会话级存储,sessionStorage和session cookie有什么?

35740
来自专栏逸鹏说道

并发编程~先导篇上

并发 :一个时间段中有几个程序都处于已启动运行到运行完毕之间,且这几个程序都是在同一个处理机上运行,但任一个时刻点上只有一个程序在处理机上运行。

21280

扫码关注云+社区

领取腾讯云代金券