RocketMQ学习-消息发布和订阅

前面一篇文章分析了broker的启动过程,浏览了broker的基本功能。接下来的几篇文章,准备按照十分钟入门RocketMQ一文中提到的一系列特性,依次进行学习。这篇文章准备分析RocketMQ作为MQ的最基本的功能:消息的发布(publish)和订阅(subscribe)。首先,我参考Spring Boot系列文章(六):SpringBoot RocketMQ 整合使用和监控这篇文章完成了一个简单的例子。

一、RocketMQ消息模型

屏幕快照 2018-03-31 14.50.41.png

在部署RocketMQ的时候,先启动name server,再启动broker,这时候broker会将自己注册到name server。应用程序中的producer启动的时候,首先连接一台name server,获取broker的地址列表;然后再和broker建立连接,接下来就可以发送消息了。其中:一个producer只与一个name server连接,一个producer会跟所有broker建立连接,每个连接都会有心跳检测机制。

producer会轮询向指定topic的mq集合发送消息。

consumer有两种消费模式:集群消费和广播消费。集群消费:多个consumer平均消费该topic下所有mq的消息,即某个消息在某个message queue中被一个consumer消费后,其他消费者就不会消费到它;广播消费:所有consumer可以消费到发到这个topic下的所有消息。

consumer有两种获取消息的模式:推模式和拉模式,在RocketMQ中,从技术实现角度看,推模式也是在拉模式上做了一层封装。

二、消息发送

生产者Demo

首先给出代码,

package com.javadu.chapter8rocketmq.message;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;

import javax.annotation.PostConstruct;

/**
 * 作用: 同步发送消息
 * User: duqi
 * Date: 2018/3/29
 * Time: 13:52
 */
@Component
public class ProducerDemo {

    @Value("${apache.rocketmq.producer.producerGroup}")
    private String producerGroup;

    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    @PostConstruct
    public void defaultMQProducer() {
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer(producerGroup);

        defaultMQProducer.setNamesrvAddr(namesrvAddr);

        try {
            defaultMQProducer.start();

            Message message = new Message("TopicTest", "TagA",
                                          "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));

            for (int i = 0; i < 100; i++) {
                SendResult sendResult = defaultMQProducer.send(message);
                System.out.println("发送消息结果, msgId:" + sendResult.getMsgId() +
                                   ", 发送状态:" + sendResult.getSendStatus());
            }

        } catch (MQClientException | UnsupportedEncodingException | InterruptedException
            | RemotingException | MQBrokerException e) {
            e.printStackTrace();
        } finally {
            defaultMQProducer.shutdown();
        }
    }

}

生产者中有两个属性:

  • name server的地址,用于获得broker的相关信息
  • 生产者集合producerGroup,在同一个producer group中有不同的producer实例,如果最早一个producer奔溃,则broker会通知该组内的其他producer实例进行事务提交或回滚。

RocketMQ中的消息,使用Message表示,代码定义如下:

public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;

    private String topic;
    private int flag;
    private Map<String, String> properties;
    private byte[] body;

    public Message() {
    }
    //省略了getter和setter方法
}
  • topic:该消息将要往哪个topic发
  • flag:可以用作消息过滤
  • properties:扩展字段,可以做一些系统级别的通用值的透传,例如skywalking的segmentId
  • body:消息内容

每个消息发送完后,会得到一个SendResult对象,看下该对象的结构:

public class SendResult {
    //发送状态
    private SendStatus sendStatus;
    //消息ID,用于消息去重、消息跟踪
    private String msgId;
    private MessageQueue messageQueue;
    private long queueOffset;
    //事务ID
    private String transactionId;
    private String offsetMsgId;
    private String regionId;
    //是否需要跟踪
    private boolean traceOn = true;

    public SendResult() {
    }
    //省略了构造函数、getter和setter等一系列方法
}

在这个demo中,我们是将消息内容和消息状态一并打印到控制台。

消息发送源码分析

在RocketMQ中的client模块的包结构如下,可以看出,作者并没有将接口的定义和实现放在一个包下(这在我们的业务应用中是常见的做法,不一定合理)。producer和consumer包下分别定义了生产者和消费者的接口,将具体的实现放在impl包中。

屏幕快照 2018-03-31 11.51.36.png

首先关注producer包里的内容,几个主要的类如下:DefaultMQProducer是生产者的默认实现、MQAdmin用于定义一些管理接口、MQProducer用于定义一些生产者特有的接口。

MQProducer.png

在ProducerDemo中,通过`defaultMQProducer.start();启动生产者,接下来看下start()方法的过程:

  • 根据服务状态决定接下来的动作
  • 对于CREATE_JUST状态
    • 设置服务状态
    • 检查配置
    • 获取或创建MQClientInstance实例
    • 将生产者注册到指定的producerGroup,即producerTable这个数据结构中,是一个map
    • 填充topicPublishInfoTable数据结构
    • 启动生产者
  • 对于RUNNING、START_FAILED和SHUTDOWN_ALREADY,抛出异常
 public void start(final boolean startFactory) throws MQClientException {
        //根据当前的服务状态决定接下来的动作
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;

                this.checkConfig();

                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }

                //创建一个客户端工厂
                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                //将生产者注册到指定producer group
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
                
                //填充topicPublishInfoTable
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

                if (startFactory) {
                    mQClientFactory.start();
                }

                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

        //给该producer连接的所有broker发送心跳消息
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    }

顺着mQClientFactory.start()往下跟,可以进一步了解生产者的细节,主要步骤有:

  • 建立请求响应通道
  • 启动各种定时任务,例如:每隔2分钟向name server拉取一次broker集群的地址,这意味着如果某个broker宕机了,生产者在这两分钟之内的消息是投递失败的;定期从name server拉取topic等路由信息;定期清理失效的broker以及向broker发送心跳消息等。
  • 启动拉服务、负载均衡服务、推服务等服务,这三个服务跟消费者有关。这里设计上不太明了,将消费者和生产者的启动逻辑放在一起了。看pullMessageService和rebalanceService和初始化,它们是根据MQClientInstance初始化的,而MQClientInstance又是根据ClientConfig来配置的。
  public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

生产者启动后,接下来看下消息的发送过程,如下图所示,DefaultMQProducer提供了很多发送消息的方法,可以实现同步发消息、异步发消息、指定消息队列、OneWay消息、事务消息等。

屏幕快照 2018-03-31 12.26.32.png

这里我们只看最简单的send(Message message)方法,最终在DefaultMQProducerImpl中实现:

    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        //确认生产者状态正常
        this.makeSureStateOK();
        //检查消息的合法性
        Validators.checkMessage(msg, this.defaultMQProducer);

        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        //获取消息的目的地:Topic信息
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            //计算出消息的投递次数,如果是同步投递,则是1+重试次数,如果不是同步投递,则只需要投递一次
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            //一个broker集群有不同的broker节点,lastBrokerName记录了上次投递的broker节点,每个broker节点
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                //选择一个要发送的消息队列
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        //投递消息
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        //根据消息发送模式,对消息发送结果做不同的处理
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }

                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQClientException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQBrokerException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        switch (e.getResponseCode()) {
                            case ResponseCode.TOPIC_NOT_EXIST:
                            case ResponseCode.SERVICE_NOT_AVAILABLE:
                            case ResponseCode.SYSTEM_ERROR:
                            case ResponseCode.NO_PERMISSION:
                            case ResponseCode.NO_BUYER_ID:
                            case ResponseCode.NOT_IN_CURRENT_UNIT:
                                continue;
                            default:
                                if (sendResult != null) {
                                    return sendResult;
                                }

                                throw e;
                        }
                    } catch (InterruptedException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());

                        log.warn("sendKernelImpl exception", e);
                        log.warn(msg.toString());
                        throw e;
                    }
                } else {
                    break;
                }
            }

            if (sendResult != null) {
                return sendResult;
            }

            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                times,
                System.currentTimeMillis() - beginTimestampFirst,
                msg.getTopic(),
                Arrays.toString(brokersSent));

            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

            MQClientException mqClientException = new MQClientException(info, exception);
            if (exception instanceof MQBrokerException) {
                mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
            } else if (exception instanceof RemotingConnectException) {
                mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
            } else if (exception instanceof RemotingTimeoutException) {
                mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
            } else if (exception instanceof MQClientException) {
                mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
            }

            throw mqClientException;
        }

        List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
        if (null == nsList || nsList.isEmpty()) {
            throw new MQClientException(
                "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
        }

        throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
    }

发送消息的主要过程如下:

  • 首先检查生产者和消息的合法性
  • 然后获取消息发送的信息,该信息存放在TopicPublishInfo对象中:
public class TopicPublishInfo {
    //是否顺序消息
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    //维护该topic下用于的消息队列列表
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    //计算下一次该投递的队列,这里应用ThreadLocal,即使是同一台机器中,每个producer实例都有自己的队列
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private TopicRouteData topicRouteData;

    //省略了getter和setter方法
    
    //选择指定lastBrokerName上的下一个mq
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int pos = Math.abs(index++) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }

    //选择当前broker节点的下一个mq
    public MessageQueue selectOneMessageQueue() {
        int index = this.sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }
}
  • 选择要发送给该topic下的那个MessageQueue,选择的逻辑分两种情况:(1)默认情况,在上次投递的broker节点上,轮询到下一个message queue来发送;(2)sendLatencyFaultEnable这个值设置为true的时候,这块没太看懂。
  • 投递消息
  • 根据消息队列运行模式,针对投递结果做不同的处理。

二、消息消费

消费者Demo

消费者里有个属性需要看下:

  • consumerGroup:位于同一个consumerGroup中的consumer实例和producerGroup中的各个produer实例承担的角色类似;consumerGroup中的实例还可以实现负载均衡和容灾。PS:处于同一个consumerGroup里的consumer实例一定是订阅了同一个topic。
  • nameServer的地址:name server地址,用于获取broker、topic信息

消费者Demo里做了以下几个事情:

  • 设置配置属性
  • 设置订阅的topic,可以指定tag
  • 设置第一次启动的时候,从message queue的哪里开始消费
  • 设置消息处理器
  • 启动消费者
package com.javadu.chapter8rocketmq.message;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * 作用:
 * User: duqi
 * Date: 2018/3/29
 * Time: 14:00
 */
@Component
public class ConsumerDemo {

    /**
     * 消费者的组名
     */
    @Value("${apache.rocketmq.consumer.consumerGroup}")
    private String consumerGroup;

    /**
     * NameServer 地址
     */
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    @PostConstruct
    public void defaultMQPushConsumer() {
        //消费者的组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);

        //指定NameServer地址,多个地址以 ; 隔开
        consumer.setNamesrvAddr(namesrvAddr);
        try {
            //订阅PushTopic下Tag为push的消息
            consumer.subscribe("TopicTest", "TagA");

            //设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
            //如果非第一次启动,那么按照上次消费的位置继续消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
                try {
                    for (MessageExt messageExt : list) {

                        //输出消息内容
                        System.out.println("messageExt: " + messageExt);

                        String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);

                        //输出消息内容
                        System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ",  msgBody : " + messageBody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    //稍后再试
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                //消费成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消费者源码分析

前面分析过了,RocketMQ中的client模块统一提供了生产者和消费者客户端,这块我们看下消费者的几个主要的类。前面提到过,RocketMQ实际上都是拉模式,这里的DefaultMQPushConsumer实现了推模式,也只是对拉消息服务做了一层封装,即拉到消息的时候触发业务消费者注册到这里的callback,而具体拉消息的服务是由PullMessageService实现的,这个细节后续再研究。

MQConsumer.png

在ConsumerDemo中,设置好配置信息后,会进行topic订阅,调用了DefaultMQPushConsumer的subscribe方法,源码如下:

    /**
     * Subscribe a topic to consuming subscription.
     *
     * @param topic topic to subscribe.
     * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br>
     * if null or * expression,meaning subscribe all
     * @throws MQClientException if there is any client error.
     */
    @Override
    public void subscribe(String topic, String subExpression) throws MQClientException {
        this.defaultMQPushConsumerImpl.subscribe(topic, subExpression);
    }

第一个参数是topic信息,第二个参数用于用于消息过滤tag字段。真正的订阅发生在DefaultMQPushConsumerImpl中,代码如下:

    public void subscribe(String topic, String subExpression) throws MQClientException {
        try {
            //构建包含订阅信息的对象,并放入负载平衡组件维护的map中,以topic为key
            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                topic, subExpression);
            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            //如果已经跟broker集群建立连接,则给所有的broker节点发送心跳消息
            if (this.mQClientFactory != null) {
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }

在ConsumerDemo中,接下里会设置消费者首次启动时消费消息的起始位置,这涉及到DefaultMQPushConsumer中的一个属性——consumeFromWhere,这个值有三个可能的值

  • CONSUME_FROM_LAST_OFFSET,默认值,表示从上次停止时的地方开始消费
  • CONSUME_FROM_FIRST_OFFSET,从队列的头部开始消费
  • CONSUME_FROM_TIMESTAMP,从指定的时间点开始消费

ConsumerDemo接下来会注册一个callback,当消息到达的时候就处理消息(最新的消息监听者支持并发消费):

    /**
     * Register a callback to execute on message arrival for concurrent consuming.
     *
     * @param messageListener message handling callback.
     */
    @Override
    public void registerMessageListener(MessageListenerConcurrently messageListener) {
        this.messageListener = messageListener;
        this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
    }

最后,我们看下ConsumerDemo的启动过程,即DefaultMQPushConsumerImpl的start方法,主要做了下面几件事:

  • 检查配置
  • 将订阅信息拷贝到负载均衡组件(rebalanceImpl)中;
  • 负载均衡组件的几个属性的设置
  • 处理不同消息模式(集群模式或广播模式)的配置
  • 处理顺序消费和并发消费的不同配置
  • 将消费者信息和consumer group注册到MQ客户端实例的consumerTable中
  • 启动消费者客户端

参考资料

  1. 分布式开放消息系统(RocketMQ)的原理与实践
  2. 买好车提供的rocketmq-spring-boot-starter
  3. Spring Boot系列文章(六):SpringBoot RocketMQ 整合使用和监控

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏容器云生态

Docker1.12尝试

前言:在docker1.12中默认增加了swarm mode 编排功能,并且官方支持更多的插件来进行docker的网路和外接存储插件,不过目前测试swarm m...

494100
来自专栏白驹过隙

Socket编程回顾,一个最简单服务器程序

23530
来自专栏跟着阿笨一起玩NET

Log4Net日志记录两种方式

     log4net库是Apache log4j框架在Microsoft .NET平台的实现,是一个帮助程序员将日志信息输出到各种目标(控制台、文件、数据库...

50020
来自专栏匠心独运的博客

消息中间件—RocketMQ消息发送

摘要:使用客户端发送一条消息很Easy,在这背后RocketMQ完成了怎么样的操作呢? 大道至简,消息队列可以简单概括为:“一发一存一收”,在这三个过程中消息...

44230
来自专栏FreeBuf

一款轻量级Web漏洞教学演示系统(DSVW)

Damn Small Vulnerable Web (DSVW) 是使用 Python 语言开发的 Web应用漏洞 的演练系统。其系统只有一个 python 的...

306100
来自专栏漫漫全栈路

ASP.NET MVC学习笔记07数据表和模型添加新字段

给电影表和模型添加新字段 在本节中,您将使用Entity Framework Code First来实现模型类上的操作。从而使得这 些操作和变更,可以应用到数据...

34630
来自专栏乐沙弥的世界

Oracle Data Guard 重要配置参数

    Oracle Data Guard主要是通过为生产数据库提供一个或多个备用数据库(是产生数据库的一个副本),以保证在主库不可用或异常时数据不丢失并通过备...

16520
来自专栏恰童鞋骚年

.NET Core微服务之基于Consul实现服务治理(续)

上一篇发布之后,很多人点赞和评论,不胜惶恐,这一篇把上一篇没有弄到的东西补一下,也算是给各位前来询问的朋友的一些回复吧。

18350
来自专栏琯琯博客

开发 Composer 包详细步骤

一、GitHub 创建一个名 uploadfile 新仓库,并克隆至本地。 二、初始化项目,生成composer.json文件 2.1 步骤 2.2 步骤解释...

806120
来自专栏醉梦轩

Ubuntu安装Proxychains

1.4K30

扫码关注云+社区

领取腾讯云代金券