公网 SASL_SSL方式接入

最近更新时间:2025-10-17 14:51:52

我的收藏

操作背景

本文以 C# 客户端为例,介绍在公网环境下,使用 SASL_SSL方式接入消息队列 CKafka 版并收发消息的过程。

前提条件

安装 .NET 环境。以 CentOS 为例,使用如下命令安装 .NET 环境。
sudo yum install -y dotnet-sdk-6.0

操作步骤

步骤1:控制台配置

1. 创建接入点。
1.1 实例列表 页面,单击目标实例 ID,进入实例详情页。
1.2 基本信息 > 接入方式 中,单击添加路由策略,在弹窗中选择:路由类型:公网域名接入, 接入方式:SASL_SSL。

2. 创建角色。
ACL 策略管理下的用户管理页面新建角色,设置密码。

3. 创建 Topic。
在控制台 Topic 列表页面新建 Topic(参见 创建 Topic)。
4. 配置 ACL 策略。
参考配置 Topic 读写权限为创建好的角色配置 Topic 的读写权限。

步骤2:准备配置

1. 登录 Linux 服务器
1.1 创建项目目录,生产者创建如下目录
mkdir KafkaProducer
cd KafkaProducer
dotnet new console -n KafkaProducer
cd KafkaProducer
1.2 创建项目目录,消费者创建如下目录
mkdir KafkaConsumer
cd KafkaConsumer
dotnet new console -n KafkaConsumer
cd KafkaConsumer
1.3 安装 Confluent.Kafka NuGet 包
dotnet add package Confluent.Kafka
参数
说明
BootstrapServers
接入网络,在控制台的实例基本信息页面内接入方式模块网络列复制。

步骤3:发送消息

1. 编写生产消息文件 KafkaProducer.cs
using Confluent.Kafka;
using System;
using System.Threading.Tasks;

class Program
{
static async Task Main(string[] args)
{
var config = new ProducerConfig
{
BootstrapServers = "your_bootstrap_servers", // 替换为实际的服务器地址
SaslMechanism = SaslMechanism.Plain,
SecurityProtocol = SecurityProtocol.SaslSsl,
SaslUsername = "your_api_key", // 替换为实际的 API Key
SaslPassword = "your_secret", // 替换为实际的 Secret
SslCaLocation = "/path/to/ca-cert.pem",// 证书地址
SslEndpointIdentificationAlgorithm = SslEndpointIdentificationAlgorithm.None,
Acks = Acks.Leader,
EnableIdempotence = false,
MaxInFlight = 5
};

using var producer = new ProducerBuilder<Null, string>(config).Build();

for (int i = 1; i <= 10; i++)
{
var msg = $"this is ckafka msg value";
try
{
var result = await producer.ProduceAsync("test-topic",
new Message<Null, string> { Value = msg });

Console.WriteLine($"Sent: {msg} | Offset: {result.Offset}");
}
catch (ProduceException<Null, string> e)
{
Console.WriteLine($"Failed: {e.Error.Reason}");
}
}

producer.Flush(TimeSpan.FromSeconds(10));
}
}
2. 编译并运行KafkaProducer.cs 发送消息。
dotnet run
说明
username 是实例 ID + # + 配置的用户名,password 是配置的用户密码。
参数
说明
BootstrapServers
接入网络,在控制台的实例基本信息页面内接入方式模块的网络列复制。
3. 查看结果
Sent: this is ckafka msg value | Offset: 0
Sent: this is ckafka msg value | Offset: 1
Sent: this is ckafka msg value | Offset: 2
Sent: this is ckafka msg value | Offset: 3
Sent: this is ckafka msg value | Offset: 4
Sent: this is ckafka msg value | Offset: 5
Sent: this is ckafka msg value | Offset: 6
Sent: this is ckafka msg value | Offset: 7
Sent: this is ckafka msg value | Offset: 8
Sent: this is ckafka msg value | Offset: 9

步骤4:消费消息

1. 新建并编写消费消息文件 KafkaConsumer.cs
using Confluent.Kafka;
using System;
using System.Threading;

class KafkaConsumer
{
static void Main(string[] args)
{
var config = new ConsumerConfig
{
BootstrapServers = "your_bootstrap_servers", // 替换为实际的服务器地址
GroupId = "consumer-group",
AutoOffsetReset = AutoOffsetReset.Earliest,
SecurityProtocol = SecurityProtocol.SaslSsl,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = "your_api_key", // 替换为实际的 API Key
SaslPassword = "your_secret", // 替换为实际的 Secret
SslCaLocation = "/path/to/ca-cert.pem", //证书地址
SslEndpointIdentificationAlgorithm = SslEndpointIdentificationAlgorithm.None,
EnableAutoCommit = true,
AutoCommitIntervalMs = 5000 // 每 5 秒自动提交一次位点
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("test-topic");

Console.WriteLine("Consumer started...");

var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true;
cts.Cancel();
};

try
{
while (!cts.IsCancellationRequested)
{
try
{
// 同步消费消息(最长等待 100ms)
var consumeResult = consumer.Consume(cts.Token);

if (consumeResult != null)
{
Console.WriteLine($"Received: {consumeResult.Message.Value} " +
$"| Topic: {consumeResult.Topic} " +
$"| Partition: {consumeResult.Partition} " +
$"| Offset: {consumeResult.Offset} " +
$"| Timestamp: {consumeResult.Message.Timestamp.UtcDateTime:HH:mm:ss}");
}
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occurred: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
Console.WriteLine("Shutdown signal received. Closing consumer...");
}
finally
{
// 关闭消费者,确保最后的提交完成
consumer.Close();
Console.WriteLine("Consumer closed.");
}
}
}
2. 编译并运行KafkaConsumer.cs
dotnet run
3. 运行结果。
Consumer started...
Received: this is ckafka msg value | Topic: test-topic | Partition: [1] | Offset: 6 | Timestamp: 04:19:03
Received: this is ckafka msg value | Topic: test-topic | Partition: [1] | Offset: 7 | Timestamp: 04:19:04
Received: this is ckafka msg value | Topic: test-topic | Partition: [1] | Offset: 8 | Timestamp: 04:19:04
Received: this is ckafka msg value | Topic: test-topic | Partition: [1] | Offset: 9 | Timestamp: 04:19:04
Received: this is ckafka msg value | Topic: test-topic | Partition: [1] | Offset: 10 | Timestamp: 04:19:04
Received: this is ckafka msg value | Topic: test-topic | Partition: [1] | Offset: 11 | Timestamp: 04:19:11
Received: this is ckafka msg value | Topic: test-topic | Partition: [1] | Offset: 12 | Timestamp: 04:19:11
Received: this is ckafka msg value | Topic: test-topic | Partition: [1] | Offset: 13 | Timestamp: 04:19:11
Received: this is ckafka msg value | Topic: test-topic | Partition: [0] | Offset: 7 | Timestamp: 04:19:03
Received: this is ckafka msg value | Topic: test-topic | Partition: [0] | Offset: 8 | Timestamp: 04:19:04
Received: this is ckafka msg value | Topic: test-topic | Partition: [0] | Offset: 9 | Timestamp: 04:19:10
Received: this is ckafka msg value | Topic: test-topic | Partition: [0] | Offset: 10 | Timestamp: 04:19:11
Received: this is ckafka msg value | Topic: test-topic | Partition: [0] | Offset: 11 | Timestamp: 04:19:11
Received: this is ckafka msg value | Topic: test-topic | Partition: [2] | Offset: 7 | Timestamp: 04:19:04
Received: this is ckafka msg value | Topic: test-topic | Partition: [2] | Offset: 8 | Timestamp: 04:19:04
Received: this is ckafka msg value | Topic: test-topic | Partition: [2] | Offset: 9 | Timestamp: 04:19:04
Received: this is ckafka msg value | Topic: test-topic | Partition: [2] | Offset: 10 | Timestamp: 04:19:10
Received: this is ckafka msg value | Topic: test-topic | Partition: [2] | Offset: 11 | Timestamp: 04:19:11
Received: this is ckafka msg value | Topic: test-topic | Partition: [2] | Offset: 12 | Timestamp: 04:19:11
Received: this is ckafka msg value | Topic: test-topic | Partition: [2] | Offset: 13 | Timestamp: 04:19:11
4. 在 CKafka 控制台 Consumer Group 页面,查看消费详情。