前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >.NET Core 使用 Channel 消息队列

.NET Core 使用 Channel 消息队列

作者头像
郑子铭
发布2023-08-30 09:19:49
3420
发布2023-08-30 09:19:49
举报

背景

最近做一个项目,连接了很多设备,需要保存设备的心跳数据,刚开始的做法是直接接收到设备的数据之后进行心跳数据的保存,但是随着设备多了起来,然后设备的使用时长不断的加大,对数据库的压力也比较大,所以想着优化一下。

方案调研

1、使用第三方中间件

常见的使用redis,或者mq,只需要不断的向中间件发送数据即可,redis使用队列,如果是mq直接发送消息即可,使用起来简单方便,但是要引入这些中间件,目前的架构里面没有,需要自己去起服务,维护。

2、使用channel

System.Threading.Channels 是.NET Core 3.0 后推出的新的集合类型, 具有异步API,高性能,线程安全等特点,它可以用来做消息队列,进行数据的生产和消费, 公开的 Writer 和 Reader api对应消息的生产者和消费者,也让Channel更加的简洁和易用,与Rabbit MQ 等其他队列不同的是,Channel 是进程内的队列

目前就介绍来看非常完美,不需要添加第三方中间件,直接添加现有的模块即可。

代码实现

选择了使用channel来做优化。拿到设备数据之后直接把消息丢入到channel,然后后台使用定时任务或者自己实现hostservice去不断的消费数据。

生产者代码

代码语言:javascript
复制
public async Task ProduceHeartBeat(string message)
{
    await channel.Writer.WriteAsync(message);
}

不断的向里面写入数据即可.

消费者代码

代码语言:javascript
复制
/// <summary>
/// timespan时间内消费多少数据
/// </summary>
/// <param name="count"></param>
/// <param name="timeSpan"></param>
/// <returns></returns>
public async Task<List<string>> ConsumeHeartBeatAsync(int count,TimeSpan timeSpan)
{
     var result = new List<string>(count);
     CancellationTokenSource cts = new CancellationTokenSource();
     var cancellationToken = cts.Token;
     cts.CancelAfter(timeSpan);
     int rcount = 0;
     while ( !cancellationToken.IsCancellationRequested && rcount<count)
     {
         //await Task.Delay(2000);
         if (channel.Reader.TryRead(out var number))
         {
             Console.WriteLine(number);
             result.Add(number);
             rcount++;
         }
         else
         {
             break;
         } 
     }  
    return result;
}

里面加入了一个cancellationToken,进行消费的时长限制。在此时长内消费多少条数据,超时直接结束。

这就是基本的代码

后台定时消费数据

代码语言:javascript
复制
public class HeartBeatService : BackgroundService
{
     private readonly HeartBeatsChannel heartBeatsChannel;

     public HeartBeatService(HeartBeatsChannel heartBeatsChannel)
     {
         this.heartBeatsChannel = heartBeatsChannel;
     }

     protected override async Task ExecuteAsync(CancellationToken stoppingToken)
     {
         try
         {

             Task.Factory.StartNew(() =>
             {
                 while (!stoppingToken.IsCancellationRequested)
                 {
                     //阻塞的队列使得一直在同一个线程运行
                     Process(15,heartBeatsChannel).Wait();
                 }

             }, TaskCreationOptions.LongRunning);

             Console.WriteLine("主线程 现在运行的线程id为:" + Thread.CurrentThread.ManagedThreadId);

             }
         catch (Exception ex)
         {
             Console.WriteLine(ex.ToString());
         }
     }
     /// <summary>
     /// 消费数据
     /// </summary>
     /// <param name="count">一次消费数量</param>
     /// <param name="heartBeatsChannel"></param>
     /// <returns></returns>
     private async Task Process(int count ,HeartBeatsChannel heartBeatsChannel)
     {
         Console.WriteLine("子线程_现在运行的线程id为:" + Thread.CurrentThread.ManagedThreadId);
         //每次消费三十个
         if (heartBeatsChannel.IsHasContent)
         {
             //int count = 15;
             //进行消费
             await heartBeatsChannel.ConsumeHeartBeatAsync(count, TimeSpan.FromSeconds(3));
         }           
         await Task.Delay(3000);
     }
}

使用的是BackgroundServic,直接实现要处理的业务逻辑就好了。

在这里使用的是TaskCreationOptions.LongRunning,新开一个线程去处理心跳数据。

总结

以上就是主要的实现全过程,完整的代码在github

https://github.com/lackguozi/LearnChannelWebApi

实际上完全可以不用后台去定时消费数据,channel有很多api可以去处理,比如WaitToReadAsync(),但是这里没有使用,主要是不想持续的占数据库资源?

总结的话学习了channel的用法,底层似乎使用了deque?只稍微看了下源码,但是看到了许多的lock,这个是必不可少的。还是巨硬轮子造的好

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2023-07-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 DotNet NB 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
消息队列
腾讯云消息队列 TDMQ 是分布式架构中的重要组件,提供异步通信的基础能力,通过应用解耦降低系统复杂度,提升系统可用性和可扩展性。TDMQ 产品系列提供丰富的产品形态,包含 CKafka、RocketMQ、RabbitMQ、Pulsar、CMQ 五大产品,覆盖在线和离线场景,满足金融、互联网、教育、物流、能源等不同行业和场景的需求。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档