C++ SDK

最近更新时间:2025-06-10 16:17:03

我的收藏

操作场景

本文以调用 C++ SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。

前提条件

操作步骤

1. 准备环境。
1.1 需要在客户端环境安装 RocketMQ-Client-CPP 库,根据官方文档进行安装即可 安装 CPP 动态库推荐使用 master 分支构建
1.2 在项目中引入 RocketMQ-Client-CPP 相关头文件及动态库。
2. 初始化消息生产者。
// 设置生产组名称
DefaultMQProducer producer(groupName);
// 设置服务接入地址
producer.setNamesrvAddr(nameserver);
// 设置用户权限
producer.setSessionCredentials(
accessKey, // 角色密钥
secretKey, // 角色名称
"");
// 设置命名空间(命名空间全称)
producer.setNameSpace(namespace);
// 请确保参数设置完成在启动之前
producer.start();
说明:
以下参数需登录 TDMQ RocketMQ 版控制台 获取。
参数
说明
groupName
生产者组名称,在控制台 Group 管理页面复制。

nameserver
集群接入地址,控制台集群基本信息页面的接入信息模块获取。

secretKey
角色名称,在控制台的集群权限页面 SecretKey 列复制。
accessKey
角色密钥,在控制台的集群权限页面 AccessKey 列复制。

nameSpace
命名空间的名称,在控制台命名空间页面复制。如果您使用的是4.x通用集群或者5.x集群,此处可填写集群的 ID。

3. 发送消息。
// 初始化消息内容
MQMessage msg(
topicName, // topic名称
TAGS, // 消息tag
KEYS, // 消息业务key
"Hello cpp client, this is a message." // 消息内容
);
try {
// 发送消息
SendResult sendResult = producer.send(msg);
std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId()
<< std::endl;
} catch (MQException e) {
std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl;
}
参数
说明
topicName
Topic 的名称,在控制台 topic 页面复制。
TAGS
用来设置消息的 TAG。
KEYS
设置消息业务 key。
4. 资源释放。
// 释放资源
producer.shutdown();
5. 初始化消费者。
// 消息监听
class ExampleMessageListener : public MessageListenerConcurrently {
public:
ConsumeStatus consumeMessage(const std::vector<MQMessageExt> &msgs) {
for (auto item = msgs.begin(); item != msgs.end(); item++) {
// 业务
std::cout << "Received Message Topic:" << item->getTopic() << ", MsgId:" << item->getMsgId() << ", TAGS:"
<< item->getTags() << ", KEYS:" << item->getKeys() << ", Body:" << item->getBody() << std::endl;
}
// 消费成功返回CONSUME_SUCCESS
return CONSUME_SUCCESS;
// 消费失败返回RECONSUME_LATER,该消息将会被重新消费
// return RECONSUME_LATER;
}
};
// 初始化消费者
DefaultMQPushConsumer *consumer = new DefaultMQPushConsumer(groupName);
// 设置服务地址
consumer->setNamesrvAddr(nameserver);
// 设置用户权限
consumer->setSessionCredentials(
accessKey,
secretKey,
"");
// 设置命名空间
consumer->setNameSpace(namespace);
// 设置实例名称
consumer->setInstanceName("CppClient");
// 请注册自定义侦听函数用来处理接收到的消息,并返回响应的处理结果。
ExampleMessageListener *messageListener = new ExampleMessageListener();
// 订阅消息
consumer->subscribe(topicName, TAGS);
// 设置消息监听
consumer->registerMessageListener(messageListener);
// 准备工作完成,必须调用启动函数,才可以正常工作。
consumer->start();
说明:
以下参数需登录 TDMQ RocketMQ 版控制台 获取。
参数
说明
groupName
消费组名称,在控制台 Group 管理页面复制。

nameserver
集群接入地址,控制台集群基本信息页面的接入信息模块获取。

secretKey
角色名称,在控制台的集群权限页面 SecretKey 列复制。
accessKey
角色密钥,在控制台的集群权限页面 AccessKey 列复制。

namespace
命名空间的名称,在控制台命名空间页面复制。

topicName
Topic 的名称,在控制台 topic 页面复制。
TAGS
用来设置订阅消息的 TAG。
6. 资源释放。
// 资源释放
consumer->shutdown();
7. 查看消费详情。发送完成消息后会得到一个消息 ID (messageID),您可以在控制台的消息查询 > 综合查询页面查询刚刚发送的消息,以及该消息的详情和轨迹等信息。

如果需要发送任意延迟消息,设置__STARTDELIVERTIME 属性即可。
MQMessage msg("控制台topic", "tag", "Delay Message.");
chrono::system_clock::duration d = chrono::system_clock::now().time_since_epoch();
chrono::milliseconds mil = chrono::duration_cast<chrono::milliseconds>(d);
//定时延时消息,单位毫秒(ms),在指定时间戳(当前时间之后)进行投递
//设置需要延时或者定时的时间,例如当前时间延迟30秒后投递。
long expectTime = mil.count() + 30000;
msg.setProperty("__STARTDELIVERTIME", to_string(expectTime));
说明:
上述是对消息的发布和订阅方式的简单介绍。更多操作可参见 GitHub DemoRocketMQ 官方文档