操作场景
该任务以 C++ 客户端为例,指导您使用公网 SASL_PLAINTEXT 方式接入消息队列 CKafka 版并收发消息。
前提条件
安装 GCC
下载 Demo
操作步骤
步骤1:安装 C/C++ 依赖库
步骤2:安装 SSL/SASL 依赖
yum install openssl openssl-develyum install cyrus-sasl{,-plain}
步骤3:发送消息
1. 创建 producer.cpp 文件。
// producer.cpp#include <iostream>#include <string>#include "librdkafka/rdkafkacpp.h"int main() {std::string brokers = "$broker"; // 修改为你的 Kafka 地址std::string topic = "$topic"; // topic名字std::string ca_file = "./CARoot.pem"; // CA 证书路径(与程序同目录)std::string sasl_username = "$username"; // SASL 用户名std::string sasl_password = "$password"; // SASL 密码std::string errstr;std::unique_ptr<RdKafka::Conf> conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));std::unique_ptr<RdKafka::Conf> tconf(RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC));// ====== 全局配置 ======conf->set("metadata.broker.list", brokers, errstr);conf->set("security.protocol", "sasl_ssl", errstr);conf->set("ssl.ca.location", ca_file, errstr);conf->set("sasl.mechanisms", "PLAIN", errstr);conf->set("sasl.username", sasl_username, errstr);conf->set("sasl.password", sasl_password, errstr);// acks=1conf->set("acks", "1", errstr);// 重试机制conf->set("message.send.max.retries", "5", errstr);conf->set("retry.backoff.ms", "1000", errstr);// max.in.flight.requests.per.connection=5conf->set("max.in.flight.requests.per.connection", "5", errstr);// ====== 创建生产者 ======std::unique_ptr<RdKafka::Producer> producer(RdKafka::Producer::create(conf.get(), errstr));if (!producer) {std::cerr << "Failed to create producer: " << errstr << std::endl;return 1;}// ====== 创建主题对象 ======std::unique_ptr<RdKafka::Topic> topic_obj(RdKafka::Topic::create(producer.get(), topic, tconf.get(), errstr));if (!topic_obj) {std::cerr << "Failed to create topic object: " << errstr << std::endl;return 1;}// ====== 发送消息 ======for (int i = 1; i <= 5; ++i) {std::string msg = "Message from C++ on CentOS #" + std::to_string(i);RdKafka::ErrorCode resp = producer->produce(topic_obj.get(),RdKafka::Topic::PARTITION_UA,RdKafka::Producer::RK_MSG_COPY,const_cast<char*>(msg.c_str()), msg.size(),nullptr, nullptr);if (resp != RdKafka::ERR_NO_ERROR) {std::cerr << "Produce failed: " << RdKafka::err2str(resp) << std::endl;} else {std::cout << "Sent: " << msg << std::endl;}}// ====== 等待发送完成 ======producer->flush(5000); // 最多等待 5 秒return 0;}
2. 执行以下命令编译 producer.cpp。
g++ -std=c++11 \\-I/usr/local/include \\-L/usr/local/lib \\producer.cpp \\-lrdkafka++ -lrdkafka \\-o producer
3. 执行以下命令发送消息。
./producer
参数 | 描述 |
broker | 接入网络,在控制台的实例详情页面接入方式模块的网络列复制。 ![]() |
topic | Topic 名称,您可以在控制台上topic管理页面复制。 ![]() |
username | username 是 实例 ID + # + 配置的用户名,实例 ID 在 CKafka 控制台的实例详情页面的基本信息获取,用户名在控制台 ACL 策略管理下的用户管理页面创建用户时设置 |
password | password 是配置的用户密码,在控制台 ACL 策略管理下的用户管理页面创建用户时设置。 |
运行结果如下:


4. 在 CKafka 控制台topic 管理页面,选择对应的 Topic,单击更多 > 消息查询,查看刚刚发送的消息。


步骤4:消费消息
1. 创建 consumer.cpp 文件。
// consumer.cpp#include <iostream>#include <string>#include "librdkafka/rdkafkacpp.h"class ConsumerCallback : public RdKafka::ConsumeCb {public:void consume_cb(RdKafka::Message &msg, void *opaque) override {if (msg.err()) {std::cerr << "Error: " << msg.errstr() << std::endl;} else {std::cout << "Received: " << std::string(static_cast<const char*>(msg.payload()), msg.len()) << std::endl;}}};int main() {std::string brokers = "$broker";std::string topic = "$topic";std::string group_id = "$group.id";std::string ca_file = "./CARoot.pem";std::string sasl_username = "$username";std::string sasl_password = "$password";std::string errstr;std::unique_ptr<RdKafka::Conf> conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));conf->set("metadata.broker.list", brokers, errstr);conf->set("security.protocol", "sasl_ssl", errstr);conf->set("ssl.ca.location", ca_file, errstr);conf->set("sasl.mechanisms", "PLAIN", errstr);conf->set("sasl.username", sasl_username, errstr);conf->set("sasl.password", sasl_password, errstr);conf->set("group.id", group_id, errstr);conf->set("auto.offset.reset", "earliest", errstr);conf->set("enable.auto.commit", "true", errstr);conf->set("auto.commit.interval.ms", "5000", errstr);conf->set("max.in.flight.requests.per.connection", "5", errstr);ConsumerCallback consume_callback;conf->set("consume.callback", &consume_callback, errstr);std::unique_ptr<RdKafka::Consumer> consumer(RdKafka::Consumer::create(conf.get(), errstr));if (!consumer) {std::cerr << "Failed to create consumer: " << errstr << std::endl;return 1;}RdKafka::ErrorCode resp = consumer->subscribe({topic});if (resp != RdKafka::ERR_NO_ERROR) {std::cerr << "Subscribe failed: " << RdKafka::err2str(resp) << std::endl;return 1;}std::cout << "Consumer started, waiting for messages..." << std::endl;while (true) {consumer->consume(1000); // 每次最多等待 1 秒}return 0;}
2. 执行以下命令编译 consumer.cpp。
g++ -std=c++11 -I/usr/local/include -L/usr/local/lib \\consumer.cpp -lrdkafka++ -lrdkafka -o consumer
3. 执行以下命令发送消息。
./consumer
参数 | 描述 |
broker | 接入网络,在控制台的实例详情页面接入方式模块的网络列复制。 ![]() |
group.id | 消费分组名称,您可以自定义设置,Demo 运行成功后可以在 Consumer Group 页面看到该消费者。 |
username | username 是 实例 ID + # + 配置的用户名,实例 ID 在 CKafka 控制台的实例详情页面的基本信息获取,用户名在控制台 ACL 策略管理下的用户管理页面创建用户时设置 |
password | password 是配置的用户密码,在控制台 ACL 策略管理下的用户管理页面创建用户时设置。 |
topic1 topic2 | Topic 名称,您可以在控制台上 topic 管理页面复制。 ![]() |
运行结果如下:


4. 在 CKafka 控制台的 Consumer Group 页面,选择对应的消费者组名称,在主题名称输入 Topic ,单击查询详情查看消费详情。
5. 




