前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >手搭手Springboot3整合RocketMQ2.3

手搭手Springboot3整合RocketMQ2.3

原创
作者头像
QGS
发布2024-04-22 00:01:43
3220
发布2024-04-22 00:01:43
举报
文章被收录于专栏:QGS星球QGS星球

环境介绍

技术栈

springboot+rocketmq

软件

版本

mysql

8

IDEA

IntelliJ IDEA 2022.2.1

JDK

17

Spring Boot

3.1.7

rocketmq

4.9.4

RocketMQ 基本概念

消息模型Message Model

RocketMQ 主要由 Producer、Broker、Consumer 三部分组成,其中 Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个 Topic 的消息,每个Topic 的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个 Topic 中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个 Consumer 实例构成。

消息生产者Producer

负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到 broker 服务器。RocketMQ 提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要 Broker 返回确认信息,单向发送不需要。

消息消费者Consumer

负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从 Broker 服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。

主题Topic

表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ 进行消息订阅的基本单位。

代理服务器Broker Server

消息中转角色,负责存储消息、转发消息。代理服务器在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

名字服务Name Server

名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP 列表。多个 Namesrv 实例组成集群,但相互独立,没有信息交

Pom.xml加入依赖

代码语言:java
复制
<dependency>
 <groupId>org.apache.rocketmq</groupId>
 <artifactId>rocketmq-spring-boot-starter</artifactId>
 <version>2.3.0</version>
</dependency>
<dependency>
 <groupId>com.alibaba</groupId>
 <artifactId>fastjson</artifactId>
 <version>2.0.32</version>
</dependency>

application.yml配置

代码语言:java
复制
rocketmq:
 name-server: 192.168.68.133:9876
 producer:
 #生产者组名,一个应用里面必须唯一
 group: test-producer
 #消息发送的超时时间 默认3000ms
 send-message-timeout: 3000
 #消息达到4096字节的时候,消息就会被压缩。默认 4096
 compress-message-body-threshold: 4096
 #最大的消息限制,默认为128K
 max-message-size: 4194304
 #同步消息发送失败重试次数
 retry-times-when-send-failed: 2
 #在内部发送失败时是否重试其他代理,这个参数在有多个broker时才生效
 retry-next-server: true
 #异步消息发送失败重试的次数
 retry-times-when-send-async-failed: 2

消费者监听器

报错就是重试,重试三次后还是报错,就代表消费失败,会重新入队,根据application.yml的配置

@Component @RocketMQMessageListener(topic = "TopicTest",consumerGroup = "ConsumerGroup-springboot") public class RocketListener implements RocketMQListener<MessageExt> { /** * onMessage 消费者方法 * @param messages 消息内容 */ @Override public void onMessage(MessageExt messages) { /不报错就是签收信息, //报错就是重试,重试三次后还是报错,就代表消费失败,会重新入队 System.out.println("接收到消息:"+new String(messages.getBody()));

} }

发送同步消息

生产者

代码语言:java
复制
@Test
void sendMsg() {
 /**
 * 发送同步消息
 * destination 目的地-主题
 * payload 消息
 */
 rocketMQTemplate.syncSend("TopicTest", "同步消息");
}

消费者

代码语言:java
复制
@Component
@RocketMQMessageListener(topic = "TopicTest",consumerGroup = "ConsumerGroup-springboot")
public class RocketListener implements RocketMQListener<MessageExt> {

 /**
 * onMessage 消费者方法
 * @param messages 消息内容
 */
 @Override
 public void onMessage(MessageExt messages) {
 //不报错就是签收信息,
 //报错就是重试,重试三次后还是报错,就代表消费失败,会重新入队 (根据yml配置)
 System.out.println("接收到消息:"+new String(messages.getBody()));
 }
}

发送异步消息

生产者

代码语言:java
复制
@Test
void asyncTest() {
 /**
 * 发送异步消息
 * destination 目的地-主题
 * payload 消息
 */
 rocketMQTemplate.asyncSend("TopicTest", "异步消息", new SendCallback() {
 @Override
 public void onSuccess(SendResult sendResult) {
 System.out.println("发送成功");
 }

 @Override
 public void onException(Throwable throwable) {
 System.out.println("发送失败");
 }
 });
}

消费者

代码语言:java
复制
@Component
@RocketMQMessageListener(topic = "TopicTest",consumerGroup = "ConsumerGroup-springboot")
public class RocketListener implements RocketMQListener<MessageExt> {

 /**
 * onMessage 消费者方法
 * @param messages 消息内容
 */
 @Override
 public void onMessage(MessageExt messages) {
 //不报错就是签收信息,
 //报错就是重试,重试三次后还是报错,就代表消费失败,会重新入队 (根据yml配置)
 System.out.println("接收到消息:"+new String(messages.getBody()));
 }
}

发送延时消息

生产者

代码语言:java
复制
@Test
void delayTest() {
 /**
 * 发送延时消息
 * destination 目的地-主题
 * payload 消息
 * timestamp 连接超时
 * delayLevel 延时级别
 */
 Message<String> msg = MessageBuilder.withPayload("延时消息").build();
 rocketMQTemplate.syncSend("TopicTest", msg, 3000, 3);
}

消费者

代码语言:java
复制
@Component
@RocketMQMessageListener(topic = "TopicTest",consumerGroup = "ConsumerGroup-springboot")
public class RocketListener implements RocketMQListener<MessageExt> {

 /**
 * onMessage 消费者方法
 * @param messages 消息内容
 */
 @Override
 public void onMessage(MessageExt messages) {
 //不报错就是签收信息,
 //报错就是重试,重试三次后还是报错,就代表消费失败,会重新入队 (根据yml配置)
 System.out.println("接收到消息:"+new String(messages.getBody()));
 }
}

发送单向消息

生产者

代码语言:java
复制
@Test
void OneWayTest() {
 /**
 * 发送单向消息
 * destination 目的地-主题
 * payload 消息
 */
 rocketMQTemplate.sendOneWay("TopicTest", "单向消息");
}

消费者

代码语言:java
复制
@Component
@RocketMQMessageListener(topic = "TopicTest",consumerGroup = "ConsumerGroup-springboot")
public class RocketListener implements RocketMQListener<MessageExt> {

 /**
 * onMessage 消费者方法
 * @param messages 消息内容
 */
 @Override
 public void onMessage(MessageExt messages) {
 //不报错就是签收信息,
 //报错就是重试,重试三次后还是报错,就代表消费失败,会重新入队 (根据yml配置)
 System.out.println("接收到消息:"+new String(messages.getBody()));
 }
}

发送顺序消息

顺序消息 生产者需要将一组消息都发送到同一个队列 ,消费者需要单线程消费

生产者

生产者需要将一组消息都发送到同一个队列

代码语言:java
复制
List<MessageM> messageMs = Arrays.asList(
 new MessageM("sn0001", 1, "下单"),
 new MessageM("sn0001", 1, "付款"),
 new MessageM("sn0001", 1, "配送"),
 new MessageM("sn0002", 2, "下单"),
 new MessageM("sn0002", 2, "付款"),
 new MessageM("sn0002", 2, "配送")
);


@Test
void orderlyTest() {
 /**

 * destination 目的地-主题
 * payload 消息




 */
 for (MessageM messageM : messageMs) {
 rocketMQTemplate.syncSendOrderly("orderlyTest", JSON.toJSON(messageM), messageM.getSn());
 }
}

消费者

CONCURRENTLY 同时

ORDERLY有序

消费者需要单线程消费

代码语言:java
复制
@Component
@RocketMQMessageListener(topic = "orderlyTest",consumerGroup = "orderly",consumeMode = ConsumeMode.ORDERLY)
public class orderlyListener implements RocketMQListener<MessageExt> {

 @Override
 public void onMessage(MessageExt messageExt) {
 MessageM messageM = JSON.parseObject(new String(messageExt.getBody()), MessageM.class);
 System.out.println(messageM);
 }
}

发送带标签tag

生产者

代码语言:java
复制
@Test
void ProducerTagTest(){
 rocketMQTemplate.syncSend("TagMQ:tagA","带tagA的消息");
 rocketMQTemplate.syncSend("TagMQ:tagB","带tagB的消息");
}

消费者

代码语言:java
复制
@Component
@RocketMQMessageListener(topic = "TagMQ",
 consumerGroup = "TagMQGroup",
 selectorType = SelectorType.TAG, //tag过滤模式
 selectorExpression = "tagA || tagB"

)
public class MsgListenerTag implements RocketMQListener<MessageExt> {

 @Override
 public void onMessage(MessageExt messageExt) {
 System.out.println(new String( messageExt.getBody()));
 }
}

发送带Key消息

Key带在消息头中

生产者

代码语言:java
复制
@Test
void keyTest(){
 String Key = UUID.randomUUID().toString();
 Message<String> msg = MessageBuilder
 .withPayload("带key消息").
 setHeader(RocketMQHeaders.KEYS, Key)
 .build();
 /**
 * 带Key消息
 */
 rocketMQTemplate.syncSend("ketTopic",msg);
}

消费者

代码语言:java
复制
@Component
@RocketMQMessageListener(topic = "ketTopic",consumerGroup = "ketConsumerGroup-springboot")
public class keyMQListener implements RocketMQListener<MessageExt> {

 /**
 * onMessage 消费者方法
 * @param messages 消息内容
 */
 @Override
 public void onMessage(MessageExt messages) {
 //不报错就是签收信息,
 //报错就是重试,重试三次后还是报错,就代表消费失败,会重新入队 (根据yml配置)
 System.out.println("接收到消息:"+new String(messages.getBody()));
 System.out.println("key:"+messages.getKeys());
 }
}

我正在参与2024腾讯技术创作特训营最新征文,快来和我瓜分大奖!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • RocketMQ 基本概念
  • 消费者监听器
  • 发送同步消息
  • 发送异步消息
  • 发送延时消息
  • 发送单向消息
  • 发送顺序消息
  • 发送带标签tag
  • 发送带Key消息
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档