公网 SASL_SSL方式接入

最近更新时间:2025-10-10 15:42:32

我的收藏

操作场景

该任务以 C++ 客户端为例,指导您使用公网 SASL_PLAINTEXT 方式接入消息队列 CKafka 版并收发消息。

前提条件

操作步骤

步骤1:安装 C/C++ 依赖库

步骤2:安装 SSL/SASL 依赖

yum install openssl openssl-devel
yum install cyrus-sasl{,-plain}

步骤3:发送消息

1. 创建 producer.cpp 文件。

// producer.cpp
#include <iostream>
#include <string>
#include "librdkafka/rdkafkacpp.h"

int main() {
std::string brokers = "$broker"; // 修改为你的 Kafka 地址
std::string topic = "$topic"; // topic名字
std::string ca_file = "./CARoot.pem"; // CA 证书路径(与程序同目录)
std::string sasl_username = "$username"; // SASL 用户名
std::string sasl_password = "$password"; // SASL 密码

std::string errstr;
std::unique_ptr<RdKafka::Conf> conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
std::unique_ptr<RdKafka::Conf> tconf(RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC));

// ====== 全局配置 ======
conf->set("metadata.broker.list", brokers, errstr);
conf->set("security.protocol", "sasl_ssl", errstr);
conf->set("ssl.ca.location", ca_file, errstr);
conf->set("sasl.mechanisms", "PLAIN", errstr);
conf->set("sasl.username", sasl_username, errstr);
conf->set("sasl.password", sasl_password, errstr);

// acks=1
conf->set("acks", "1", errstr);

// 重试机制
conf->set("message.send.max.retries", "5", errstr);
conf->set("retry.backoff.ms", "1000", errstr);

// max.in.flight.requests.per.connection=5
conf->set("max.in.flight.requests.per.connection", "5", errstr);

// ====== 创建生产者 ======
std::unique_ptr<RdKafka::Producer> producer(RdKafka::Producer::create(conf.get(), errstr));
if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
return 1;
}

// ====== 创建主题对象 ======
std::unique_ptr<RdKafka::Topic> topic_obj(RdKafka::Topic::create(producer.get(), topic, tconf.get(), errstr));
if (!topic_obj) {
std::cerr << "Failed to create topic object: " << errstr << std::endl;
return 1;
}

// ====== 发送消息 ======
for (int i = 1; i <= 5; ++i) {
std::string msg = "Message from C++ on CentOS #" + std::to_string(i);
RdKafka::ErrorCode resp = producer->produce(
topic_obj.get(),
RdKafka::Topic::PARTITION_UA,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char*>(msg.c_str()), msg.size(),
nullptr, nullptr);

if (resp != RdKafka::ERR_NO_ERROR) {
std::cerr << "Produce failed: " << RdKafka::err2str(resp) << std::endl;
} else {
std::cout << "Sent: " << msg << std::endl;
}
}

// ====== 等待发送完成 ======
producer->flush(5000); // 最多等待 5 秒

return 0;
}
2. 执行以下命令编译 producer.cpp。

g++ -std=c++11 \\
-I/usr/local/include \\
-L/usr/local/lib \\
producer.cpp \\
-lrdkafka++ -lrdkafka \\
-o producer
3. 执行以下命令发送消息。

./producer
参数
描述
broker
接入网络,在控制台的实例详情页面接入方式模块的网络列复制。
img


topic
Topic 名称,您可以在控制台上topic管理页面复制。



username
username 是实例 ID + # + 配置的用户名,实例 ID 在 CKafka 控制台的实例详情页面的基本信息获取,用户名在控制台 ACL 策略管理下的用户管理页面创建用户时设置
password
password 是配置的用户密码,在控制台 ACL 策略管理下的用户管理页面创建用户时设置。
运行结果如下:



4. CKafka 控制台topic 管理页面,选择对应的 Topic,单击更多 > 消息查询,查看刚刚发送的消息。




步骤4:消费消息

1. 创建 consumer.cpp 文件。

// consumer.cpp
#include <iostream>
#include <string>
#include "librdkafka/rdkafkacpp.h"

class ConsumerCallback : public RdKafka::ConsumeCb {
public:
void consume_cb(RdKafka::Message &msg, void *opaque) override {
if (msg.err()) {
std::cerr << "Error: " << msg.errstr() << std::endl;
} else {
std::cout << "Received: " << std::string(static_cast<const char*>(msg.payload()), msg.len()) << std::endl;
}
}
};

int main() {
std::string brokers = "$broker";
std::string topic = "$topic";
std::string group_id = "$group.id";
std::string ca_file = "./CARoot.pem";
std::string sasl_username = "$username";
std::string sasl_password = "$password";

std::string errstr;
std::unique_ptr<RdKafka::Conf> conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));

conf->set("metadata.broker.list", brokers, errstr);
conf->set("security.protocol", "sasl_ssl", errstr);
conf->set("ssl.ca.location", ca_file, errstr);
conf->set("sasl.mechanisms", "PLAIN", errstr);
conf->set("sasl.username", sasl_username, errstr);
conf->set("sasl.password", sasl_password, errstr);
conf->set("group.id", group_id, errstr);
conf->set("auto.offset.reset", "earliest", errstr);
conf->set("enable.auto.commit", "true", errstr);
conf->set("auto.commit.interval.ms", "5000", errstr);
conf->set("max.in.flight.requests.per.connection", "5", errstr);

ConsumerCallback consume_callback;
conf->set("consume.callback", &consume_callback, errstr);

std::unique_ptr<RdKafka::Consumer> consumer(RdKafka::Consumer::create(conf.get(), errstr));
if (!consumer) {
std::cerr << "Failed to create consumer: " << errstr << std::endl;
return 1;
}

RdKafka::ErrorCode resp = consumer->subscribe({topic});
if (resp != RdKafka::ERR_NO_ERROR) {
std::cerr << "Subscribe failed: " << RdKafka::err2str(resp) << std::endl;
return 1;
}

std::cout << "Consumer started, waiting for messages..." << std::endl;

while (true) {
consumer->consume(1000); // 每次最多等待 1 秒
}

return 0;
}
2. 执行以下命令编译 consumer.cpp。

g++ -std=c++11 -I/usr/local/include -L/usr/local/lib \\
consumer.cpp -lrdkafka++ -lrdkafka -o consumer
3. 执行以下命令发送消息。

./consumer
参数
描述
broker
接入网络,在控制台的实例详情页面接入方式模块的网络列复制。
img


group.id
消费分组名称,您可以自定义设置,Demo 运行成功后可以在 Consumer Group 页面看到该消费者。
username
username 是实例 ID + # + 配置的用户名,实例 ID 在 CKafka 控制台的实例详情页面的基本信息获取,用户名在控制台 ACL 策略管理下的用户管理页面创建用户时设置
password
password 是配置的用户密码,在控制台 ACL 策略管理下的用户管理页面创建用户时设置。
topic1 topic2
Topic 名称,您可以在控制台上 topic 管理页面复制。



运行结果如下:



4. CKafka 控制台Consumer Group 页面,选择对应的消费者组名称,在主题名称输入 Topic ,单击查询详情查看消费详情。
5.