文档中心>消息队列 CKafka 版>SDK 文档>C# SDK>公网 SASL_PLAINTEXT 方式接入

公网 SASL_PLAINTEXT 方式接入

最近更新时间:2025-10-17 14:51:52

我的收藏

操作背景

本文以 C# 客户端为例介绍在公网环境下,使用 SASL_PLAINTEXT 方式接入消息队列 CKafka 版并收发消息的过程。

前提条件

安装 .NET 环境。以 CentOS 为例,使用如下命令安装 .NET 环境。
sudo yum install -y dotnet-sdk-6.0

操作步骤

步骤1:控制台配置

1. 创建接入点。
1.1 实例列表 页面,单击目标实例 ID,进入实例详情页。
1.2 基本信息 > 接入方式 中,单击添加路由策略,在弹窗中选择:路由类型:公网域名接入, 接入方式:SASL_PLAINTEXT

2. 创建角色。
ACL 策略管理下的用户管理页面新建角色,设置密码。

3. 创建 Topic。
在控制台 Topic 列表页面新建 Topic(参见 创建 Topic)。
4. 配置 ACL 策略。
参考配置 Topic 读写权限为创建好的角色配置 Topic 的读写权限。

步骤2:准备配置

1. 登录 Linux 服务器
1.1 创建项目目录,生产者创建如下目录
mkdir KafkaProducer
cd KafkaProducer
dotnet new console -n KafkaProducer
cd KafkaProducer
1.2 创建项目目录,消费者创建如下目录
mkdir KafkaConsumer
cd KafkaConsumer
dotnet new console -n KafkaConsumer
cd KafkaConsumer
1.3 安装 Confluent.Kafka NuGet 包
dotnet add package Confluent.Kafka
参数
说明
BootstrapServers
接入网络,在控制台的实例基本信息页面内接入方式模块的网络列复制。

步骤3:发送消息

1. 编写生产消息文件 KafkaProducer.cs
using Confluent.Kafka;
using System;
using System.Threading.Tasks;

class KafkaProducer
{
static async Task Main(string[] args)
{
var config = new ProducerConfig
{
BootstrapServers = "your_bootstrap_servers", // 替换为实际的服务器地址
SaslMechanism = SaslMechanism.Plain,
SecurityProtocol = SecurityProtocol.SaslPlaintext,
SaslUsername = "your_api_key", // 替换为实际的 API Key
SaslPassword = "your_secret", // 替换为实际的 Secret
Acks = Acks.Leader, // acks 配置:可选值如下
// Acks.All (等同于 acks=all):所有 ISR 副本确认(最强可靠性)
// Acks.Leader (等同于 acks=1):仅 leader 确认(默认)
// Acks.None (等同于 acks=0):不等待任何确认(最高吞吐,最低可靠性)
EnableIdempotence = false, // 关闭幂等性
MaxInFlight = 5 // 关闭幂等性
};

using var producer = new ProducerBuilder<Null, string>(config).Build();

for (int i = 1; i <= 10; i++)
{
var msg = $"this is ckafka msg value";
try
{
var result = await producer.ProduceAsync("test-topic",
new Message<Null, string> { Value = msg });

Console.WriteLine($"Sent: {msg} | Offset: {result.Offset}");
}
catch (ProduceException<Null, string> e)
{
Console.WriteLine($"Failed: {e.Error.Reason}");
}
}

producer.Flush(TimeSpan.FromSeconds(10));
}
}

2. 编译并运行KafkaProducer.cs 发送消息。
dotnet run
说明
username 是实例 ID + # + 配置的用户名,password 是配置的用户密码。
参数
说明
BootstrapServers
接入网络,在控制台的实例基本信息页面内接入方式模块的网络列复制。
3. 查看结果
Sent: this is ckafka msg value | Offset: 0
Sent: this is ckafka msg value | Offset: 1
Sent: this is ckafka msg value | Offset: 2
Sent: this is ckafka msg value | Offset: 3
Sent: this is ckafka msg value | Offset: 4
Sent: this is ckafka msg value | Offset: 5
Sent: this is ckafka msg value | Offset: 6
Sent: this is ckafka msg value | Offset: 7
Sent: this is ckafka msg value | Offset: 8
Sent: this is ckafka msg value | Offset: 9

步骤4:消费消息

1. 编写生产消息文件 KafkaConsumer.cs
using Confluent.Kafka;
using System;
using System.Threading;


class KafkaConsumer
{
static void Main(string[] args)
{
var config = new ConsumerConfig
{
// Kafka 公网接入点
BootstrapServers = "your_bootstrap_servers", // 替换为实际地址,如:kafka.example.com:9092


// 消费者组(建议不同消费者使用不同 GroupId)
GroupId = "consumer-group",
EnableAutoCommit = true, // 开启自动提交
AutoCommitIntervalMs = 5000, // 设置自动提交间隔:5000ms = 5秒
AutoOffsetReset = AutoOffsetReset.Earliest, // 如果没有初始化位点,从最早的消息开始消费
SecurityProtocol = SecurityProtocol.SaslPlaintext,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = "your_api_key", // 替换为你的 API Key
SaslPassword = "your_secret" // 替换为你的 Secret
};


using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("test-topic");


Console.WriteLine("Consumer started...");


var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true;
cts.Cancel();
};


try
{
while (!cts.IsCancellationRequested)
{
try
{
// 同步消费消息(最长等待 100ms)
var consumeResult = consumer.Consume(100);


if (consumeResult != null)
{
Console.WriteLine($"Received: {consumeResult.Message.Value} " +
$"| Topic: {consumeResult.Topic} " +
$"| Partition: {consumeResult.Partition} " +
$"| Offset: {consumeResult.Offset} " +
$"| Timestamp: {consumeResult.Message.Timestamp.UtcDateTime:HH:mm:ss}");
}
}
catch (ConsumeException e)
{
Console.WriteLine($"Consume error: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
Console.WriteLine("Shutdown signal received. Closing consumer...");
}
finally
{
// 关闭消费者,确保最后的提交完成
consumer.Close();
Console.WriteLine("Consumer closed.");
}
}
}
2. 编译并运行KafkaConsumer.cs
dotnet run
3. 运行结果。

Consumer started...
Received: this is ckafka msg value | Topic: test-topic | Partition: [1] | Offset: 0 | Timestamp: 12:39:55
Received: this is ckafka msg value | Topic: test-topic | Partition: [1] | Offset: 1 | Timestamp: 12:39:55
Received: this is ckafka msg value | Topic: test-topic | Partition: [1] | Offset: 2 | Timestamp: 12:40:35
Received: this is ckafka msg value | Topic: test-topic | Partition: [1] | Offset: 3 | Timestamp: 12:40:35
Received: this is ckafka msg value | Topic: test-topic | Partition: [1] | Offset: 4 | Timestamp: 12:40:36
Received: this is ckafka msg value | Topic: test-topic | Partition: [1] | Offset: 5 | Timestamp: 12:40:36
Received: this is ckafka msg value | Topic: test-topic | Partition: [2] | Offset: 0 | Timestamp: 12:39:55
Received: this is ckafka msg value | Topic: test-topic | Partition: [2] | Offset: 1 | Timestamp: 12:39:55
Received: this is ckafka msg value | Topic: test-topic | Partition: [2] | Offset: 2 | Timestamp: 12:39:55
Received: this is ckafka msg value | Topic: test-topic | Partition: [2] | Offset: 3 | Timestamp: 12:39:55
Received: this is ckafka msg value | Topic: test-topic | Partition: [2] | Offset: 4 | Timestamp: 12:40:35
Received: this is ckafka msg value | Topic: test-topic | Partition: [2] | Offset: 5 | Timestamp: 12:40:36
Received: this is ckafka msg value | Topic: test-topic | Partition: [2] | Offset: 6 | Timestamp: 12:40:36
Received: this is ckafka msg value | Topic: test-topic | Partition: [0] | Offset: 0 | Timestamp: 12:39:54
Received: this is ckafka msg value | Topic: test-topic | Partition: [0] | Offset: 1 | Timestamp: 12:39:54
Received: this is ckafka msg value | Topic: test-topic | Partition: [0] | Offset: 2 | Timestamp: 12:39:55
Received: this is ckafka msg value | Topic: test-topic | Partition: [0] | Offset: 3 | Timestamp: 12:39:55
Received: this is ckafka msg value | Topic: test-topic | Partition: [0] | Offset: 4 | Timestamp: 12:40:36
Received: this is ckafka msg value | Topic: test-topic | Partition: [0] | Offset: 5 | Timestamp: 12:40:36
Received: this is ckafka msg value | Topic: test-topic | Partition: [0] | Offset: 6 | Timestamp: 12:40:36

4. 在 CKafka 控制台 Consumer Group 页面,查看消费详情。