VPC网络接入

最近更新时间:2025-10-17 14:51:52

我的收藏

操作场景

本文以在 VPC 网络环境下调用 C# SDK 为例介绍接入消息队列 CKafka 版并收发消息的操作过程,帮助您更好地理解消息收发的完整过程。

前提条件

安装.NET环境。以 CentOS 为例,使用如下命令安装 .NET 环境
sudo yum install -y dotnet-sdk-6.0

操作步骤

步骤1:准备配置

1. 登录 Linux 服务器
1.1 创建项目目录,生产者创建如下目录
mkdir KafkaProducer
cd KafkaProducer
dotnet new console -n KafkaProducer
cd KafkaProducer
1.2 创建项目目录,消费者创建如下目录
mkdir KafkaConsumer
cd KafkaConsumer
dotnet new console -n KafkaConsumer
cd KafkaConsumer
1.3 安装 Confluent.Kafka NuGet 包
dotnet add package Confluent.Kafka
参数
说明
BootstrapServers
接入网络,在控制台的实例基本信息页面内接入方式模块的网络列复制。

步骤2:发送消息

1. 新建并编写生产消息文件 KafkaProducer.cs。
using Confluent.Kafka;
using System;
using System.Threading.Tasks;

class KafkaProducer
{
static async Task Main(string[] args)
{
var config = new ProducerConfig
{
BootstrapServers = "localhost:9092",//接入点使用控制台提供的接入点
Acks = Acks.Leader, // acks 配置:可选值如下
// Acks.All (等同于 acks=all):所有 ISR 副本确认(最强可靠性)
// Acks.Leader (等同于 acks=1):仅 leader 确认(默认)
// Acks.None (等同于 acks=0):不等待任何确认(最高吞吐,最低可靠性)
EnableIdempotence = false, // 关闭幂等性
MaxInFlight = 5 // 单个连接最大发送请求数,建议5
};

using var producer = new ProducerBuilder<Null, string>(config).Build();

for (int i = 1; i <= 10; i++)
{
var msg = $"this is ckafka msg value";
try
{
var result = await producer.ProduceAsync("test-topic",
new Message<Null, string> { Value = msg });

Console.WriteLine($"Sent: {msg} | Offset: {result.Offset}");
}
catch (ProduceException<Null, string> e)
{
Console.WriteLine($"Failed: {e.Error.Reason}");
}
}
}
}
2. 编译并运行KafkaProducer.cs 发送消息。
dotnet run
3. 运行结果。
Sent: this is ckafka msg value | Offset: 0
Sent: this is ckafka msg value | Offset: 1
Sent: this is ckafka msg value | Offset: 2
Sent: this is ckafka msg value | Offset: 3
Sent: this is ckafka msg value | Offset: 4
Sent: this is ckafka msg value | Offset: 5
Sent: this is ckafka msg value | Offset: 6
Sent: this is ckafka msg value | Offset: 7
Sent: this is ckafka msg value | Offset: 8
Sent: this is ckafka msg value | Offset: 9
4. CKafka 控制台Topic 列表页面,选择对应的 Topic ,单击更多 > 消息查询,查看刚刚发送的消息。


步骤3:消费消息

1. 创建 Consumer 订阅消息程序 KafkaConsumer.cs。
mkdir KafkaConsumer
cd KafkaConsumer
dotnet new console -n KafkaConsumer
cd KafkaConsumer
2. 编译并运行 KafkaConsumer.cs 消费消息。
using Confluent.Kafka;
using System;

class KafkaConsumer
{
static void Main(string[] args)
{
var config = new ConsumerConfig
{
BootstrapServers = "localhost:9092", //接入点
GroupId = "consumer-group", //消费组名字
AutoOffsetReset = AutoOffsetReset.Earliest,// 如果没有初始化位点,从最早的消息开始消费
EnableAutoCommit = true, //开启自动提交
AutoCommitIntervalMs = 5000 //设置自动提交间隔:5000ms = 5秒
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("test-topic");

Console.WriteLine("Consumer running...");

var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true;
cts.Cancel();
};

try
{
while (true)
{
var result = consumer.Consume(cts.Token);
Console.WriteLine($"Received: {result.Message.Value} @ {result.TopicPartitionOffset}");
}
}
catch (OperationCanceledException)
{
Console.WriteLine("Consumer stopped.");
}
finally
{
consumer.Close();
}
}
}
3. 运行结果。
Consumer running...
Received: this is ckafka msg value @ test-topic [[0]] @0
Received: this is ckafka msg value @ test-topic [[0]] @1
Received: this is ckafka msg value @ test-topic [[0]] @2
Received: this is ckafka msg value @ test-topic [[0]] @3
Received: this is ckafka msg value @ test-topic [[0]] @4
Received: this is ckafka msg value @ test-topic [[0]] @5
Received: this is ckafka msg value @ test-topic [[0]] @6
Received: this is ckafka msg value @ test-topic [[0]] @7
Received: this is ckafka msg value @ test-topic [[0]] @8
Received: this is ckafka msg value @ test-topic [[0]] @9

4. CKafka 控制台Consumer Group 页面,查看消费详情。