前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ(二):揭秘发送消息核心原理(源码与设计思想解析)

RocketMQ(二):揭秘发送消息核心原理(源码与设计思想解析)

原创
作者头像
菜菜的后端私房菜
发布2024-09-13 09:15:03
290
发布2024-09-13 09:15:03
举报
文章被收录于专栏:消息中间件

RocketMQ(二):揭秘发送消息核心原理(源码与设计思想解析)

上篇文章主要介绍消息中间件并以RocketMQ架构展开描述其核心组件以及MQ运行流程

本篇文章以Product的视角来看看发送消息的核心原理与设计思想,最后以图文并茂的方式描述出发送消息的核心流程

消息发送方式

RocketMQ中普通消息提供三种发送方式:同步、异步、单向

上篇文章中我们已经使用封装好的API延时过同步发送

在使用三种方式前,我们先来理解它们的理论知识

同步发送:发送完消息后,需要阻塞直到收到Broker的响应,通常用于数据一致性较高的操作,需要确保消息到达Broker并持久化

同步发送收到响应并不一定就是成功,还需要根据响应状态进行判断

SendResult响应状态包括:

  1. SEND_OK:发送成功
  2. FLUSH_DISK_TIMEOUT:刷盘超时
  3. FLUSH_SLAVE_TIMEOUT:同步到备超时
  4. SLAVE_NOT_AVAILABLE:备不可用

(这些状态与设置的刷盘策略有关,后续保证消息可靠的文章再进行详细展开说明,本篇文章还是回归主线探究发送消息)

异步发送:发送完消息后立即响应,不需要阻塞等待,但需要设置监听器,当消息成功或失败时进行业务处理,可以在失败时进行重试等其他逻辑保,通常用于追求响应时间的场景

异步发送相当于同步发送,需要新增SendCallback回调来进行后续成功/失败的处理,并且异步发送没有返回值

代码语言:java
复制
@GetMapping("/asyncSend")
public String asyncSend() {
    producer.sendAsyncMsg(topic, "tag", "async hello world!", new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            log.info("消息发送成功{}", sendResult);
        }

        @Override
        public void onException(Throwable throwable) {
            log.info("消息发送失败", throwable);
            //记录后续重试
        }
    });
    return "asyncSend ok";
}

原生API封装:

代码语言:java
复制
public void sendAsyncMsg(String topic, String tag, String jsonBody, SendCallback sendCallback) {
        Message message = new Message(topic, tag, jsonBody.getBytes(StandardCharsets.UTF_8));
        try {
            producer.send(message, sendCallback);
        } catch (MQClientException | RemotingException | InterruptedException e) {
            throw new RuntimeException(e);
        }
}

单向发送:只要发出消息就响应,性能最好,通常用于追求性能,不追求可靠的场景,如:异步日志收集

由于单向发送的特性,即不需要回调也没有返回结果

代码语言:java
复制
@GetMapping("/sendOnewayMsg")
public String onewaySend() {
    producer.sendOnewayMsg(topic, "tag", "oneway hello world!");
    return "sendOnewayMsg ok";
}

原生API封装:

代码语言:java
复制
public void sendOnewayMsg(String topic, String tag, String jsonBody) {
    Message message = new Message(topic, tag, jsonBody.getBytes(StandardCharsets.UTF_8));
    try {
        producer.sendOneway(message);
    } catch (MQClientException | RemotingException | InterruptedException e) {
        throw new RuntimeException(e);
    }
}

发送消息原理

在研究发送消息的原理前,不妨来思考下,如果让我们实现,我们要思考下需要哪些步骤?

像我们平时进行业务代码编写前的第一步就是进行参数校验,因为要防止参数“乱填”的意外情况

然后由于需要找到对应的Broker,那肯定要获取Topic路由相关信息

这个路由信息前文说过是从NameServer集群定时获取即时更新的,那么客户端的内存里肯定会进行存储

像这样的数据肯定是类似于多级缓存的,先在本地缓存,如果本地没有或者本地是旧数据,那么就网络通信再去远程(NameServer)获取一份后再更新本地缓存

获取完路由信息后,可以通过设置的Topic获取对应的MessageQueue队列信息,因为Topic下可能有很多队列,因此需要负载均衡算法决定要发送的队列

rocketmq发送消息还提供超时、重试等机制,因此在这个过程中需要计算时间、重试次数

最后发送消息会进行网络通信,我们要选择合适的工具进行RPC

总结一下,如果让我们设计起码要有这些流程:参数校验、获取路由信息、根据负载均衡算法选择队列、计算超时,重试次数、选择网络通信RPC工具...

在设计完流程后,如果我们是一位”成熟的设计师“,那么一定会将这些步骤中通用的步骤抽象成模板,模板可以作为三种发送消息通用方式,而那些变动的就是策略,解耦互不影响,并在重要的流程前后留下”钩子“,方便让使用者进行扩展

rocketmq流程与我们设计、思考的流程类似,先准备一张最终的流程图,方便跟着流程图一起阅读源码:

image.png
image.png
sendDefaultImpl 通用发送消息模板

通过三种发送方式,都会来到DefaultMQProducerImpl.sendDefaultImpl这个就是通用方法的模板

代码块中只展示部分关键代码,流程如下:

  1. 参数校验 Validators.checkMessage
  2. 获取路由信息 tryToFindTopicPublishInfo
  3. 选择一个要发送的MessageQueue selectOneMessageQueue
  4. 发送消息 sendKernelImpl

在3、4步骤中还会进行重试、超时判断

代码语言:java
复制
private SendResult sendDefaultImpl(
    //消息
    Message msg,
    //方式
    final CommunicationMode communicationMode,
    //异步的回调
    final SendCallback sendCallback,
    //超时时间
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    //参数校验
    Validators.checkMessage(msg, this.defaultMQProducer);
    
    //获取路由信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        
        //计算重试次数
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        //已经重发次数
        int times = 0;
        //重试循环
        for (; times < timesTotal; times++) {
            //上次试过的Broker
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            
            //选择一个要发送的MessageQueue
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    //重发时设置topic
                    if (times > 0) {
                        //Reset topic with namespace during resend.
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    //超时退出
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }

                    //发送
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    //记录延时
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    //最后分情况处理
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            //如果响应状态不成功 如果设置重试其他broker则进行重试
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }
                            return sendResult;
                        default:
                            break;
                    }
                } 
       //...             
}

其中CommunicationMode就是发送的方式,分别为:SYNC同步、ASYNC异步、ONEWAY单向

tryToFindTopicPublishInfo 获取路由信息

rocketmq中使用大量散列表存储数据,其中存储路由信息的是

代码语言:java
复制
ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>()

topicPublishInfoTable中Key为topic,Value为路由信息TopicPublishInfo

TopicPublishInfo中主要包括messageQueueList对应的队列列表、sendWhichQueue后续用来选择哪一个队列、topicRouteData路由数据

在topicRouteData路由数据中主要有brokerDatas、queueDatas

brokerDatas包含所有的Broker信息,queueDatas包含每个broker上对应的数据,比如读写队列数量

image.png
image.png

在获取路由信息的方法中,先尝试从本地获取 this.topicPublishInfoTable.get ,如果本地不存在则从NameServer获取 this.mQClientFactory.updateTopicRouteInfoFromNameServer

(这里的this.mQClientFactory实际上是MQClientInstance,生产者、消费者都会用到,用于客户端远程调用服务端,里面也会存对应相关的组件)

代码语言:java
复制
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    //本地获取
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        //远程获取
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        //远程获取
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}
selectOneMessageQueue 选择队列

选择队列默认情况下会来到这里,会通过线性轮询选择队列 selectOneMessageQueue,重试的区别为本次选择的broker不和上次的相同

(因为上次失败broker可能会存在问题,这次就换一个broker)

代码语言:java
复制
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    //lastBrokerName:上一次的broker
    if (lastBrokerName == null) {
        //线性轮询选择队列 selectOneMessageQueue
        return selectOneMessageQueue();
    } else {
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            //线性轮询选择队列
            int index = this.sendWhichQueue.incrementAndGet();
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            //找到不和上次一样的broker
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        return selectOneMessageQueue();
    }
}
sendKernelImpl 封装消息

在发送消息前需要对消息进行封装,如:设置唯一ID、尝试压缩消息、封装消息头等

在发送前还有检查禁止发送的钩子和发送前后执行的钩子,方便扩展

代码语言:java
复制
private SendResult sendKernelImpl(final Message msg,
    final MessageQueue mq,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

    
    //获取broker信息
    String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq);
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName);
    }

    SendMessageContext context = null;
    if (brokerAddr != null) {
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

        byte[] prevBody = msg.getBody();
        try {
            //for MessageBatch,ID has been set in the generating process
            //不是批量消息就设置唯一ID
            if (!(msg instanceof MessageBatch)) {
                MessageClientIDSetter.setUniqID(msg);
            }

            //尝试压缩消息
            int sysFlag = 0;
            boolean msgBodyCompressed = false;
            if (this.tryToCompressMessage(msg)) {
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                sysFlag |= compressType.getCompressionFlag();
                msgBodyCompressed = true;
            }

			//尝试执行检查禁止发送消息的钩子
            if (hasCheckForbiddenHook()) {
                //...
                this.executeCheckForbiddenHook(checkForbiddenContext);
            }

            //尝试执行发送消息前的钩子
            if (this.hasSendMessageHook()) {
                //...
                this.executeSendMessageHookBefore(context);
            }

            //封装消息头
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            //set...

            
            //根据不同的发送方式调整
            SendResult sendResult = null;
            switch (communicationMode) {
                case ASYNC:
                    Message tmpMessage = msg;
                    //...
                    //获取MQ客户端发送
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        brokerName,
                        tmpMessage,
                        requestHeader,
                        timeout - costTimeAsync,
                        communicationMode,
                        sendCallback,
                        topicPublishInfo,
                        this.mQClientFactory,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                        context,
                        this);
                    break;
                case ONEWAY:
                case SYNC:
                    //检查超时
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeSync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    //获取MQ客户端发送消息
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        brokerName,
                        msg,
                        requestHeader,
                        timeout - costTimeSync,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            }

            //尝试执行发送完消息的钩子
            if (this.hasSendMessageHook()) {
                context.setSendResult(sendResult);
                this.executeSendMessageHookAfter(context);
            }

            return sendResult;
        } 
        //...
}
使用Netty进行网络通信RPC

同步消息最终会调用invokeSync,这种服务间的网络通信又称为远程调用RPC

在RPC前后也有钩子可以进行扩展

最终调用invokeSyncImpl会通过netty的channel进行写数据

代码语言:java
复制
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
    throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    long beginStartTime = System.currentTimeMillis();
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
            //rpc前的钩子
            doBeforeRpcHooks(addr, request);
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTime) {
                throw new RemotingTimeoutException("invokeSync call the addr[" + addr + "] timeout");
            }
        	//使用netty的channel写数据
            RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
        	//rpc后的钩子
            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
            this.updateChannelLastResponseTime(addr);
            return response;
    } 
}

通过netty的channel写请求,并添加监听器,最后使用结果调用waitResponse进行同步等待

代码语言:java
复制
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
    final long timeoutMillis)
    throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    try {
        //结果
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        this.responseTable.put(opaque, responseFuture);
        final SocketAddress addr = channel.remoteAddress();
        //写请求 并添加监听器
        channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
            //...
        });

        //同步调用 等待结果
        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        return responseCommand;
    } finally {
        this.responseTable.remove(opaque);
    }
}

异步消息RPC类似,只是不需要最后的同步等待

重试机制

走完整体的发送消息流程,我们再回过头来查看重试机制

总共尝试发送消息的次数取决于 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1

如果是同步消息则为 1 + retryTimesWhenSendFailed 默认2次 = 3次,其他情况就是1次

也就是说只有同步发送才会重试!异步、单向都不会进行重试?

就在我查找同步最大重试次数 retryTimesWhenSendFailed 时,同时还发现异步的最大重试次数 retryTimesWhenSendAsyncFailed

实际上异步发送重试的代码在异常的catch块中,异常才去执行 onExceptionImpl

如果重试同步发送时,需要去其他broker还要把 retryAnotherBrokerWhenNotStoreOK 设置为true,默认false

发送消息流程总结

至此发送消息的流程算是过了一遍,在查看源码的过程中大部分内容都是见名知意的,这不比公司的”shit mountain“看着舒服?

最后再来总结下流程,便于同学们记忆:

image.png
image.png
  1. 先校验参数,避免参数出错
  2. 再获取Topic路由信息,如果本地没有就从NameServer获取
  3. 然后通过线性轮询法选择队列,如果retryAnotherBrokerWhenNotStoreOK 开启后,同步失败新的重试会选择其他broker
  4. 紧接着对消息进行封装,设置唯一ID、压缩消息、检查禁止发送钩子、发送前后钩子等
  5. 最后使用Netty写请求进行rpc,期间也会有rpc的钩子,如果是同步则会等待
  6. 在此期间会进行重试、超时检测

总结

消息发送的方式有三种:同步、异步、单向,根据顺序可靠性逐渐下降、性能逐渐提升

同步消息能够通过响应判断是否真正成功,常用于需要消息可靠、数据一致的场景,如同步

异步消息通过实现回调处理成功与失败,常用于响应时间敏感的场景,如异步短信

单向消息不需要进行处理,常用于追求性能的场景,如异步日志

消息发送的过程中会先检查消息参数,确保消息无误,再获取路由信息,如果本地不存在则向NameServer获取

路由信息存储topic对应的broker、队列列表、broker上的队列等相关信息

然后通过线性轮询算法选择要发送消息的队列,如果重试则不会选择相同的broker

接着会设置消息的唯一ID、判断是否压缩消息、尝试执行检查禁止发送、发送消息前后的钩子等

最后使用netty写请求进行rpc调用,同时也会有rpc前后的钩子

在此期间同步、异步会根据参数进行超时检查、重试等操作

最后(点赞、收藏、关注求求啦~)

本篇文章被收入专栏 消息中间件,感兴趣的同学可以持续关注喔

本篇文章笔记以及案例被收入 Gitee-CaiCaiJavaGithub-CaiCaiJava,除此之外还有更多Java进阶相关知识,感兴趣的同学可以starred持续关注喔~

有什么问题可以在评论区交流,如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~

关注菜菜,分享更多技术干货,公众号:菜菜的后端私房菜

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • RocketMQ(二):揭秘发送消息核心原理(源码与设计思想解析)
    • 消息发送方式
      • 发送消息原理
        • sendDefaultImpl 通用发送消息模板
        • tryToFindTopicPublishInfo 获取路由信息
        • selectOneMessageQueue 选择队列
        • sendKernelImpl 封装消息
        • 使用Netty进行网络通信RPC
        • 重试机制
        • 发送消息流程总结
      • 总结
        • 最后(点赞、收藏、关注求求啦~)
    相关产品与服务
    消息队列 RocketMQ 版
    消息队列 RocketMQ 版(TDMQ for RocketMQ,简称TDMQ RocketMQ 版) 是一款分布式高可用的消息队列服务,基于 Apache RocketMQ 的 4.x 和 5.x 架构提供不同的产品形态,支持开源客户端零改造接入,同时具备计算存储分离,灵活扩缩容的优势。TDMQ RocketMQ 版可以支持百万级 TPS 的吞吐量,适用于各类大规模、低延时、对可靠性要求高的在线消息业务场景。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档