Background
This document uses a C# client as an example to describe how to connect to CKafka over the public network via SASL_PLAINTEXT and send and receive messages.
Prerequisites
Install the .NET environment. Taking CentOS as an example, run the following command to install it.
sudo yum install -y dotnet-sdk-6.0
Directions
Step 1: Configure the console
1. Create an access point.
1.1 On the Instance List page, click the ID of the target instance to enter its details page.
1.2 In Basic Info > Access Mode, click Add Routing Policy and select the following in the pop-up window:
Routing type: Public domain name access; Access mode: SASL_PLAINTEXT.
2. Create a user.
On the User Management page under ACL Policy Management, create a user and set its password.

3. Create a topic. (A programmatic alternative is sketched after this list.)
4. Configure the ACL policy.
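The topic in item 3 above can also be created from code instead of in the console. The following is an optional, minimal sketch using Confluent.Kafka's AdminClient; the instance ID, username, password, partition count, and replication factor are placeholder assumptions and must match what your instance and ACL policy allow.

using Confluent.Kafka;
using Confluent.Kafka.Admin;
using System;
using System.Threading.Tasks;

class CreateTopic
{
    static async Task Main()
    {
        var config = new AdminClientConfig
        {
            BootstrapServers = "your_bootstrap_servers",     // public domain access point from the console
            SecurityProtocol = SecurityProtocol.SaslPlaintext,
            SaslMechanism = SaslMechanism.Plain,
            SaslUsername = "your_instance_id#your_username", // placeholder: instance ID + "#" + username
            SaslPassword = "your_password"                   // placeholder: the user's password
        };

        using var adminClient = new AdminClientBuilder(config).Build();
        try
        {
            // Assumed example values: 3 partitions, replication factor 2.
            await adminClient.CreateTopicsAsync(new[]
            {
                new TopicSpecification { Name = "test-topic", NumPartitions = 3, ReplicationFactor = 2 }
            });
            Console.WriteLine("Topic created.");
        }
        catch (CreateTopicsException e)
        {
            Console.WriteLine($"Failed to create topic: {e.Results[0].Error.Reason}");
        }
    }
}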
Step 2: Prepare the configuration
1. Log in to the Linux server.
1.1 Create the project directory. For the producer, run the following commands:
mkdir KafkaProducer
cd KafkaProducer
dotnet new console -n KafkaProducer
cd KafkaProducer
1.2 Create the project directory. For the consumer, run the following commands:
mkdir KafkaConsumer
cd KafkaConsumer
dotnet new console -n KafkaConsumer
cd KafkaConsumer
1.3 Install the Confluent.Kafka NuGet package.
dotnet add package Confluent.Kafka
| Parameter | Description |
| --- | --- |
| BootstrapServers | Access network address. Copy it from the Network column of the Access Mode section on the instance's Basic Info page in the console. |
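Rather than hardcoding this address and the SASL credentials in source, one option (a sketch, not part of the original steps; the environment variable names are assumptions) is to read them from environment variables on the server:

// Sketch: read connection settings from environment variables, e.g. set beforehand with
//   export CKAFKA_BOOTSTRAP_SERVERS="<value copied from the Network column>"
using Confluent.Kafka;
using System;

var config = new ProducerConfig
{
    BootstrapServers = Environment.GetEnvironmentVariable("CKAFKA_BOOTSTRAP_SERVERS"),
    SecurityProtocol = SecurityProtocol.SaslPlaintext,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = Environment.GetEnvironmentVariable("CKAFKA_SASL_USERNAME"),
    SaslPassword = Environment.GetEnvironmentVariable("CKAFKA_SASL_PASSWORD")
};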
Step 3: Send messages
1. Write the message producing file KafkaProducer.cs.
using Confluent.Kafka;
using System;
using System.Threading.Tasks;

class KafkaProducer
{
    static async Task Main(string[] args)
    {
        var config = new ProducerConfig
        {
            BootstrapServers = "your_bootstrap_servers", // Replace with the actual server address
            SaslMechanism = SaslMechanism.Plain,
            SecurityProtocol = SecurityProtocol.SaslPlaintext,
            SaslUsername = "your_api_key",  // Replace with the actual username
            SaslPassword = "your_secret",   // Replace with the actual password
            Acks = Acks.Leader,             // acks options:
            // Acks.All (acks=all): all ISR replicas acknowledge (strongest reliability)
            // Acks.Leader (acks=1): only the leader acknowledges (default)
            // Acks.None (acks=0): no acknowledgment (highest throughput, lowest reliability)
            EnableIdempotence = false,      // Disable idempotence
            MaxInFlight = 5                 // Maximum number of in-flight requests per connection
        };

        using var producer = new ProducerBuilder<Null, string>(config).Build();

        for (int i = 1; i <= 10; i++)
        {
            var msg = $"this is ckafka msg value";
            try
            {
                var result = await producer.ProduceAsync("test-topic",
                    new Message<Null, string> { Value = msg });
                Console.WriteLine($"Sent: {msg} | Offset: {result.Offset}");
            }
            catch (ProduceException<Null, string> e)
            {
                Console.WriteLine($"Failed: {e.Error.Reason}");
            }
        }

        producer.Flush(TimeSpan.FromSeconds(10));
    }
}
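The Acks, EnableIdempotence, and MaxInFlight settings above favor throughput (acks=1, no idempotence). If you need the strongest delivery guarantees instead, the config initializer in KafkaProducer.cs could be swapped for a variant like the sketch below (same placeholder credentials); note that librdkafka requires MaxInFlight to stay at 5 or less when idempotence is enabled.

// Optional variant, not part of the steps above: acks=all plus idempotence for
// duplicate-free, fully acknowledged delivery at the cost of throughput.
var reliableConfig = new ProducerConfig
{
    BootstrapServers = "your_bootstrap_servers",
    SecurityProtocol = SecurityProtocol.SaslPlaintext,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = "your_api_key",   // same username format as described in the note below
    SaslPassword = "your_secret",
    Acks = Acks.All,                 // wait for all ISR replicas to acknowledge
    EnableIdempotence = true,        // broker deduplicates retried sends
    MessageSendMaxRetries = 5,       // retries are safe once idempotence is enabled
    MaxInFlight = 5                  // must stay <= 5 for idempotence
};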
2. Compile and run KafkaProducer.cs to send messages.
dotnet run
Note
The username is the instance ID + # + the username configured in the console, and the password is the password set for that user.
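For example, with a hypothetical instance ID ckafka-xxxxxxxx and a user named test-user created in Step 1, the credentials used in the code above would be composed like this:

// Hypothetical values for illustration only; use your real instance ID, username, and password.
string instanceId = "ckafka-xxxxxxxx";
string userName = "test-user";
string saslUsername = $"{instanceId}#{userName}"; // -> "ckafka-xxxxxxxx#test-user", assigned to SaslUsername
string saslPassword = "your_password";            // the password configured for that user, assigned to SaslPassword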
3. View the result.
Sent: this is ckafka msg value | Offset: 0
Sent: this is ckafka msg value | Offset: 1
Sent: this is ckafka msg value | Offset: 2
Sent: this is ckafka msg value | Offset: 3
Sent: this is ckafka msg value | Offset: 4
Sent: this is ckafka msg value | Offset: 5
Sent: this is ckafka msg value | Offset: 6
Sent: this is ckafka msg value | Offset: 7
Sent: this is ckafka msg value | Offset: 8
Sent: this is ckafka msg value | Offset: 9
Step 4: Consume messages
1. Write the message consuming file KafkaConsumer.cs.
using Confluent.Kafka;
using System;
using System.Threading;

class KafkaConsumer
{
    static void Main(string[] args)
    {
        var config = new ConsumerConfig
        {
            // Kafka public network access point
            BootstrapServers = "your_bootstrap_servers", // Replace with the actual address, e.g. kafka.example.com:9092
            // Consumer group (different consumers should preferably use different GroupIds)
            GroupId = "consumer-group",
            EnableAutoCommit = true,                    // Enable auto commit
            AutoCommitIntervalMs = 5000,                // Auto commit interval: 5000 ms = 5 s
            AutoOffsetReset = AutoOffsetReset.Earliest, // If there is no committed offset, start from the earliest message
            SecurityProtocol = SecurityProtocol.SaslPlaintext,
            SaslMechanism = SaslMechanism.Plain,
            SaslUsername = "your_api_key", // Replace with your username
            SaslPassword = "your_secret"   // Replace with your password
        };

        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        consumer.Subscribe("test-topic");
        Console.WriteLine("Consumer started...");

        var cts = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) =>
        {
            e.Cancel = true;
            cts.Cancel();
        };

        try
        {
            while (!cts.IsCancellationRequested)
            {
                try
                {
                    // Consume a message synchronously (wait up to 100 ms)
                    var consumeResult = consumer.Consume(100);
                    if (consumeResult != null)
                    {
                        Console.WriteLine($"Received: {consumeResult.Message.Value} " +
                            $"| Topic: {consumeResult.Topic} " +
                            $"| Partition: {consumeResult.Partition} " +
                            $"| Offset: {consumeResult.Offset} " +
                            $"| Timestamp: {consumeResult.Message.Timestamp.UtcDateTime:HH:mm:ss}");
                    }
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Consume error: {e.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Shutdown signal received. Closing consumer...");
        }
        finally
        {
            // Close the consumer to make sure the final offset commit completes
            consumer.Close();
            Console.WriteLine("Consumer closed.");
        }
    }
}
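The consumer above relies on auto commit every 5 seconds, so a message processed just before an unexpected exit may be redelivered, and the final offsets are only guaranteed to be committed in Close(). If you prefer to commit explicitly after handling each message, the body of the consume loop could be replaced with a sketch like the following (assuming EnableAutoCommit = false in the config above):

// Sketch only: requires EnableAutoCommit = false in the ConsumerConfig above.
var consumeResult = consumer.Consume(TimeSpan.FromMilliseconds(100));
if (consumeResult != null)
{
    Console.WriteLine($"Received: {consumeResult.Message.Value}");
    // Commit the offset only after the message has been processed successfully.
    consumer.Commit(consumeResult);
}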
2. Compile and run KafkaConsumer.cs.
dotnet run
3. View the running result.
Consumer started...
Received: this is ckafka msg value | Topic: test-topic | Partition: [1] | Offset: 0 | Timestamp: 12:39:55
Received: this is ckafka msg value | Topic: test-topic | Partition: [1] | Offset: 1 | Timestamp: 12:39:55
Received: this is ckafka msg value | Topic: test-topic | Partition: [1] | Offset: 2 | Timestamp: 12:40:35
Received: this is ckafka msg value | Topic: test-topic | Partition: [1] | Offset: 3 | Timestamp: 12:40:35
Received: this is ckafka msg value | Topic: test-topic | Partition: [1] | Offset: 4 | Timestamp: 12:40:36
Received: this is ckafka msg value | Topic: test-topic | Partition: [1] | Offset: 5 | Timestamp: 12:40:36
Received: this is ckafka msg value | Topic: test-topic | Partition: [2] | Offset: 0 | Timestamp: 12:39:55
Received: this is ckafka msg value | Topic: test-topic | Partition: [2] | Offset: 1 | Timestamp: 12:39:55
Received: this is ckafka msg value | Topic: test-topic | Partition: [2] | Offset: 2 | Timestamp: 12:39:55
Received: this is ckafka msg value | Topic: test-topic | Partition: [2] | Offset: 3 | Timestamp: 12:39:55
Received: this is ckafka msg value | Topic: test-topic | Partition: [2] | Offset: 4 | Timestamp: 12:40:35
Received: this is ckafka msg value | Topic: test-topic | Partition: [2] | Offset: 5 | Timestamp: 12:40:36
Received: this is ckafka msg value | Topic: test-topic | Partition: [2] | Offset: 6 | Timestamp: 12:40:36
Received: this is ckafka msg value | Topic: test-topic | Partition: [0] | Offset: 0 | Timestamp: 12:39:54
Received: this is ckafka msg value | Topic: test-topic | Partition: [0] | Offset: 1 | Timestamp: 12:39:54
Received: this is ckafka msg value | Topic: test-topic | Partition: [0] | Offset: 2 | Timestamp: 12:39:55
Received: this is ckafka msg value | Topic: test-topic | Partition: [0] | Offset: 3 | Timestamp: 12:39:55
Received: this is ckafka msg value | Topic: test-topic | Partition: [0] | Offset: 4 | Timestamp: 12:40:36
Received: this is ckafka msg value | Topic: test-topic | Partition: [0] | Offset: 5 | Timestamp: 12:40:36
Received: this is ckafka msg value | Topic: test-topic | Partition: [0] | Offset: 6 | Timestamp: 12:40:36
4. On the Consumer Group page in the CKafka console, view the consumption details.
