首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ详解(13)——RocketMQ的消息模式

RocketMQ详解(13)——RocketMQ的消息模式

作者头像
张申傲
发布2020-09-03 11:04:19
2K0
发布2020-09-03 11:04:19
举报
文章被收录于专栏:漫漫架构路漫漫架构路

RocketMQ详解(13)——RocketMQ的消息模式

一. RocketMQ的消息模式

  1. 在RocketMQ中,可以理解为没有ActiveMQ的createQueue()和createTopic()的用法,也就是并没有P2P和Pub/Sub类似的概念。RocketMQ不遵循JMS规范,而是使用了一套自定义的机制。可以理解为RocketMQ都是基于Pub/Sub发布订阅模式的,在此基础上提供了集群消息和广播消息两种消息模式,可通过消费端方法consumer.setMessageModel()进行设置。
    1. 集群消息——MessageModel.CLUSTERING 这方方式可以实现类似ActiveMQ负载均衡客户端的功能,同一个ConsumerGroup下的所有Consumer已负载均衡的方式消费消息。比较特殊的是,这种方式可以支持生产端先发送消息到Broker,消费端再订阅主题进行消费,比较灵活。RocketMQ默认为该模式。
    2. 广播消息——MessageModel.BROADCASTING 在这种模式下,生产端发送到Topic下的消息,会被订阅了该Topic的所有Consumer消费,即使它们处于同一个ConsumerGroup。
  2. 在RocketMQ中,有一个很重要的概念——GroupName。无论是Producer端还是Consumer端,都必须指定一个GroupName,这个组名称需要由应用来保证唯一性。同一个ProducerGroup下的所有Producer发送用一类消息,且发送逻辑一直。Consumer同理。
  3. Topic代表消息发送和订阅的主题,是一个逻辑上的概念,Topic并不实际存储消息。每个Topic都会维护一些MessageQueue(默认4个),这个MessageQueue则是物理上的概念,直接存储消息。

下面分别演示两种消息模式。

二. Producer端程序

使用DefaultMQProducer,发送8条消息:

package william.rmq.producer.quickstart;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import william.rmq.common.constant.RocketMQConstant;
import william.rmq.producer.common.CommonSendCallback;
import javax.annotation.PostConstruct;

/**
 * @Auther: ZhangShenao
 * @Date: 2018/9/7 10:58
 * @Description:RocketMQ消息生产者
 */
@Service
@Slf4j
public class MessageProducer {
    @Value("${spring.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    private static final DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer");

    @PostConstruct
    public void start(){
        try {
            producer.setNamesrvAddr(namesrvAddr);
            producer.setRetryTimesWhenSendFailed(RocketMQConstant.MAX_RETRY_TIMES);
            producer.start();

            log.info("Message Producer Start...");
            System.err.println("Message Producer Start...");
        }catch (Exception e){
            log.error("Message Producer Start Error!!",e);
        }

        String message = "Message-";
        String topic = RocketMQConstant.TEST_TOPIC_NAME;
        String tags = "Tags";
        String keys = "Keys-";

        for (int i = 1;i <= 8;i++){
            sendMessage(message + i,topic,tags,keys + i);
        }
    }

    public void sendMessage(String data, String topic, String tags, String keys) {
        try {
            byte[] messageBody = data.getBytes(RemotingHelper.DEFAULT_CHARSET);

            Message mqMsg = new Message(topic, tags, keys, messageBody);

            producer.send(mqMsg, new CommonSendCallback());
        } catch (Exception e) {
            log.error("Message Producer: Send Message Error ", e);
        }

    }
}

三. Consumer端程序

package william.rmq.consumer.quickstart;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import william.rmq.common.constant.RocketMQConstant;

import javax.annotation.PostConstruct;
import java.util.List;

/**
 * @Auther: ZhangShenao
 * @Date: 2018/9/7 11:06
 * @Description:RocketMQ消息消费者
 */
@Slf4j
@Service
public class MessageConsumer implements MessageListenerConcurrently {
    @Value("${spring.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultConsumer");


    @PostConstruct
    public void start() {
        try {
            consumer.setNamesrvAddr(namesrvAddr);

            //从消息队列头部开始消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

            //设置集群消费模式
            consumer.setMessageModel(MessageModel.CLUSTERING);

            //设置消费超时时间(分钟)
            consumer.setConsumeTimeout(RocketMQConstant.CONSUMER_TIMEOUT_MINUTES);

            //订阅主题
            consumer.subscribe(RocketMQConstant.TEST_TOPIC_NAME, "*");

            //注册消息监听器
            consumer.registerMessageListener(this);

            //设置批量消费最大消息数,这里设置为逐条消费
            consumer.setConsumeMessageBatchMaxSize(1);

            //启动消费端
            consumer.start();

            log.info("Message Consumer Start...");
            System.err.println("Message Consumer Start...");
        } catch (MQClientException e) {
            log.error("Message Consumer Start Error!!",e);
        }

    }

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        if (CollectionUtils.isEmpty(msgs)) {
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

        MessageExt message = msgs.get(0);
        try {
            String messageBody = new String(message.getBody(), RemotingHelper.DEFAULT_CHARSET);
            System.err.println("Message Consumer: Handle New Message: messageId: " + message.getMsgId() + ",topic: " +
                    message.getTopic() + ",tags: " + message.getTags() + ",messageBody: " + messageBody);

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            log.error("Consume Message Error!!", e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }

}

先启动2个Consumer程序,再启动单个Producer程序,两个Consumer端控制台输出如下:

Consumer1:

Handle New Message: messageId: 0A0E096CA41418B4AAC259EE89E30006,topic: DefaultCluster,tags: Tags,messageBody: Message-7
Message Consumer: Handle New Message: messageId: 0A0E096CA41418B4AAC259EE89E30007,topic: DefaultCluster,tags: Tags,messageBody: Message-8
Message Consumer: Handle New Message: messageId: 0A0E096CA41418B4AAC259EE89D70002,topic: DefaultCluster,tags: Tags,messageBody: Message-1
Message Consumer: Handle New Message: messageId: 0A0E096CA41418B4AAC259EE89E30005,topic: DefaultCluster,tags: Tags,messageBody: Message-6

Consumer2:

Message Consumer: Handle New Message: messageId: 0A0E096CA41418B4AAC259EE89D80003,topic: DefaultCluster,tags: Tags,messageBody: Message-4
Message Consumer: Handle New Message: messageId: 0A0E096CA41418B4AAC259EE89D70000,topic: DefaultCluster,tags: Tags,messageBody: Message-2
Message Consumer: Handle New Message: messageId: 0A0E096CA41418B4AAC259EE89D70001,topic: DefaultCluster,tags: Tags,messageBody: Message-3
Message Consumer: Handle New Message: messageId: 0A0E096CA41418B4AAC259EE89E30004,topic: DefaultCluster,tags: Tags,messageBody: Message-5

可以看到,使用集群消息模式后,两个Consumer负载均衡消费了8条消息。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018-09-18 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • RocketMQ详解(13)——RocketMQ的消息模式
    • 一. RocketMQ的消息模式
      • 二. Producer端程序
        • 三. Consumer端程序
        相关产品与服务
        负载均衡
        负载均衡(Cloud Load Balancer,CLB)提供安全快捷的流量分发服务,访问流量经由 CLB 可以自动分配到云中的多台后端服务器上,扩展系统的服务能力并消除单点故障。负载均衡支持亿级连接和千万级并发,可轻松应对大流量访问,满足业务需求。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档