专栏首页java 成神之路RocketMQ 延迟消息

RocketMQ 延迟消息

概述

RocketMQ 支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息。

预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h

在消息创建的时候,调用 setDelayTimeLevel(int level) 方法设置延迟时间。broker在接收到延迟消息的时候会把对应延迟级别的消息先存储到对应的延迟队列中,等延迟消息时间到达时,会把消息重新存储到对应的topic的queue里面。

broker 处理延迟消息

CommitLog.putMessage()

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // 设置消息的存储时间
    msg.setStoreTimestamp(System.currentTimeMillis());
    // 设置消息体的校验位
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    
    AppendMessageResult result = null;

    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

    String topic = msg.getTopic();
    int queueId = msg.getQueueId();

    // 获取消息的 SysFlag 
    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    // 1、非事务消息 或 已commit事物消息
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // 2、判断消息是否设置延迟
        if (msg.getDelayTimeLevel() > 0) {
            // 3、判断设置的延迟等级是否大于最大级别,如果大于最大值,则设置最大值(默认最大级别为18)
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }
            // 4、延迟消息的 Topic 名称为 “SCHEDULE_TOPIC_XXXX”
            topic = ScheduleMessageService.SCHEDULE_TOPIC;
            // 5、根据延迟级别获取对应的 Queue 。一个延迟级别对应一个 Queue
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

            // 6、消息原始的 Topic 名称和 QueueId 备份保存到 property 中
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
            // 7、修改消息的 topic 和 queueId,让该消息先投递到延迟消息队列中
            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }
    // 省略代码
    ........
}

1、判断该消息类型,如果是非事物消息或事物已commit消息,才能处理延迟消息。 2、判断该消息是否设置延迟,如果延迟级别大于零,则说明该消息时延迟消息。 3、判断设置的延迟等级是否大于最大级别,如果大于最大值,则设置最大值(默认最大级别为18) 4、延迟消息的 Topic 名称为 “SCHEDULE_TOPIC_XXXX” 5、根据延迟级别获取对应的 Queue 。一个延迟级别对应一个 Queue 6、消息原始的 Topic 名称和 QueueId 备份保存到 property 中 7、修改消息的 topic 和 queueId,让该消息先投递到延迟消息队列中

延迟消息级别

MessageStoreConfig.java

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

解析初始化延迟级别

// 存储消息级别对应的延迟时间
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
        new ConcurrentHashMap<Integer, Long>(32);

// 解析并初始化消息延迟级别
public boolean parseDelayLevel() {
    // 时间单位
    HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();
    timeUnitTable.put("s", 1000L);
    timeUnitTable.put("m", 1000L * 60);
    timeUnitTable.put("h", 1000L * 60 * 60);
    timeUnitTable.put("d", 1000L * 60 * 60 * 24);
    // 获取 messageDelayLevel 定义的延迟消息信息
    String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
    try {
        String[] levelArray = levelString.split(" ");
        for (int i = 0; i < levelArray.length; i++) {
            String value = levelArray[i];
            String ch = value.substring(value.length() - 1);
            Long tu = timeUnitTable.get(ch);

            int level = i + 1;
            if (level > this.maxDelayLevel) {
                this.maxDelayLevel = level;
            }
            long num = Long.parseLong(value.substring(0, value.length() - 1));
            long delayTimeMillis = tu * num;
            this.delayLevelTable.put(level, delayTimeMillis);
        }
    } catch (Exception e) {
        log.error("parseDelayLevel exception", e);
        log.info("levelString String = {}", levelString);
        return false;
    }

    return true;
}

解析messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; 字符串,并每一个延迟时间对应一个延迟级别,存储到 delayLevelTable 中。

用户只需要设置延迟级别,然后通过 delayLevelTable 就知道该级别对应的延迟时间是多少。

处理延迟消息

public void start() {
    // 为每一个延迟级别设置一个定时任务处理消息的投递
    for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
        Integer level = entry.getKey();
        Long timeDelay = entry.getValue();
        Long offset = this.offsetTable.get(level);
        if (null == offset) {
            offset = 0L;
        }

        if (timeDelay != null) {
            this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
        }
    }
    // 定时持久化 每个消息级别处理对应queue的offset信息
    this.timer.scheduleAtFixedRate(new TimerTask() {

        @Override
        public void run() {
            try {
                ScheduleMessageService.this.persist();
            } catch (Throwable e) {
                log.error("scheduleAtFixedRate flush exception", e);
            }
        }
    }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}

1、为每一个延迟队列创建一个定时任务,定时处理延迟队列中的数据,把该数据从延迟队列中取出,然后投递到实际发送的消息队列(queue)中。

2、定时持久化每个消息级别处理对应queue的offset信息。(启动后延迟10秒开始持久化,以后每间隔10秒保存一次)

延迟消息投递

在 DeliverDelayedMessageTimerTask 中处理延迟消息的投递,代码如下:

public void executeOnTimeup() {
    // 根据 topic 和 queueId 获取延迟队列对应的 ConsumeQueue
    ConsumeQueue cq =
            ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));

    long failScheduleOffset = offset;

    if (cq != null) {
        // 通过偏移量获取延迟队列 MappedFile (MappedFile 对应的 Buffer)
        // ConsumerQueue 中每个消息存储的长度为20位,而 offset 是消息的个数,实际的偏移量为 offset * 20
        SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
        if (bufferCQ != null) {
            try {
                long nextOffset = offset;
                int i = 0;
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                // bufferCQ.getSize() 为延迟队列中可以读取到的延迟消息长度(包括已到时间和未到实际的数据)
                // ConsumeQueue.CQ_STORE_UNIT_SIZE 为20。 ConsumerQueue 中每个消息固定的长度。
                for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                    // 从ConsumerQueue 中获取一条消息。
                    // 消息包括3部分:物理偏移量、消息大小、Tag的HashCode
                    // 这里的tagsCode在延迟消息队列中存储是存储在 【延迟队列中的时间 + 延迟的时间】(通过这个时间来确定消息是否达到延迟的时间)
                    long offsetPy = bufferCQ.getByteBuffer().getLong();
                    int sizePy = bufferCQ.getByteBuffer().getInt();
                    long tagsCode = bufferCQ.getByteBuffer().getLong();

                    // 通过 tagsCode 来判断是否存储的是延时时间
                    // 如果是 Tag 的 hashcode ,那么最大值为 Integer.Max
                    // 如果是 延迟时间,时间为long类型,肯定大于 Integer.Max
                    if (cq.isExtAddr(tagsCode)) {
                        //获取延迟发送时间
                        if (cq.getExt(tagsCode, cqExtUnit)) {
                            tagsCode = cqExtUnit.getTagsCode();
                        }
                        // 从commitLog中获取存储时间,然后从新计算延迟发送时间。延迟时发送时间=消息发送到延迟队列存储时间+延迟时间
                        else {
                            //can't find ext content.So re compute tags code.
                            log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                                    tagsCode, offsetPy, sizePy);
                            long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                        }
                    }

                    long now = System.currentTimeMillis();
                    // 计算投递时间,如果已经到投递时间,则返回当前时间,否则返回需要等待投递的时间
                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                    long countdown = deliverTimestamp - now;
                    // countdown <=0 是需要马上投递的延迟消息
                    if (countdown <= 0) {
                        // 从 CommitLog 中获取当前消息的信息
                        MessageExt msgExt =
                                ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                        offsetPy, sizePy);

                        if (msgExt != null) {
                            try {
                                // 这里从 property 中解析出正真的 Topic、QueueId、TagCode 信息,存储到 msgInner 中。
                                MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                // 消息投递,跟 producer 发送消息处理流程一样。
                                PutMessageResult putMessageResult =
                                        ScheduleMessageService.this.defaultMessageStore
                                                .putMessage(msgInner);
                                // 如果处理成功,则继续下一条处理
                                if (putMessageResult != null
                                        && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                    continue;
                                } 
                                // 如果处理失败
                                else {
                                    // 打印失败信息
                                    log.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                                            msgExt.getTopic(), msgExt.getMsgId());
                                    // 则从新创建一个定时任务
                                    ScheduleMessageService.this.timer.schedule(
                                            new DeliverDelayedMessageTimerTask(this.delayLevel,
                                                    nextOffset), DELAY_FOR_A_PERIOD);
                                    //并记录下处理延迟队列的 offset
                                    ScheduleMessageService.this.updateOffset(this.delayLevel,
                                            nextOffset);
                                    return;
                                }
                            } catch (Exception e) {
                                log.error(
                                        "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                                + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
                                                + offsetPy + ",sizePy=" + sizePy, e);
                            }
                        }
                    }
                    // 未到投递时间
                    else {
                        // 重新创建一个定时任务,延迟 countdown 长时间在执行
                        ScheduleMessageService.this.timer.schedule(
                                new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                                countdown);
                        // 更新延迟队列待处理消息的 offset
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    }
                } // end of for

                nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                        this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                return;
            } finally {

                bufferCQ.release();
            }
        } // end of if (bufferCQ != null)
        else {

            long cqMinOffset = cq.getMinOffsetInQueue();
            if (offset < cqMinOffset) {
                failScheduleOffset = cqMinOffset;
                log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                        + cqMinOffset + ", queueId=" + cq.getQueueId());
            }
        }
    } 
    // 如果出现异常,则创建一个100毫秒延迟的定时任务
    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
            failScheduleOffset), DELAY_FOR_A_WHILE);
}

这里的注释已经写的很清楚了,就不解释了。

延迟消息 TagCode 值

public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC,
        final boolean readBody) {
        // 省略代码
        ......

       // Timing message processing
        {
            String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
            if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) {
                int delayLevel = Integer.parseInt(t);

                if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
                }
                // 如果是延迟消息队列,则ConsumerQueue中的 tagsCode 存储的是要投递的时间(存储时间+延迟时间)
                if (delayLevel > 0) {
                    tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
                                storeTimestamp);
                }
            }
        //省略代码
        ......
}

从这里看出,如果是延迟消息,则 TagCode 中存储的是消息需要投递到正在消息队列的时间。而不是 Tag 的 hashcode 。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • RocketMQ 生产者 Producer 启动过程

    从类关系中可以看出,MQProducer 有两种实现方式。一个是 DefaultMQProducer,另一个是 TransactionMQProducer。

    java404
  • RocketMQ 存储机制源码解析

    producer 发送消息后,broker端开始存储消息,会调用 store 模块的 DefaultMessageStore.putMessage 进行存储消息...

    java404
  • RocketMQ 底层通信机制 源码分析

    RocketMQ 底层通讯是使用Netty来实现的。 下面我们通过源码分析下RocketMQ是怎么利用Netty进行通讯的。

    java404
  • Spring Security 架构与源码分析

    Spring Security 主要实现了Authentication(认证,解决who are you? ) 和 Access Control(访问控制,也就...

    用户1177380
  • python3.7 连接sql server出现pymssql.OperationalError: (20009, b'DB-Lib error message 20009, severity ...

    今天在使用python3.7中的pymssql 连接sqlserver的时候遇到的问题:

    小海怪的互联网
  • pytorch入门

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

    于小勇
  • Elasticsearch(四)——Analyzer

    索引页对应的倒排索引 单词到索引的关联 倒排索引是搜索引擎的核心,主要包含两部分 单词词典(Term Dictionary) 记录所有文档的单词,一般比...

    羊羽shine
  • Android新手之旅(2) 新手问题

    1、添加控件,运行,但有一个main.out.xml老是报错,说没有内容 原因:和vs有所不同,eclipse的运行编译与当前打开的文件密切相关,所以就出问题...

    用户1075292
  • MUI列表式布局

    列表式布局,是移动端布局的常见布局。其内容从上往下排列,导航之间的跳转要回到初始点。层次展示清晰,视觉效果好,体验方便快捷。常用应用于各种设置页面。非常的美观。...

    算法与编程之美
  • Android新手之旅(2) 新手问题

    1、添加控件,运行,但有一个main.out.xml老是报错,说没有内容 原因:和vs有所不同,eclipse的运行编译与当前打开的文件密切相关,所以就出问题...

    用户1075292

扫码关注云+社区

领取腾讯云代金券