首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >消息队列

消息队列

原创
作者头像
用户11708420
发布2025-11-11 11:11:21
发布2025-11-11 11:11:21
1840
举报

1. 什么是消息队列?

消息队列是一种异步通信(Asynchronous Communication)的服务或组件。

  • 它充当了消息的中介:发送方(生产者,Producer)将消息发送到队列中,接收方(消费者,Consumer)从队列中获取并处理消息。
  • 核心特点是解耦:生产者和消费者不需要知道彼此的存在,它们只与消息队列交互。

消息队列是解决高并发问题的关键技术之一

2.解决了什么问题?

消息队列主要解决了分布式系统中的三大核心问题:

  • 解耦(Decoupling)
    • 问题:系统间存在强依赖关系。
    • 解决:将消息的发送和接收分离开。生产者只需要将消息发送到 MQ,而不需要关心谁来消费,从而降低了服务间的依赖性。
  • 异步(Asynchrony)
    • 问题:某些操作耗时长,同步等待会阻塞主流程,影响用户体验。
    • 解决:将不需要立即得到结果的操作(如发送邮件、生成订单报表)放入 MQ,主流程快速响应,然后由消费者后台慢慢处理。这能极大地提升系统的吞吐量和响应速度
  • 削峰(Throttling/Peak Shaving)
    • 问题:系统在短时间内面临瞬时高并发流量(如秒杀活动)。
    • 解决:将瞬时的大量请求全部积压在 MQ 中。消费者可以按照自身最大处理能力匀速地从 MQ 中取出请求进行处理。这保护了后端服务,防止因流量过载而宕机。

在需要解耦、异步、削峰三大特性同时发挥作用的场景下,消息队列没有完美的平替技术。 MQ(如 Kafka, RabbitMQ, RocketMQ)是最佳选择

RocketMQ 的基本单元

1.NameServer

  • 作为 “路由注册表”:管理所有 Broker 的地址信息;
  • 作为 “轻量级协调中心”:客户端(生产者 / 消费者)启动时,先从 NameServer 获取 Broker 地址,再与 Broker 通信。
  • 无依赖(独立启动),但 Broker / 客户端需要知道它的地址。

2.Broker

  • 真正的 “消息存储 & 转发节点”:存储生产者发送的消息,给消费者提供消息读取;
  • 管理 Topic(消息主题)、Queue(消息队列)等元数据。
  • 必须依赖 NameServer(启动时要注册地址到 NameServer)。

3.Proxy

  • RocketMQ 5.x 新增的 “客户端接入层”:统一处理生产者 / 消费者的请求(如协议转换、权限控制);
  • 解耦客户端与 Broker:客户端不再直接连 Broker,而是连 Proxy,降低架构复杂度。
  • 必须依赖 Broker(与 Broker 同进程启动,需知道 Broker 配置)。

Docker 部署 RocketMQ

1.拉取RocketMQ镜像

这里以dockerhub上 RocketMQ 5.3.2 版本的镜像为例,介绍部署过程。

代码语言:javascript
复制
docker pull apache/rocketmq:5.3.2

2.创建容器共享网络

RocketMQ 中有多个服务,需要创建多个容器,创建 docker 网络便于容器间相互通信。

代码语言:javascript
复制
docker network create rocketmq

3.启动NameServer

代码语言:javascript
复制
# 启动 NameServer
docker run \
  -d  # 后台运行容器
  --name rmqnamesrv  # 给容器起固定名(方便后续通信和管理)
  -p 9876:9876  # 端口映射:宿主机 9876 端口 → 容器 9876 端口(NameServer 默认端口)
  --network rocketmq  # 加入自定义网络(让 Broker 能通过名访问)
  apache/rocketmq:5.3.2  # 用哪个镜像
  sh mqnamesrv  # 容器启动后执行的命令(启动 NameServer 的脚本)
  
# 验证 NameServer 是否启动成功
docker logs -f rmqnamesrv

看到 'The Name Server boot success..', 表示NameServer 已成功启动。

4.启动 Broker+Proxy

NameServer 成功启动后,我们启动 Broker 和 Proxy。

  • Windows
代码语言:javascript
复制
# 配置 Broker 的 IP 地址
echo "brokerIP1=127.0.0.1" > broker.conf

# 启动 Broker 和 Proxy
docker run -d --name rmqbroker --net rocketmq -p 10912:10912 -p 10911:10911 -p 10909:10909 -p 8081:8080 -p 8083:8081 -e "NAMESRV_ADDR=rmqnamesrv:9876" -v %cd%\broker.conf:/home/rocketmq/rocketmq-5.3.2/conf/broker.conf apache/rocketmq:5.3.2 sh mqbroker --enable-proxy -c /home/rocketmq/rocketmq-5.3.2/conf/broker.conf

# 验证 Broker 是否启动成功
docker exec -it rmqbroker bash -c "tail -n 10 /home/rocketmq/logs/rocketmqlogs/proxy.log"

Broke在 RocketMQ 5.x 里会监听 10912、10911、10909 这三个端口,分别监听 从 Broker 4.x 客户端 Proxy的消息 Proxy 在8080监听5.x 客户端(用 HTTP 协议)

Proxy 在8081监听 5.x 客户端(默认用 gRPC 协议)

每个组件(Broker/Proxy)都有 “官方预设的默认端口”,-p映射只是 “把容器内的预设端口暴露到宿主机”,端口的功能早已被组件本身的设计固定死,和映射顺序无关。

所有 MQ 的组件,都能对应到 RocketMQ 三单元的 “核心职责”(路由、存储、接入),只是组合方式不同

比如 RocketMQ 是 “三组件各管一责”,Kafka 是 “存储组件(Broker)+ 路由组件(ZooKeeper/Controller)”,RabbitMQ 是 “Broker 一组件管路由 + 存储”,但最终都是为了完成 “消息从生产者到消费者” 的传递。

以java为例 实现RocketMQ的部署

pom.xml文件中添加以下依赖引入Java依赖库,将rocketmq-client-java-version替换成最新的版本.

代码语言:txt
复制
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>${rocketmq-client-java-version}</version>
</dependency> 

1.需要实现的基本对象:

MQConfig (MQ的基本配置)

produceer(生产者)

consumer(消费者)

message(生产者消费者同消息队列进行信息传递的载体)

2.MQConfig的配置

rocketmq.endpoint=${ROCKETMQ_ENDPOINT:localhost:8081}

rocketmq.topic.asset-enrichment=asset-enrichment-topic

3.MQconfig

代码语言:java
复制
@Configuration
public class RocketMQConfig {
    private static final Logger log = LoggerFactory.getLogger(RocketMQConfig.class);

    @Value("${rocketmq.endpoint}")
    private String endpoint;  // RocketMQ连接端点(Proxy地址)

    @Value("${rocketmq.topic.product-enrichment}")  // 主题名改为商品相关
    private String topic;

    @Value("${rocketmq.consumer.group}")
    private String consumerGroup;  // 消费者组名


    /**
     * 创建生产者(Producer)
     * 作用:向指定主题发送商品相关消息
     */
    @Bean(destroyMethod = "close")  // 容器销毁时自动关闭生产者,释放资源
    public Producer rocketMQProducer() throws ClientException {
        log.info("初始化商品消息生产者 - 端点: {}, 主题: {}", endpoint, topic);

        // 加载RocketMQ客户端服务
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 配置连接参数(端点地址)
        ClientConfiguration configuration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoint)
                .build();

        // 构建生产者(绑定主题,确保只能向指定主题发送消息)
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)  // 限制生产者可发送的主题
                .setClientConfiguration(configuration)
                .build();

        log.info("商品消息生产者初始化成功");
        return producer;
    }


    /**
     * 创建消费者(PushConsumer)
     * 作用:从指定主题订阅并消费商品相关消息
     */
    @Bean(destroyMethod = "close")  // 容器销毁时自动关闭消费者
    public PushConsumer rocketMQConsumer(
            ProductEnrichmentConsumer productEnrichmentConsumer) throws ClientException {
        log.info("初始化商品消息消费者 - 端点: {}, 主题: {}, 消费组: {}",
                endpoint, topic, consumerGroup);

        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfiguration configuration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoint)
                .build();

        // 订阅规则:匹配所有Tag(*表示不筛选)
        FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);

        // 构建消费者(绑定消费组、主题、消息监听器)
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(configuration)
                .setConsumerGroup(consumerGroup)  // 消费组标识
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))  // 订阅主题
                .setMessageListener(productEnrichmentConsumer)  // 消息处理逻辑
                .build();

        log.info("商品消息消费者初始化成功");
        return pushConsumer;
    }
}

4.message(生产者消费者同消息队列进行信息传递的载体)

代码语言:javascript
复制
/**
 * 商品消息载体
 * 作用:定义生产者和消费者之间传递的商品相关数据结构
 */
@Data  // 自动生成getter/setter
@Builder  // 支持链式构建消息对象
@NoArgsConstructor
@AllArgsConstructor
public class ProductEnrichmentMessage implements Serializable {  // 实现序列化,支持网络传输

    private static final long serialVersionUID = 1L;  // 序列化版本号,确保反序列化兼容

    /**
     * 商品ID(核心业务标识,用于定位需要处理的商品)
     */
    private Long productId;

    /**
     * 消息唯一ID(用于追踪消息全生命周期)
     */
    private String messageId;

    /**
     * 发送时间戳(记录消息产生时间)
     */
    private Long timestamp;

    /**
     * 重试次数(记录消息被重试消费的次数,避免无限重试)
     */
    private Integer retryCount;
}

5.produceer(生产者)

代码语言:javascript
复制
@Service
@RequiredArgsConstructor  // 自动注入final字段的构造函数
public class ProductEnrichmentProducer {
    private static final Logger log = LoggerFactory.getLogger(ProductEnrichmentProducer.class);

    // 注入RocketMQ生产者(由RocketMQConfig配置类创建)
    private final Producer rocketMQProducer;
    // 用于JSON序列化(将消息对象转为字符串)
    private final ObjectMapper objectMapper;

    @Value("${rocketmq.topic.product-enrichment}")
    private String topic;  // 目标主题


    /**
     * 发送商品数据补全消息
     * @param productId 商品ID(需要补全数据的商品)
     */
    public void sendEnrichmentMessage(Long productId) {
        try {
            log.info("准备发送商品数据补全消息 - 商品ID: {}", productId);

            // 1. 构建消息对象(使用Builder模式,代码更简洁)
            ProductEnrichmentMessage message = ProductEnrichmentMessage.builder()
                    .productId(productId)
                    .messageId(UUID.randomUUID().toString())  // 生成唯一消息ID
                    .timestamp(System.currentTimeMillis())  // 记录发送时间
                    .retryCount(0)  // 初始重试次数为0
                    .build();

            // 2. 将消息对象序列化为JSON字符串(便于网络传输)
            String messageBody = objectMapper.writeValueAsString(message);

            // 3. 创建RocketMQ原生消息(指定主题、标签、消息体等)
            ClientServiceProvider provider = ClientServiceProvider.loadService();
            Message rocketMessage = provider.newMessageBuilder()
                    .setTopic(topic)  // 发送到目标主题
                    .setKeys("product-" + productId)  // 消息键(用于消息查询和过滤)
                    .setTag("enrichment")  // 消息标签(用于消费者按标签筛选消息)
                    .setBody(messageBody.getBytes(StandardCharsets.UTF_8))  // 消息体(字节数组)
                    .build();

            // 4. 发送消息(同步发送,确保消息成功投递)
            SendReceipt receipt = rocketMQProducer.send(rocketMessage);

            log.info("商品消息发送成功 - 商品ID: {}, 消息ID: {}, 回执ID: {}",
                    productId, message.getMessageId(), receipt.getMessageId());

        } catch (ClientException e) {
            // MQ客户端异常(如连接失败、主题不存在)
            log.error("发送商品消息失败(MQ客户端异常) - 商品ID: {}", productId, e);
        } catch (JsonProcessingException e) {
            // JSON序列化异常(消息对象格式错误)
            log.error("发送商品消息失败(JSON序列化异常) - 商品ID: {}", productId, e);
        }
    }
}

6.consumer(消费者)

代码语言:javascript
复制
@Service
@RequiredArgsConstructor  // 自动注入依赖
public class ProductEnrichmentConsumer implements MessageListener {
    private static final Logger log = LoggerFactory.getLogger(ProductEnrichmentConsumer.class);

    // 商品数据补全业务服务(实际处理业务逻辑)
    private final ProductEnrichmentService productEnrichmentService;
    // 用于JSON反序列化(将消息体转为Java对象)
    private final ObjectMapper objectMapper;


    /**
     * 消费消息的核心方法
     * @param messageView 接收到的消息对象
     * @return 消费结果(SUCCESS表示成功,FAILURE表示失败需重试)
     */
    @Override
    public ConsumeResult consume(MessageView messageView) {
        try {
            // 1. 解析消息体(从字节数组转为JSON字符串)
            String messageBody = StandardCharsets.UTF_8.decode(messageView.getBody()).toString();
            log.info("收到商品数据补全消息 - MQ消息ID: {}, 消息内容: {}",
                    messageView.getMessageId(), messageBody);

            // 2. 将JSON字符串反序列化为消息对象
            ProductEnrichmentMessage message = objectMapper.readValue(
                    messageBody, ProductEnrichmentMessage.class);

            Long productId = message.getProductId();
            if (productId == null) {
                log.error("商品ID为空,消息处理失败 - MQ消息ID: {}", messageView.getMessageId());
                return ConsumeResult.FAILURE;  // 失败,触发重试
            }

            // 3. 调用业务服务处理商品数据补全
            log.info("开始处理商品数据补全 - 商品ID: {}, 消息ID: {}",
                    productId, message.getMessageId());
            productEnrichmentService.enrichProductData(productId);  // 实际业务逻辑

            log.info("商品数据补全处理成功 - 商品ID: {}, 消息ID: {}",
                    productId, message.getMessageId());
            return ConsumeResult.SUCCESS;  // 成功,MQ标记消息为已消费

        } catch (Exception e) {
            // 处理异常(如业务逻辑失败、反序列化失败)
            log.error("处理商品消息失败 - MQ消息ID: {}", messageView.getMessageId(), e);
            return ConsumeResult.FAILURE;  // 失败,MQ将重试发送消息
        }
    }
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 什么是消息队列?
  • RocketMQ 的基本单元
    • 1.NameServer
    • 2.Broker
    • 3.Proxy
  • Docker 部署 RocketMQ
    • 1.拉取RocketMQ镜像
    • 2.创建容器共享网络
    • 3.启动NameServer
    • 4.启动 Broker+Proxy
  • 以java为例 实现RocketMQ的部署
    • 1.需要实现的基本对象:
    • 2.MQConfig的配置
    • 3.MQconfig
    • 4.message(生产者消费者同消息队列进行信息传递的载体)
    • 5.produceer(生产者)
    • 6.consumer(消费者)
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档