C# SDK 使用

最近更新时间:2025-11-20 18:07:23

我的收藏

操作场景

本文以调用 C# SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。

前提条件

已参考 SDK 概述,获取相关的客户端连接参数
安装 DotNet 环境

操作步骤

步骤1:安装 RocketMQ 5.x SDK 依赖库

在 C# 项目中引入相关依赖,使用如下命令:
dotnet add package RocketMQ.Client --version 5.2.0-rc1

步骤2:生产消息

using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Org.Apache.Rocketmq;
namespace examples
{
    /// <summary>
    /// Quick-start sample that builds a producer, sends one normal message with the
    /// body "foobar" to the configured topic, and logs the id from the send receipt.
    /// </summary>
    internal static class ProducerNormalMessageDemo
    {
        static readonly ILoggerFactory factory = LoggerFactory.Create(builder => builder.AddConsole());
        static ILogger logger = factory.CreateLogger("Program_Producer");

        /// <summary>
        /// Builds the client configuration and a producer, publishes a single message,
        /// and always disposes the producer afterwards, even when sending fails.
        /// </summary>
        /// <returns>A task that completes once the message is sent and the producer is closed.</returns>
        internal static async Task QuickStart()
        {
            // Enable the switch if you use .NET Core 3.1 and want to disable TLS/SSL.
            AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);

            const string accessKey = "yourAccessKey";
            const string secretKey = "yourSecretKey";

            // Credential provider is optional for client configuration.
            var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);

            // Placeholder: replace with the cluster access address shown in the console.
            const string endpoints = "腾讯云官网接入地址:8080";
            var clientConfig = new ClientConfig.Builder()
                .SetEndpoints(endpoints)
                .SetCredentialsProvider(credentialsProvider)
                .Build();

            const string topic = "topicName";

            // In most cases you don't need to create many producers; a single shared instance is recommended.
            var producer = await new Producer.Builder()
                // Set the topic name(s), which is optional but recommended.
                // It lets the producer prefetch the topic route before message publishing.
                .SetTopics(topic)
                .SetClientConfig(clientConfig)
                .Build();

            try
            {
                // Define your message body.
                var bytes = Encoding.UTF8.GetBytes("foobar");
                const string tag = "yourMessageTagA";
                var message = new Message.Builder()
                    .SetTopic(topic)
                    .SetBody(bytes)
                    .SetTag(tag)
                    // You could set multiple keys for the single message actually.
                    .SetKeys("yourMessageKey-7044358f98fc")
                    .Build();

                var sendReceipt = await producer.Send(message);
                logger.LogInformation($"Send message successfully, messageId={sendReceipt.MessageId}");
            }
            finally
            {
                // Close the producer on every path so a failed Send does not leave it open.
                await producer.DisposeAsync();
            }
        }
    }
}
参数
说明
accessKey
角色密钥,在控制台的集群权限页面 AccessKey 列复制。

secretKey
角色名称,在控制台的集群权限页面 SecretKey 列复制。
endpoints
集群接入地址,在控制台集群基本信息页面的接入信息模块获取。

topic
Topic 的名称,在控制台 Topic 管理页面复制。


步骤3:消费消息

腾讯云消息队列 TDMQ RocketMQ 版 5.x 系列支持两种消费模式,分别为 Push Consumer 和 Simple Consumer。
说明:
社区版 C# SDK 在 5.2.0-rc1 之后支持 Push Consumer
在消息量特别少的情况下,使用单线程的 Simple Consumer 消费会有一定延迟;如果业务评估后确认消息量较少,建议开启多线程拉取,或者改用 Push Consumer。以下代码示例以 Simple Consumer 为例:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Org.Apache.Rocketmq;
namespace examples
{
    /// <summary>
    /// Quick-start sample for SimpleConsumer: subscribes to one topic with the "*"
    /// filter expression, then receives, logs and acknowledges messages in an endless loop.
    /// </summary>
    internal static class SimpleConsumerExample
    {
        static readonly ILoggerFactory factory = LoggerFactory.Create(builder => builder.AddConsole());
        static ILogger logger = factory.CreateLogger("Program_Consumer");

        /// <summary>
        /// Builds the client configuration and a simple consumer, then polls for messages forever.
        /// </summary>
        /// <returns>A task that never completes normally because the receive loop has no exit.</returns>
        internal static async Task QuickStart()
        {
            // Enable the switch if you use .NET Core 3.1 and want to disable TLS/SSL.
            AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);

            const string accessKey = "yourAccessKey";
            const string secretKey = "yourSecretKey";
            const string endpoints = "rmq-xxx.rocketmq.gz.qcloud.tencenttdmq.com:8080"; // Tencent Cloud access endpoint
            const string consumerGroup = "yourConsumerGroup";
            const string topic = "yourTopic";

            // Credential provider is optional for client configuration.
            var config = new ClientConfig.Builder()
                .SetEndpoints(endpoints)
                .SetCredentialsProvider(new StaticSessionCredentialsProvider(accessKey, secretKey))
                .Build();

            // Add your subscriptions.
            var subscriptions = new Dictionary<string, FilterExpression>
            {
                [topic] = new FilterExpression("*")
            };

            // Value handed to SetAwaitDuration on the builder.
            var awaitDuration = TimeSpan.FromSeconds(15);

            // In most cases a single consumer instance is enough.
            var consumer = await new SimpleConsumer.Builder()
                .SetClientConfig(config)
                .SetConsumerGroup(consumerGroup)
                .SetAwaitDuration(awaitDuration)
                .SetSubscriptionExpression(subscriptions)
                .Build();

            for (;;)
            {
                // Ask for up to 16 messages per call; the second argument is passed to Receive unchanged.
                var batch = await consumer.Receive(16, TimeSpan.FromSeconds(15));
                foreach (var received in batch)
                {
                    logger.LogInformation(
                        $"Received a message, topic={received.Topic}, message-id={received.MessageId}, body-size={received.Body.Length}");

                    await consumer.Ack(received);
                    logger.LogInformation($"Message is acknowledged successfully, message-id={received.MessageId}");

                    // Alternative to acknowledging: call
                    // consumer.ChangeInvisibleDuration(received, TimeSpan.FromSeconds(15)).
                }
            }

            // The loop above never exits. If you add an exit condition, close the consumer
            // with "await consumer.DisposeAsync();" once you no longer need it.
        }
    }
}
以下代码示例以 Push Consumer 为例:
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Org.Apache.Rocketmq;

namespace examples
{
    /// <summary>
    /// Quick-start sample for PushConsumer: subscribes to one topic with the "*"
    /// filter expression and hands every delivered message to a listener that logs it.
    /// </summary>
    public class PushConsumerExample
    {
        private static readonly ILogger Logger = MqLogManager.CreateLogger(typeof(PushConsumerExample).FullName);

        // Optional override read from the ROCKETMQ_ENDPOINT environment variable; null when it is not set.
        private static readonly string Endpoint = Environment.GetEnvironmentVariable("ROCKETMQ_ENDPOINT");

        /// <summary>
        /// Builds the client configuration and a push consumer, then waits indefinitely
        /// so the listener keeps receiving messages.
        /// </summary>
        /// <returns>A task that never completes normally because the wait is infinite.</returns>
        internal static async Task QuickStart()
        {
            const string accessKey = "yourAccessKey";
            const string secretKey = "yourSecretKey";

            // Enable the switch if you use .NET Core 3.1 and want to disable TLS/SSL.
            // AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);

            // Credential provider is optional for client configuration.
            var credentialsProvider = new StaticSessionCredentialsProvider(accessKey, secretKey);

            const string endpoints = "rmq-xxx.rocketmq.gz.qcloud.tencenttdmq.com:8080"; // Tencent Cloud access endpoint

            // Prefer the environment override when present; otherwise use the endpoint configured above.
            // Without this fallback a missing environment variable would pass null to SetEndpoints.
            var effectiveEndpoints = string.IsNullOrEmpty(Endpoint) ? endpoints : Endpoint;

            var clientConfig = new ClientConfig.Builder()
                .SetEndpoints(effectiveEndpoints)
                .SetCredentialsProvider(credentialsProvider)
                .Build();

            // Add your subscriptions.
            const string consumerGroup = "yourConsumerGroup";
            const string topic = "yourTopic";
            var subscription = new Dictionary<string, FilterExpression>
                { { topic, new FilterExpression("*") } };

            var pushConsumer = await new PushConsumer.Builder()
                .SetClientConfig(clientConfig)
                .SetConsumerGroup(consumerGroup)
                .SetSubscriptionExpression(subscription)
                .SetMessageListener(new CustomMessageListener())
                .Build();

            // Wait forever without blocking a thread inside this async method.
            await Task.Delay(Timeout.Infinite);

            // Close the push consumer if you don't need it anymore.
            // await pushConsumer.DisposeAsync();
        }

        /// <summary>
        /// Listener that logs each delivered message and reports it as consumed successfully.
        /// </summary>
        private class CustomMessageListener : IMessageListener
        {
            /// <summary>Handles one delivered message.</summary>
            /// <param name="messageView">The message handed to the listener.</param>
            /// <returns>Always <see cref="ConsumeResult.SUCCESS"/>.</returns>
            public ConsumeResult Consume(MessageView messageView)
            {
                // Handle the received message and return consume result.
                Logger.LogInformation($"Consume message={messageView}");
                return ConsumeResult.SUCCESS;
            }
        }
    }
}
参数
说明
accessKey
角色密钥,在控制台的集群权限页面 AccessKey 列复制。

secretKey
角色名称,在控制台的集群权限页面 SecretKey 列复制。
endpoints
集群接入地址,在控制台集群基本信息页面的接入信息模块获取。

consumerGroup
消费者组名称,在控制台 Group 管理页面复制。

topic
Topic 的名称,在控制台 Topic 管理页面复制。


步骤4:查看消息详情

消息发送完成后会得到一个消息 ID(messageId),您可以在控制台的消息查询 > 综合查询页面查询刚刚发送的消息,以及该消息的详情和轨迹等信息。