有奖捉虫:办公协同&微信生态&物联网文档专题 HOT

操作场景

本文以调用 C++ SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。

前提条件

操作步骤

1. 准备环境。
1.1 需要在客户端环境安装 RocketMQ-Client-CPP 库,根据官方文档进行安装即可 安装 CPP 动态库推荐使用 master 分支构建
1.2 在项目中引入 RocketMQ-Client-CPP 相关头文件及动态库。
2. 初始化消息生产者。
// 设置生产组名称
DefaultMQProducer producer(groupName);
// 设置服务接入地址
producer.setNamesrvAddr(nameserver);
// 设置用户权限
producer.setSessionCredentials(
accessKey, // 角色密钥
secretKey, // 角色名称
"");
// 设置命名空间(命名空间全称)
producer.setNameSpace(namespace);
// 请确保参数设置完成在启动之前
producer.start();
参数
说明
groupName
生产者组名称,在控制台集群管理中 Group tab 中获取。
nameserver
集群接入地址,在控制台集群管理页面操作列的获取接入地址获取。新版共享集群与专享集群命名接入点地址在命名空间列表获取。

secretKey
角色名称,在 角色管理 页面复制。
accessKey
角色密钥,在 角色管理 页面复制密钥列复制。
img
img

namespace
命名空间的名称,在控制台命名空间页面复制。
3. 发送消息。
// 初始化消息内容
MQMessage msg(
topicName, // topic名称
TAGS, // 消息tag
KEYS, // 消息业务key
"Hello cpp client, this is a message." // 消息内容
);
try {
// 发送消息
SendResult sendResult = producer.send(msg);
std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId()
<< std::endl;
} catch (MQException e) {
std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
}
参数
说明
topicName
Topic 的名称,在控制台 topic 页面复制。
TAGS
用来设置消息的 TAG。
KEYS
设置消息业务 key。
4. 资源释放。
// 释放资源
producer.shutdown();
5. 初始化消费者。
// 消息监听
class ExampleMessageListener : public MessageListenerConcurrently {
public:
ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
for (auto item = msgs.begin(); item != msgs.end(); item++) {
// 业务
std::cout << "Received Message Topic:" << item->getTopic() << ", MsgId:" << item->getMsgId() << ", TAGS:"
<< item->getTags() << ", KEYS:" << item->getKeys() << ", Body:" << item->getBody() << std::endl;
}
// 消费成功返回CONSUME_SUCCESS
return CONSUME_SUCCESS;
// 消费失败返回RECONSUME_LATER,该消息将会被重新消费
// return RECONSUME_LATER;
}
};
// 初始化消费者
DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer(groupName);
// 设置服务地址
consumer->setNamesrvAddr(nameserver);
// 设置用户权限
consumer->setSessionCredentials(
accessKey,
secretKey,
"");
// 设置命名空间
consumer->setNameSpace(namespace);
// 设置实例名称
consumer->setInstanceName("CppClient");
// 请注册自定义侦听函数用来处理接收到的消息,并返回响应的处理结果。
ExampleMessageListener *messageListener = new ExampleMessageListener();
// 订阅消息
consumer->subscribe(topicName, TAGS);
// 设置消息监听
consumer->registerMessageListener(messageListener);
// 准备工作完成,必须调用启动函数,才可以正常工作。
consumer->start();
参数
说明
groupName
消费者组名称。在控制台集群管理中 Group 页签中获取。
nameserver
集群接入地址,在集群管理页面操作列的获取接入地址获取。新版共享集群与专享集群命名接入点地址在命名空间列表获取。

secretKey
角色名称,在 角色管理 页面复制。
accessKey
角色密钥,在 角色管理 页面复制密钥列复制。
img
img

namespace
命名空间的名称,在控制台命名空间页面复制。
topicName
Topic 的名称,在控制台 topic 页面复制。
TAGS
用来设置订阅消息的 TAG。
6. 资源释放。
// 资源释放
consumer->shutdown();
7. 查看消费详情。登录 TDMQ 控制台,在集群管理 > Group 页面,可查看与 Group 连接的客户端列表,单击操作列的查看详情,可查看消费者详情。

如果需要发送任意延迟消息,设置__STARTDELIVERTIME 属性即可。
MQMessage msg("控制台topic", "tag", "Delay Message.");
chrono::system_clock::duration d = chrono::system_clock::now().time_since_epoch();
chrono::milliseconds mil = chrono::duration_cast<chrono::milliseconds>(d);
//定时延时消息,单位毫秒(ms),在指定时间戳(当前时间之后)进行投递
//设置需要延时或者定时的时间,例如当前时间延迟30秒后投递。
long expectTime = mil.count() + 30000;
msg.setProperty("__STARTDELIVERTIME", to_string(expectTime));
说明
上述是对消息的发布和订阅方式的简单介绍。更多操作可参见 GitHub DemoRocketMQ 官方文档