Background
This document uses a C# client as an example to describe how to connect to CKafka over the public network via SASL_SSL and send and receive messages.
Prerequisites
Install the .NET environment. On CentOS, for example, run the following command:
sudo yum install -y dotnet-sdk-6.0
Directions
Step 1: Configure in the console
1. Create an access point.
1.1 On the Instance List page, click the ID of the target instance to enter the instance details page.
1.2 In Basic Info > Access Mode, click Add Routing Policy and select the following in the pop-up window:
Route type: Public domain access; Access mode: SASL_SSL.
2. Create a role.
On the User Management page under ACL Policy Management, create a role and set its password.

3. Create a topic.
4. Configure the ACL policy.
Step 2: Prepare configurations
1. Log in to the Linux server.
1.1 Create the producer project directory:
mkdir KafkaProducer
cd KafkaProducer
dotnet new console -n KafkaProducer
cd KafkaProducer
1.2 Create the consumer project directory:
mkdir KafkaConsumer
cd KafkaConsumer
dotnet new console -n KafkaConsumer
cd KafkaConsumer
1.3 Install the Confluent.Kafka NuGet package:
dotnet add package Confluent.Kafka
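After the command succeeds, the project file should contain a package reference similar to the fragment below (the version number is illustrative; `dotnet add package` pins whatever version it actually resolved):

```xml
<!-- Fragment of KafkaProducer.csproj after adding the package.
     The Version value shown here is a placeholder. -->
<ItemGroup>
  <PackageReference Include="Confluent.Kafka" Version="2.3.0" />
</ItemGroup>
```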
Parameter | Description
--- | ---
BootstrapServers | The access network address. Copy it from the Network column of the Access Mode module on the instance's Basic Info page in the console.
Step 3: Send messages
1. Create the producer file KafkaProducer.cs:
using Confluent.Kafka;
using System;
using System.Threading.Tasks;

class Program
{
    static async Task Main(string[] args)
    {
        var config = new ProducerConfig
        {
            BootstrapServers = "your_bootstrap_servers", // replace with the actual server address
            SaslMechanism = SaslMechanism.Plain,
            SecurityProtocol = SecurityProtocol.SaslSsl,
            SaslUsername = "your_api_key", // replace with the actual username
            SaslPassword = "your_secret",  // replace with the actual password
            SslCaLocation = "/path/to/ca-cert.pem", // path to the CA certificate
            SslEndpointIdentificationAlgorithm = SslEndpointIdentificationAlgorithm.None,
            Acks = Acks.Leader,
            EnableIdempotence = false,
            MaxInFlight = 5
        };

        using var producer = new ProducerBuilder<Null, string>(config).Build();
        for (int i = 1; i <= 10; i++)
        {
            var msg = "this is ckafka msg value";
            try
            {
                var result = await producer.ProduceAsync("test-topic",
                    new Message<Null, string> { Value = msg });
                Console.WriteLine($"Sent: {msg} | Offset: {result.Offset}");
            }
            catch (ProduceException<Null, string> e)
            {
                Console.WriteLine($"Failed: {e.Error.Reason}");
            }
        }
        producer.Flush(TimeSpan.FromSeconds(10));
    }
}
2. Compile and run KafkaProducer.cs to send messages:
dotnet run
Note
The username is the instance ID + # + the configured username; the password is the configured user's password.
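To make the username rule above concrete, the sketch below joins an instance ID and a configured username with `#`. The values `ckafka-xxxxxxxx` and `myuser` are placeholders, not real credentials:

```csharp
using System;

class SaslUsernameDemo
{
    static void Main()
    {
        // Placeholder values: replace with your actual instance ID and
        // the username created on the User Management page.
        string instanceId = "ckafka-xxxxxxxx";
        string configuredUser = "myuser";

        // CKafka expects SaslUsername in the form "<instance ID>#<username>"
        string saslUsername = $"{instanceId}#{configuredUser}";
        Console.WriteLine(saslUsername); // prints: ckafka-xxxxxxxx#myuser
    }
}
```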
3. View the result:
Sent: this is ckafka msg value | Offset: 0
Sent: this is ckafka msg value | Offset: 1
Sent: this is ckafka msg value | Offset: 2
Sent: this is ckafka msg value | Offset: 3
Sent: this is ckafka msg value | Offset: 4
Sent: this is ckafka msg value | Offset: 5
Sent: this is ckafka msg value | Offset: 6
Sent: this is ckafka msg value | Offset: 7
Sent: this is ckafka msg value | Offset: 8
Sent: this is ckafka msg value | Offset: 9
Step 4: Consume messages
1. Create the consumer file KafkaConsumer.cs:
using Confluent.Kafka;
using System;
using System.Threading;

class KafkaConsumer
{
    static void Main(string[] args)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "your_bootstrap_servers", // replace with the actual server address
            GroupId = "consumer-group",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            SecurityProtocol = SecurityProtocol.SaslSsl,
            SaslMechanism = SaslMechanism.Plain,
            SaslUsername = "your_api_key", // replace with the actual username
            SaslPassword = "your_secret",  // replace with the actual password
            SslCaLocation = "/path/to/ca-cert.pem", // path to the CA certificate
            SslEndpointIdentificationAlgorithm = SslEndpointIdentificationAlgorithm.None,
            EnableAutoCommit = true,
            AutoCommitIntervalMs = 5000 // auto-commit offsets every 5 seconds
        };

        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        consumer.Subscribe("test-topic");
        Console.WriteLine("Consumer started...");

        var cts = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) =>
        {
            e.Cancel = true;
            cts.Cancel();
        };

        try
        {
            while (!cts.IsCancellationRequested)
            {
                try
                {
                    // Block until a message arrives or cancellation is requested
                    var consumeResult = consumer.Consume(cts.Token);
                    if (consumeResult != null)
                    {
                        Console.WriteLine($"Received: {consumeResult.Message.Value} " +
                            $"| Topic: {consumeResult.Topic} " +
                            $"| Partition: {consumeResult.Partition} " +
                            $"| Offset: {consumeResult.Offset} " +
                            $"| Timestamp: {consumeResult.Message.Timestamp.UtcDateTime:HH:mm:ss}");
                    }
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Error occurred: {e.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Shutdown signal received. Closing consumer...");
        }
        finally
        {
            // Close the consumer so the final offset commit completes
            consumer.Close();
            Console.WriteLine("Consumer closed.");
        }
    }
}
2. Compile and run KafkaConsumer.cs:
dotnet run
3. View the result:
Consumer started...
Received: this is ckafka msg value | Topic: test-topic | Partition: [1] | Offset: 6 | Timestamp: 04:19:03
Received: this is ckafka msg value | Topic: test-topic | Partition: [1] | Offset: 7 | Timestamp: 04:19:04
Received: this is ckafka msg value | Topic: test-topic | Partition: [1] | Offset: 8 | Timestamp: 04:19:04
Received: this is ckafka msg value | Topic: test-topic | Partition: [1] | Offset: 9 | Timestamp: 04:19:04
Received: this is ckafka msg value | Topic: test-topic | Partition: [1] | Offset: 10 | Timestamp: 04:19:04
Received: this is ckafka msg value | Topic: test-topic | Partition: [1] | Offset: 11 | Timestamp: 04:19:11
Received: this is ckafka msg value | Topic: test-topic | Partition: [1] | Offset: 12 | Timestamp: 04:19:11
Received: this is ckafka msg value | Topic: test-topic | Partition: [1] | Offset: 13 | Timestamp: 04:19:11
Received: this is ckafka msg value | Topic: test-topic | Partition: [0] | Offset: 7 | Timestamp: 04:19:03
Received: this is ckafka msg value | Topic: test-topic | Partition: [0] | Offset: 8 | Timestamp: 04:19:04
Received: this is ckafka msg value | Topic: test-topic | Partition: [0] | Offset: 9 | Timestamp: 04:19:10
Received: this is ckafka msg value | Topic: test-topic | Partition: [0] | Offset: 10 | Timestamp: 04:19:11
Received: this is ckafka msg value | Topic: test-topic | Partition: [0] | Offset: 11 | Timestamp: 04:19:11
Received: this is ckafka msg value | Topic: test-topic | Partition: [2] | Offset: 7 | Timestamp: 04:19:04
Received: this is ckafka msg value | Topic: test-topic | Partition: [2] | Offset: 8 | Timestamp: 04:19:04
Received: this is ckafka msg value | Topic: test-topic | Partition: [2] | Offset: 9 | Timestamp: 04:19:04
Received: this is ckafka msg value | Topic: test-topic | Partition: [2] | Offset: 10 | Timestamp: 04:19:10
Received: this is ckafka msg value | Topic: test-topic | Partition: [2] | Offset: 11 | Timestamp: 04:19:11
Received: this is ckafka msg value | Topic: test-topic | Partition: [2] | Offset: 12 | Timestamp: 04:19:11
Received: this is ckafka msg value | Topic: test-topic | Partition: [2] | Offset: 13 | Timestamp: 04:19:11
4. On the Consumer Group page in the CKafka console, view the consumption details.
