前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ学习-消息发布和订阅

RocketMQ学习-消息发布和订阅

作者头像
阿杜
发布2018-08-06 10:49:10
5.9K0
发布2018-08-06 10:49:10
举报
文章被收录于专栏:阿杜的世界

前面一篇文章分析了broker的启动过程,浏览了broker的基本功能。接下来的几篇文章,准备按照十分钟入门RocketMQ一文中提到的一系列特性,依次进行学习。这篇文章准备分析RocketMQ作为MQ的最基本的功能:消息的发布(publish)和订阅(subscribe)。首先,我参考Spring Boot系列文章(六):SpringBoot RocketMQ 整合使用和监控这篇文章完成了一个简单的例子。

一、RocketMQ消息模型

屏幕快照 2018-03-31 14.50.41.png

在部署RocketMQ的时候,先启动name server,再启动broker,这时候broker会将自己注册到name server。应用程序中的producer启动的时候,首先连接一台name server,获取broker的地址列表;然后再和broker建立连接,接下来就可以发送消息了。其中:一个producer只与一个name server连接,一个producer会跟所有broker建立连接,每个连接都会有心跳检测机制。

producer会轮询向指定topic的mq集合发送消息。

consumer有两种消费模式:集群消费和广播消费。集群消费:多个consumer平均消费该topic下所有mq的消息,即某个消息在某个message queue中被一个consumer消费后,其他消费者就不会消费到它;广播消费:所有consumer可以消费到发到这个topic下的所有消息。

consumer有两种获取消息的模式:推模式和拉模式,在RocketMQ中,从技术实现角度看,推模式也是在拉模式上做了一层封装。

二、消息发送

生产者Demo

首先给出代码,

代码语言:javascript
复制
package com.javadu.chapter8rocketmq.message;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;

import javax.annotation.PostConstruct;

/**
 * 作用: 同步发送消息
 * User: duqi
 * Date: 2018/3/29
 * Time: 13:52
 */
@Component
public class ProducerDemo {

    @Value("${apache.rocketmq.producer.producerGroup}")
    private String producerGroup;

    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    @PostConstruct
    public void defaultMQProducer() {
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer(producerGroup);

        defaultMQProducer.setNamesrvAddr(namesrvAddr);

        try {
            defaultMQProducer.start();

            Message message = new Message("TopicTest", "TagA",
                                          "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));

            for (int i = 0; i < 100; i++) {
                SendResult sendResult = defaultMQProducer.send(message);
                System.out.println("发送消息结果, msgId:" + sendResult.getMsgId() +
                                   ", 发送状态:" + sendResult.getSendStatus());
            }

        } catch (MQClientException | UnsupportedEncodingException | InterruptedException
            | RemotingException | MQBrokerException e) {
            e.printStackTrace();
        } finally {
            defaultMQProducer.shutdown();
        }
    }

}

生产者中有两个属性:

  • name server的地址,用于获得broker的相关信息
  • 生产者集合producerGroup,在同一个producer group中有不同的producer实例,如果最早一个producer奔溃,则broker会通知该组内的其他producer实例进行事务提交或回滚。

RocketMQ中的消息,使用Message表示,代码定义如下:

代码语言:javascript
复制
public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;

    private String topic;
    private int flag;
    private Map<String, String> properties;
    private byte[] body;

    public Message() {
    }
    //省略了getter和setter方法
}
  • topic:该消息将要往哪个topic发
  • flag:可以用作消息过滤
  • properties:扩展字段,可以做一些系统级别的通用值的透传,例如skywalking的segmentId
  • body:消息内容

每个消息发送完后,会得到一个SendResult对象,看下该对象的结构:

代码语言:javascript
复制
public class SendResult {
    //发送状态
    private SendStatus sendStatus;
    //消息ID,用于消息去重、消息跟踪
    private String msgId;
    private MessageQueue messageQueue;
    private long queueOffset;
    //事务ID
    private String transactionId;
    private String offsetMsgId;
    private String regionId;
    //是否需要跟踪
    private boolean traceOn = true;

    public SendResult() {
    }
    //省略了构造函数、getter和setter等一系列方法
}

在这个demo中,我们是将消息内容和消息状态一并打印到控制台。

消息发送源码分析

在RocketMQ中的client模块的包结构如下,可以看出,作者并没有将接口的定义和实现放在一个包下(这在我们的业务应用中是常见的做法,不一定合理)。producer和consumer包下分别定义了生产者和消费者的接口,将具体的实现放在impl包中。

屏幕快照 2018-03-31 11.51.36.png

首先关注producer包里的内容,几个主要的类如下:DefaultMQProducer是生产者的默认实现、MQAdmin用于定义一些管理接口、MQProducer用于定义一些生产者特有的接口。

MQProducer.png

在ProducerDemo中,通过`defaultMQProducer.start();启动生产者,接下来看下start()方法的过程:

  • 根据服务状态决定接下来的动作
  • 对于CREATE_JUST状态
    • 设置服务状态
    • 检查配置
    • 获取或创建MQClientInstance实例
    • 将生产者注册到指定的producerGroup,即producerTable这个数据结构中,是一个map
    • 填充topicPublishInfoTable数据结构
    • 启动生产者
  • 对于RUNNING、START_FAILED和SHUTDOWN_ALREADY,抛出异常
代码语言:javascript
复制
 public void start(final boolean startFactory) throws MQClientException {
        //根据当前的服务状态决定接下来的动作
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;

                this.checkConfig();

                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }

                //创建一个客户端工厂
                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                //将生产者注册到指定producer group
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
                
                //填充topicPublishInfoTable
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

                if (startFactory) {
                    mQClientFactory.start();
                }

                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

        //给该producer连接的所有broker发送心跳消息
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    }

顺着mQClientFactory.start()往下跟,可以进一步了解生产者的细节,主要步骤有:

  • 建立请求响应通道
  • 启动各种定时任务,例如:每隔2分钟向name server拉取一次broker集群的地址,这意味着如果某个broker宕机了,生产者在这两分钟之内的消息是投递失败的;定期从name server拉取topic等路由信息;定期清理失效的broker以及向broker发送心跳消息等。
  • 启动拉服务、负载均衡服务、推服务等服务,这三个服务跟消费者有关。这里设计上不太明了,将消费者和生产者的启动逻辑放在一起了。看pullMessageService和rebalanceService和初始化,它们是根据MQClientInstance初始化的,而MQClientInstance又是根据ClientConfig来配置的。
代码语言:javascript
复制
  public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

生产者启动后,接下来看下消息的发送过程,如下图所示,DefaultMQProducer提供了很多发送消息的方法,可以实现同步发消息、异步发消息、指定消息队列、OneWay消息、事务消息等。

屏幕快照 2018-03-31 12.26.32.png

这里我们只看最简单的send(Message message)方法,最终在DefaultMQProducerImpl中实现:

代码语言:javascript
复制
    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        //确认生产者状态正常
        this.makeSureStateOK();
        //检查消息的合法性
        Validators.checkMessage(msg, this.defaultMQProducer);

        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        //获取消息的目的地:Topic信息
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            //计算出消息的投递次数,如果是同步投递,则是1+重试次数,如果不是同步投递,则只需要投递一次
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            //一个broker集群有不同的broker节点,lastBrokerName记录了上次投递的broker节点,每个broker节点
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                //选择一个要发送的消息队列
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        //投递消息
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        //根据消息发送模式,对消息发送结果做不同的处理
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }

                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQClientException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQBrokerException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        switch (e.getResponseCode()) {
                            case ResponseCode.TOPIC_NOT_EXIST:
                            case ResponseCode.SERVICE_NOT_AVAILABLE:
                            case ResponseCode.SYSTEM_ERROR:
                            case ResponseCode.NO_PERMISSION:
                            case ResponseCode.NO_BUYER_ID:
                            case ResponseCode.NOT_IN_CURRENT_UNIT:
                                continue;
                            default:
                                if (sendResult != null) {
                                    return sendResult;
                                }

                                throw e;
                        }
                    } catch (InterruptedException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());

                        log.warn("sendKernelImpl exception", e);
                        log.warn(msg.toString());
                        throw e;
                    }
                } else {
                    break;
                }
            }

            if (sendResult != null) {
                return sendResult;
            }

            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                times,
                System.currentTimeMillis() - beginTimestampFirst,
                msg.getTopic(),
                Arrays.toString(brokersSent));

            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

            MQClientException mqClientException = new MQClientException(info, exception);
            if (exception instanceof MQBrokerException) {
                mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
            } else if (exception instanceof RemotingConnectException) {
                mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
            } else if (exception instanceof RemotingTimeoutException) {
                mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
            } else if (exception instanceof MQClientException) {
                mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
            }

            throw mqClientException;
        }

        List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
        if (null == nsList || nsList.isEmpty()) {
            throw new MQClientException(
                "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
        }

        throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
    }

发送消息的主要过程如下:

  • 首先检查生产者和消息的合法性
  • 然后获取消息发送的信息,该信息存放在TopicPublishInfo对象中:
代码语言:javascript
复制
public class TopicPublishInfo {
    //是否顺序消息
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    //维护该topic下用于的消息队列列表
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    //计算下一次该投递的队列,这里应用ThreadLocal,即使是同一台机器中,每个producer实例都有自己的队列
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private TopicRouteData topicRouteData;

    //省略了getter和setter方法
    
    //选择指定lastBrokerName上的下一个mq
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int pos = Math.abs(index++) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }

    //选择当前broker节点的下一个mq
    public MessageQueue selectOneMessageQueue() {
        int index = this.sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }
}
  • 选择要发送给该topic下的那个MessageQueue,选择的逻辑分两种情况:(1)默认情况,在上次投递的broker节点上,轮询到下一个message queue来发送;(2)sendLatencyFaultEnable这个值设置为true的时候,这块没太看懂。
  • 投递消息
  • 根据消息队列运行模式,针对投递结果做不同的处理。

二、消息消费

消费者Demo

消费者里有个属性需要看下:

  • consumerGroup:位于同一个consumerGroup中的consumer实例和producerGroup中的各个produer实例承担的角色类似;consumerGroup中的实例还可以实现负载均衡和容灾。PS:处于同一个consumerGroup里的consumer实例一定是订阅了同一个topic。
  • nameServer的地址:name server地址,用于获取broker、topic信息

消费者Demo里做了以下几个事情:

  • 设置配置属性
  • 设置订阅的topic,可以指定tag
  • 设置第一次启动的时候,从message queue的哪里开始消费
  • 设置消息处理器
  • 启动消费者
代码语言:javascript
复制
package com.javadu.chapter8rocketmq.message;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * 作用:
 * User: duqi
 * Date: 2018/3/29
 * Time: 14:00
 */
@Component
public class ConsumerDemo {

    /**
     * 消费者的组名
     */
    @Value("${apache.rocketmq.consumer.consumerGroup}")
    private String consumerGroup;

    /**
     * NameServer 地址
     */
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    @PostConstruct
    public void defaultMQPushConsumer() {
        //消费者的组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);

        //指定NameServer地址,多个地址以 ; 隔开
        consumer.setNamesrvAddr(namesrvAddr);
        try {
            //订阅PushTopic下Tag为push的消息
            consumer.subscribe("TopicTest", "TagA");

            //设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
            //如果非第一次启动,那么按照上次消费的位置继续消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
                try {
                    for (MessageExt messageExt : list) {

                        //输出消息内容
                        System.out.println("messageExt: " + messageExt);

                        String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);

                        //输出消息内容
                        System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ",  msgBody : " + messageBody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    //稍后再试
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                //消费成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消费者源码分析

前面分析过了,RocketMQ中的client模块统一提供了生产者和消费者客户端,这块我们看下消费者的几个主要的类。前面提到过,RocketMQ实际上都是拉模式,这里的DefaultMQPushConsumer实现了推模式,也只是对拉消息服务做了一层封装,即拉到消息的时候触发业务消费者注册到这里的callback,而具体拉消息的服务是由PullMessageService实现的,这个细节后续再研究。

MQConsumer.png

在ConsumerDemo中,设置好配置信息后,会进行topic订阅,调用了DefaultMQPushConsumer的subscribe方法,源码如下:

代码语言:javascript
复制
    /**
     * Subscribe a topic to consuming subscription.
     *
     * @param topic topic to subscribe.
     * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br>
     * if null or * expression,meaning subscribe all
     * @throws MQClientException if there is any client error.
     */
    @Override
    public void subscribe(String topic, String subExpression) throws MQClientException {
        this.defaultMQPushConsumerImpl.subscribe(topic, subExpression);
    }

第一个参数是topic信息,第二个参数用于用于消息过滤tag字段。真正的订阅发生在DefaultMQPushConsumerImpl中,代码如下:

代码语言:javascript
复制
    public void subscribe(String topic, String subExpression) throws MQClientException {
        try {
            //构建包含订阅信息的对象,并放入负载平衡组件维护的map中,以topic为key
            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                topic, subExpression);
            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            //如果已经跟broker集群建立连接,则给所有的broker节点发送心跳消息
            if (this.mQClientFactory != null) {
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }

在ConsumerDemo中,接下里会设置消费者首次启动时消费消息的起始位置,这涉及到DefaultMQPushConsumer中的一个属性——consumeFromWhere,这个值有三个可能的值

  • CONSUME_FROM_LAST_OFFSET,默认值,表示从上次停止时的地方开始消费
  • CONSUME_FROM_FIRST_OFFSET,从队列的头部开始消费
  • CONSUME_FROM_TIMESTAMP,从指定的时间点开始消费

ConsumerDemo接下来会注册一个callback,当消息到达的时候就处理消息(最新的消息监听者支持并发消费):

代码语言:javascript
复制
    /**
     * Register a callback to execute on message arrival for concurrent consuming.
     *
     * @param messageListener message handling callback.
     */
    @Override
    public void registerMessageListener(MessageListenerConcurrently messageListener) {
        this.messageListener = messageListener;
        this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
    }

最后,我们看下ConsumerDemo的启动过程,即DefaultMQPushConsumerImpl的start方法,主要做了下面几件事:

  • 检查配置
  • 将订阅信息拷贝到负载均衡组件(rebalanceImpl)中;
  • 负载均衡组件的几个属性的设置
  • 处理不同消息模式(集群模式或广播模式)的配置
  • 处理顺序消费和并发消费的不同配置
  • 将消费者信息和consumer group注册到MQ客户端实例的consumerTable中
  • 启动消费者客户端

参考资料

  1. 分布式开放消息系统(RocketMQ)的原理与实践
  2. 买好车提供的rocketmq-spring-boot-starter
  3. Spring Boot系列文章(六):SpringBoot RocketMQ 整合使用和监控
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018.03.31 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、RocketMQ消息模型
  • 二、消息发送
    • 生产者Demo
      • 消息发送源码分析
      • 二、消息消费
        • 消费者Demo
          • 消费者源码分析
          • 参考资料
          相关产品与服务
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档