消息队列是一种异步通信(Asynchronous Communication)的服务或组件。
消息队列是解决高并发问题的关键技术之一。
消息队列主要解决了分布式系统中的三大核心问题:
在需要解耦、异步、削峰三大特性同时发挥作用的场景下,消息队列没有完美的平替技术。 MQ(如 Kafka, RabbitMQ, RocketMQ)是最佳选择。
这里以dockerhub上 RocketMQ 5.3.2 版本的镜像为例,介绍部署过程。
docker pull apache/rocketmq:5.3.2RocketMQ 中有多个服务,需要创建多个容器,创建 docker 网络便于容器间相互通信。
docker network create rocketmq# 启动 NameServer
docker run \
-d # 后台运行容器
--name rmqnamesrv # 给容器起固定名(方便后续通信和管理)
-p 9876:9876 # 端口映射:宿主机 9876 端口 → 容器 9876 端口(NameServer 默认端口)
--network rocketmq # 加入自定义网络(让 Broker 能通过名访问)
apache/rocketmq:5.3.2 # 用哪个镜像
sh mqnamesrv # 容器启动后执行的命令(启动 NameServer 的脚本)
# 验证 NameServer 是否启动成功
docker logs -f rmqnamesrv看到 'The Name Server boot success..', 表示NameServer 已成功启动。
NameServer 成功启动后,我们启动 Broker 和 Proxy。
# 配置 Broker 的 IP 地址
echo "brokerIP1=127.0.0.1" > broker.conf
# 启动 Broker 和 Proxy
docker run -d --name rmqbroker --net rocketmq -p 10912:10912 -p 10911:10911 -p 10909:10909 -p 8081:8080 -p 8083:8081 -e "NAMESRV_ADDR=rmqnamesrv:9876" -v %cd%\broker.conf:/home/rocketmq/rocketmq-5.3.2/conf/broker.conf apache/rocketmq:5.3.2 sh mqbroker --enable-proxy -c /home/rocketmq/rocketmq-5.3.2/conf/broker.conf
# 验证 Broker 是否启动成功
docker exec -it rmqbroker bash -c "tail -n 10 /home/rocketmq/logs/rocketmqlogs/proxy.log"Broke在 RocketMQ 5.x 里会监听 10912、10911、10909 这三个端口,分别监听 从 Broker 4.x 客户端 Proxy的消息 Proxy 在8080监听5.x 客户端(用 HTTP 协议)
Proxy 在8081监听 5.x 客户端(默认用 gRPC 协议)
每个组件(Broker/Proxy)都有 “官方预设的默认端口”,-p映射只是 “把容器内的预设端口暴露到宿主机”,端口的功能早已被组件本身的设计固定死,和映射顺序无关。
所有 MQ 的组件,都能对应到 RocketMQ 三单元的 “核心职责”(路由、存储、接入),只是组合方式不同
比如 RocketMQ 是 “三组件各管一责”,Kafka 是 “存储组件(Broker)+ 路由组件(ZooKeeper/Controller)”,RabbitMQ 是 “Broker 一组件管路由 + 存储”,但最终都是为了完成 “消息从生产者到消费者” 的传递。
在pom.xml文件中添加以下依赖引入Java依赖库,将rocketmq-client-java-version替换成最新的版本.
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>${rocketmq-client-java-version}</version>
</dependency> MQConfig (MQ的基本配置)
produceer(生产者)
consumer(消费者)
message(生产者消费者同消息队列进行信息传递的载体)
rocketmq.endpoint=${ROCKETMQ_ENDPOINT:localhost:8081}
rocketmq.topic.asset-enrichment=asset-enrichment-topic
@Configuration
public class RocketMQConfig {
private static final Logger log = LoggerFactory.getLogger(RocketMQConfig.class);
@Value("${rocketmq.endpoint}")
private String endpoint; // RocketMQ连接端点(Proxy地址)
@Value("${rocketmq.topic.product-enrichment}") // 主题名改为商品相关
private String topic;
@Value("${rocketmq.consumer.group}")
private String consumerGroup; // 消费者组名
/**
* 创建生产者(Producer)
* 作用:向指定主题发送商品相关消息
*/
@Bean(destroyMethod = "close") // 容器销毁时自动关闭生产者,释放资源
public Producer rocketMQProducer() throws ClientException {
log.info("初始化商品消息生产者 - 端点: {}, 主题: {}", endpoint, topic);
// 加载RocketMQ客户端服务
ClientServiceProvider provider = ClientServiceProvider.loadService();
// 配置连接参数(端点地址)
ClientConfiguration configuration = ClientConfiguration.newBuilder()
.setEndpoints(endpoint)
.build();
// 构建生产者(绑定主题,确保只能向指定主题发送消息)
Producer producer = provider.newProducerBuilder()
.setTopics(topic) // 限制生产者可发送的主题
.setClientConfiguration(configuration)
.build();
log.info("商品消息生产者初始化成功");
return producer;
}
/**
* 创建消费者(PushConsumer)
* 作用:从指定主题订阅并消费商品相关消息
*/
@Bean(destroyMethod = "close") // 容器销毁时自动关闭消费者
public PushConsumer rocketMQConsumer(
ProductEnrichmentConsumer productEnrichmentConsumer) throws ClientException {
log.info("初始化商品消息消费者 - 端点: {}, 主题: {}, 消费组: {}",
endpoint, topic, consumerGroup);
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration configuration = ClientConfiguration.newBuilder()
.setEndpoints(endpoint)
.build();
// 订阅规则:匹配所有Tag(*表示不筛选)
FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);
// 构建消费者(绑定消费组、主题、消息监听器)
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(configuration)
.setConsumerGroup(consumerGroup) // 消费组标识
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) // 订阅主题
.setMessageListener(productEnrichmentConsumer) // 消息处理逻辑
.build();
log.info("商品消息消费者初始化成功");
return pushConsumer;
}
}/**
* 商品消息载体
* 作用:定义生产者和消费者之间传递的商品相关数据结构
*/
@Data // 自动生成getter/setter
@Builder // 支持链式构建消息对象
@NoArgsConstructor
@AllArgsConstructor
public class ProductEnrichmentMessage implements Serializable { // 实现序列化,支持网络传输
private static final long serialVersionUID = 1L; // 序列化版本号,确保反序列化兼容
/**
* 商品ID(核心业务标识,用于定位需要处理的商品)
*/
private Long productId;
/**
* 消息唯一ID(用于追踪消息全生命周期)
*/
private String messageId;
/**
* 发送时间戳(记录消息产生时间)
*/
private Long timestamp;
/**
* 重试次数(记录消息被重试消费的次数,避免无限重试)
*/
private Integer retryCount;
}@Service
@RequiredArgsConstructor // 自动注入final字段的构造函数
public class ProductEnrichmentProducer {
private static final Logger log = LoggerFactory.getLogger(ProductEnrichmentProducer.class);
// 注入RocketMQ生产者(由RocketMQConfig配置类创建)
private final Producer rocketMQProducer;
// 用于JSON序列化(将消息对象转为字符串)
private final ObjectMapper objectMapper;
@Value("${rocketmq.topic.product-enrichment}")
private String topic; // 目标主题
/**
* 发送商品数据补全消息
* @param productId 商品ID(需要补全数据的商品)
*/
public void sendEnrichmentMessage(Long productId) {
try {
log.info("准备发送商品数据补全消息 - 商品ID: {}", productId);
// 1. 构建消息对象(使用Builder模式,代码更简洁)
ProductEnrichmentMessage message = ProductEnrichmentMessage.builder()
.productId(productId)
.messageId(UUID.randomUUID().toString()) // 生成唯一消息ID
.timestamp(System.currentTimeMillis()) // 记录发送时间
.retryCount(0) // 初始重试次数为0
.build();
// 2. 将消息对象序列化为JSON字符串(便于网络传输)
String messageBody = objectMapper.writeValueAsString(message);
// 3. 创建RocketMQ原生消息(指定主题、标签、消息体等)
ClientServiceProvider provider = ClientServiceProvider.loadService();
Message rocketMessage = provider.newMessageBuilder()
.setTopic(topic) // 发送到目标主题
.setKeys("product-" + productId) // 消息键(用于消息查询和过滤)
.setTag("enrichment") // 消息标签(用于消费者按标签筛选消息)
.setBody(messageBody.getBytes(StandardCharsets.UTF_8)) // 消息体(字节数组)
.build();
// 4. 发送消息(同步发送,确保消息成功投递)
SendReceipt receipt = rocketMQProducer.send(rocketMessage);
log.info("商品消息发送成功 - 商品ID: {}, 消息ID: {}, 回执ID: {}",
productId, message.getMessageId(), receipt.getMessageId());
} catch (ClientException e) {
// MQ客户端异常(如连接失败、主题不存在)
log.error("发送商品消息失败(MQ客户端异常) - 商品ID: {}", productId, e);
} catch (JsonProcessingException e) {
// JSON序列化异常(消息对象格式错误)
log.error("发送商品消息失败(JSON序列化异常) - 商品ID: {}", productId, e);
}
}
}@Service
@RequiredArgsConstructor // 自动注入依赖
public class ProductEnrichmentConsumer implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(ProductEnrichmentConsumer.class);
// 商品数据补全业务服务(实际处理业务逻辑)
private final ProductEnrichmentService productEnrichmentService;
// 用于JSON反序列化(将消息体转为Java对象)
private final ObjectMapper objectMapper;
/**
* 消费消息的核心方法
* @param messageView 接收到的消息对象
* @return 消费结果(SUCCESS表示成功,FAILURE表示失败需重试)
*/
@Override
public ConsumeResult consume(MessageView messageView) {
try {
// 1. 解析消息体(从字节数组转为JSON字符串)
String messageBody = StandardCharsets.UTF_8.decode(messageView.getBody()).toString();
log.info("收到商品数据补全消息 - MQ消息ID: {}, 消息内容: {}",
messageView.getMessageId(), messageBody);
// 2. 将JSON字符串反序列化为消息对象
ProductEnrichmentMessage message = objectMapper.readValue(
messageBody, ProductEnrichmentMessage.class);
Long productId = message.getProductId();
if (productId == null) {
log.error("商品ID为空,消息处理失败 - MQ消息ID: {}", messageView.getMessageId());
return ConsumeResult.FAILURE; // 失败,触发重试
}
// 3. 调用业务服务处理商品数据补全
log.info("开始处理商品数据补全 - 商品ID: {}, 消息ID: {}",
productId, message.getMessageId());
productEnrichmentService.enrichProductData(productId); // 实际业务逻辑
log.info("商品数据补全处理成功 - 商品ID: {}, 消息ID: {}",
productId, message.getMessageId());
return ConsumeResult.SUCCESS; // 成功,MQ标记消息为已消费
} catch (Exception e) {
// 处理异常(如业务逻辑失败、反序列化失败)
log.error("处理商品消息失败 - MQ消息ID: {}", messageView.getMessageId(), e);
return ConsumeResult.FAILURE; // 失败,MQ将重试发送消息
}
}
}原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。