发送与接收延迟消息

最近更新时间:2023-04-07 10:13:05

我的收藏

操作场景

本文以调用 Java SDK 为例介绍通过开源 SDK 实现定时消息收发的操作过程。

前提条件

下载 Demo或者前往GitHub 项目

操作步骤

步骤1:安装 Java 依赖库

在 Java 项目中引入相关依赖,以 maven 工程为例,在 pom.xml 添加以下依赖:
说明
依赖版本要求 ≥ 4.9.3, 当前建议为4.9.4
<!-- in your <dependencies> block -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.4</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.9.4</version>
</dependency>

步骤2:生产消息

创建消息生产者

// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer(
groupName,
new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)) // ACL权限
);
// 设置NameServer的地址
producer.setNamesrvAddr(nameserver);
// 启动Producer实例
producer.start();

参数
说明
groupName
生产者组名称,建议使用对应的topic名字
nameserver
集群接入地址,在控制台集群管理页面的集群列表操作栏的接入地址处获取。新版共享集群与专享集群命名接入点地址在命名空间列表获取。
secretKey
角色名称,在 角色管理 页面复制。
accessKey
角色密钥,在 角色管理 页面复制密钥列复制。
img



发送消息

固定延迟级别的消息

int totalMessagesToSend = 5;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message(TOPIC_NAME, ("Hello scheduled message " + i).getBytes());
// 设置消息延迟等级
message.setDelayTimeLevel(5);
// 发送消息
SendResult sendResult = producer.send(message);
System.out.println("sendResult = " + sendResult);
}

任意延迟时间的消息

int totalMessagesToSend = 1;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message(TOPIC_NAME, ("Hello timer message " + i).getBytes());
// 设置发送消息的时间
long timeStamp = System.currentTimeMillis() + 30000;
// 若需要发送定时消息,则需要设置定时时间,消息将在指定时间进行投递,例如消息将在2022-08-08 08:08:08投递。
// 若设置的时间戳在当前时间之前,则消息将被立即投递给Consumer。
//将 __STARTDELIVERTIME 设定到 msg 的属性中
message.putUserProperty("__STARTDELIVERTIME", String.valueOf(timeStamp));
// 发送消息
SendResult sendResult = producer.send(message);
System.out.println("sendResult = " + sendResult);
}

步骤3:消费消息

创建消费者

TDMQ RocketMQ 版支持 push 和 pull 两种消费模式。推荐Push消费模式
// 实例化消费者
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(
groupName,
new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))); //ACL权限
// 设置NameServer的地址
pushConsumer.setNamesrvAddr(nameserver);
参数
说明
groupName
生产者组名称,在控制台集群管理中Group 页签中复制。
nameserver
集群接入地址,在控制台集群管理页面的集群列表操作栏的接入地址处获取。新版共享集群与专享集群命名接入点地址在命名空间列表获取。
secretKey
角色名称,在 角色管理 页面复制。
accessKey
角色密钥,在 角色管理 页面复制密钥列复制。
img



订阅消息

根据消费模式不同,订阅方式也有所区别。
// 订阅topic
pushConsumer.subscribe(topic_name, "*");
// 注册回调实现类来处理从broker拉取回来的消息
pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
// 消息处理逻辑
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 标记该消息已经被成功消费, 根据消费情况,返回处理状态
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 启动消费者实例
pushConsumer.start();
参数
说明
topic_name
在控制台集群管理中 Topic 页签中复制具体 Topic 名称。
"*"
订阅表达式如果为 null 或*表达式表示订阅全部,同时支持 "tag1 || tag2 || tag3" 标识订阅多个类型的 tag。

步骤4:查看消费详情

登录 TDMQ 控制台,在集群管理 > Group 页面,可查看与 Group 连接的客户端列表,单击操作列的查看详情,可查看消费者详情。
img


说明
上述是对消息的发布和订阅方式的简单介绍。更多操作可参见 GitHub DemoRocketMQ 官方文档