前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >Net使用EasyNetQ简化与RabbitMQ的交互

Net使用EasyNetQ简化与RabbitMQ的交互

原创
作者头像
Net分享
发布2024-12-05 14:58:15
发布2024-12-05 14:58:15
830
举报

# Net使用EasyNetQ简化与RabbitMQ的交互

EasyNetQ是一个为.NET环境设计的RabbitMQ客户端API,旨在简化与RabbitMQ的交互。

关于RabbitMq的更多知识点在:[https://www.dotnetshare.com](https://www.dotnetshare.com/)

![](https://i-blog.csdnimg.cn/direct/562ed37588f24cc3bf7a44d4a63da582.png)

![](https://i-blog.csdnimg.cn/direct/570c121b5ae24b75aacb79c521fc69bd.png)

公众号:Net分享,欢迎关注

### 安装EasyNetQ

你可以通过NuGet包管理器来安装EasyNetQ。在Package Manager Console中运行以下命令:

```shell

PM> Install-Package EasyNetQ

```

这将同时安装EasyNetQ和其依赖的RabbitMQ.Client库。

建议使用DI安装,EasyNetQ.DI.Microsof包含EasyNetQ,同时依赖Newtonsoft.Json

```shell

<PackageReference Include="EasyNetQ.DI.Microsoft" Version="7.8.0" />

<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />

```

### 注册连接RabbitMQ

```csharp

var connectionString = "host=111.111.11.111;virtualHost=/;username=admin;password=123456;timeout=60";

 //链接注册

builder.Services.RegisterEasyNetQ("host=8.153.70.182;virtualHost=/;username=zhaoke;password=123123;publisherConfirms=true");

//发布注册

builder.Services.AddTransient<MQPublish>();

//订阅注册

builder.Services.AddTransient<MQSubscribe>();

//添加消息处理

builder.Services.AddHostedService<SubscribeWorker>();

```

### 发布消息

EasyNetQ支持发布/订阅模式,你可以通过创建一个.NET类来定义消息,然后使用`Publish`方法发布消息。例如:

```csharp

public class TextMessage

{

public string Text { get; set; }

}

bus.Publish(new TextMessage { Text = "Hello World" });

```

EasyNetQ会根据消息类型自动创建交换机和队列,并使用Newtonsoft.Json序列化消息为JSON格式。

#### MQPublish的封装

```

using EasyNetQ.Topology;

using EasyNetQ;

/// <summary>

/// 发布消息

/// </summary>

public class MQPublish

{

    private readonly IBus bus;

    public MQPublish(IBus bus)

    {

        this.bus = bus;

    }

    /// <summary>

    /// 发布消息

    /// </summary>

    /// <param name="routingKey"></param>

    /// <param name="data"></param>

    public async Task PublishMessageAsync(string routingKey, object data)

    {

        Console.WriteLine($"MQ消息推送,routingKey :{routingKey} , 推送数据 :{System.Text.Json.JsonSerializer.Serialize(data)}");

        var message = new Message<object>(data);

        var advancedBus = bus.Advanced;

        advancedBus.QueueDeclare(routingKey);

        await advancedBus.PublishAsync(Exchange.Default, routingKey, false, message);

    }

    /// <summary>

    /// 发布延迟消息

    /// </summary>

    /// <param name="routingKey"></param>

    /// <param name="data"></param>

    /// <param name="timeout">毫秒</param>

    public void PublishDelayMessage(string routingKey, object data, int timeout)

    {

        var advancedBus = bus.Advanced;

        var message = new Message<object>(data);

        var properties = new MessageProperties();

        properties.Headers.Add("x-delay", timeout);

        var messageData = new Message<object>(message, properties);

        // 建立延时 exchange

        var exDelay = advancedBus.ExchangeDeclare($"{routingKey}_delay_exchange", cfg => cfg.AsDelayedExchange(ExchangeType.Direct));

        // 申明队列

        var qNormal = advancedBus.QueueDeclare($"{routingKey}_delay_queue");

        // 绑定,设置好 RoutingKey

        advancedBus.Bind(exDelay, qNormal, routingKey);

        bus.Advanced.Publish(exDelay, routingKey, false, messageData);

    }

}

```

### 订阅消息

订阅消息时,你需要指定一个订阅ID和一个处理消息的委托。例如:

```csharp

bus.Subscribe<TextMessage>("subscriptionId", message =>

{

Console.WriteLine("Received message: " + message.Text);

});

```

这样,当有消息发布到对应的交换机和队列时,你的订阅就会收到消息。

#### 封装MQSubscribe

```

public class MQSubscribe

{

    // MQ消息总线

    private readonly IBus bus;

    public MQSubscribe(IBus bus)

    {

        this.bus = bus;

    }

    /// <summary>

    /// 处理消息的总入口

    /// </summary>

    /// <returns></returns>

    public Task Init()

    {

        SubscribeTSysLogVis();

        //程序不结束,等待输入

        Console.WriteLine($"已启动(处理消息) {DateTime.UtcNow}");

        return Task.CompletedTask;

    }

    private Task SubscribeTSysLogVis()

    {

        var advancedBus = bus.Advanced;

        //订阅TSysLogVis日志 - 请不要在两次发布之间重复使用它

        var queue = advancedBus.QueueDeclare("TSysLogVis");

        advancedBus.Consume(queue, async (body, properties, info) =>

        {

            try

            {

                var message = Encoding.UTF8.GetString(body.ToArray());

                //var data = JsonConvert.DeserializeObject<TSysLogVis>(message);

                Console.WriteLine($"消息处理 {DateTime.Now} : {message}");

                //db.Insertable(data).SplitTable().ExecuteReturnSnowflakeId();

            }

            catch (Exception ex)

            {

                // 处理异常,例如记录日志或重新抛出

                Console.Error.WriteLine($"处理消息时发生异常: {ex}");

            }

        });

        return Task.CompletedTask;

    }

}

```

### SubscribeWorker

 启用订阅服务即可

```csharp

public class SubscribeWorker : BackgroundService

    {

private readonly MQSubscribe _Service;

public SubscribeWorker(MQSubscribe Service)

        {

_Service = Service;

        }

        // 执行逻辑

protected override async Task ExecuteAsync(CancellationToken stoppingToken)

        {

await _Service.Init();

        }

    }

```

以上是EasyNetQ的基本使用方法,它还支持请求/响应模式和发送/接收模式,以及其他高级功能。你可以通过阅读官方文档来获取更多信息。

#### 基础知识

- 消息队列(Message Queue):Rabbit 是基于消息队列的中间件,它允许应用程序通过发送和接收消息来进行异步通信。

- 生产者(Producer)和消费者(Consumer):Rabbit 中的应用程序可以充当生产者和消费者的角色。生产者负责将消息发送到队列中,消费者则从队列中接收并处理消息。

- 队列(Queue):Rabbit 使用队列来存储消息。生产者将消息发送到队列中,而消费者从队列中接收消息进行处理。队列可以确保消息的顺序性和可靠性。

- 交换器(Exchange):交换器负责接收生产者发送的消息,并根据一定的规则将消息路由到特定的队列中。Rabbit 提供了不同类型的交换器,如直连交换器(Direct Exchange)、主题交换器(Topic Exchange)等。

- 绑定(Binding):绑定将队列与交换器进行关联,定义了消息从交换器路由到队列的规则。一个队列可以绑定到多个交换器上,一个交换器也可以将消息路由到多个队列上。

- 路由键(Routing Key):生产者在发送消息时需要指定一个路由键,交换器根据路由键来判断将消息路由到哪些队列。不同类型的交换器对路由键的处理方式有所不同。

- 持久化(Durability):Rabbit 支持消息的持久化,即将消息存储到磁盘中以防止消息丢失。可以将队列、交换器和消息设置为持久化。

- 可靠性投递(Reliable Delivery):Rabbit 提供了消息可靠性投递的机制,确保消息能够被消费者正确地接收和处理。包括消息的确认机制、消息的重试、消息的死信队列等。

- 发布/订阅(Publish/Subscribe)模式:Rabbit 支持发布/订阅模式,其中一个生产者可以将消息同时发送给多个消费者,每个消费者都会收到相同的消息副本。

- ACK 机制:消费者收到消息后需要发送 ACK(确认)给 Rabbit 服务器,告知服务器消息已经被成功接收和处理,服务器可以将消息从队列中删除。

欢迎关注我的公众号“**Net分享**”,技术文章第一时间推送,随缘更新 , 分享一些你可能注意不到的细节。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档