CAP带你轻松玩转Asp.Net Core消息队列

CAP是什么?

CAP是由我们园子里的杨晓东大神开发出来的一套分布式事务的决绝方案,是.Net Core Community中的第一个千星项目(目前已经1656 Star),具有轻量级、易使用、高性能等特点。

https://github.com/dotnetcore/CAP

本博客主要针对易用性这一点,展开叙述,一起看看CAP如何结合EF Core和RabbitMQ带领小白轻松走入分布式消息队列的世界。

准备

首先,你需要搭建一套RabbitMQ系统,搭建过程在此不再叙述,如果大家觉得麻烦,可以用我搭好的。

HostName: coderayu.cn  UserName:guest Password:guest  (仅仅可用作实验,数据丢失不负责)

创建Asp.Net Core 项目,并引入Nuget包

你可以运行以下下命令在你的项目中安装 CAP。

PM> Install-Package DotNetCore.CAP

如果你的消息队列使用的是 Kafka 的话,你可以:

PM> Install-Package DotNetCore.CAP.Kafka

如果你的消息队列使用的是 RabbitMQ 的话,你可以:

PM> Install-Package DotNetCore.CAP.RabbitMQ

CAP 提供了 Sql Server, MySql, PostgreSQL 的扩展作为数据库存储:

// 按需选择安装你正在使用的数据库
PM> Install-Package DotNetCore.CAP.SqlServer
PM> Install-Package DotNetCore.CAP.MySql
PM> Install-Package DotNetCore.CAP.PostgreSql

创建DbContext

因为我采用的是EF Core,所以首先要创建一个DbContext上下文,代码如下:

public class CapDbContext:DbContext
    {
        public CapDbContext(DbContextOptions options) : base(options)
        {
        }
    }

Startup配置

首先需要在ConfigureServices函数中进行相关服务的注入,对应的操作和功能解释如下:

public void ConfigureServices(IServiceCollection services)
        {
            //注入DbContext上下文,如果用的是Mysql可能还需要添加Pomelo.EntityFrameworkCore.MySql这个Nuget包
            services.AddDbContext<CapDbContext>(options =>
                options.UseMySql("Server=127.0.0.1;Database=testcap;UserId=root;Password=123456;"));

            //配置CAP
            services.AddCap(x =>
            {
                x.UseEntityFramework<CapDbContext>();

                //启用操作面板
                x.UseDashboard();
                //使用RabbitMQ
                x.UseRabbitMQ(rb =>
                {
                    //rabbitmq服务器地址
                    rb.HostName = "coderayu.cn";

                    rb.UserName = "guest";
                    rb.Password = "guest";

                    //指定Topic exchange名称,不指定的话会用默认的
                    rb.ExchangeName = "cap.text.exchange";
                });

                //设置处理成功的数据在数据库中保存的时间(秒),为保证系统新能,数据会定期清理。
                x.SucceedMessageExpiredAfter = 24 * 3600;

                //设置失败重试次数
                x.FailedRetryCount = 5;
            });

            services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1);
        }

最后还要再Congiure中启用CAP中间件

public void Configure(IApplicationBuilder app, IHostingEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }
            //启用cap中间件
            app.UseCap();

            app.UseMvc();
        }

利用EF Core生成CAP数据库

再程序包管理控制台中依此输入以下命令行

PM> Add-Migration Init
PM> update-database

如果成成功执行,那么打开数据库,就可以看到用来存储CAP发送和接收数据的表格了。

表格中每列的含义如下:

消息的发送和订阅

我们直接在ValuesController的基础上进行改造。

在 Controller 中注入 ICapPublisher 然后使用 ICapPublisher 进行消息发送

private readonly ICapPublisher _publisher;

        public ValuesController(ICapPublisher publisher)
        {
            _publisher = publisher;
        }

发送消息

[HttpGet]
        public string Get(string message)
        {
            //"cap.test.queue"是发送的消息RouteKey,可以理解为消息管道的名称
            _publisher.Publish("cap.test.queue",message);

            return "发送成功";
        }

订阅消息

       //"cap.test.queue"为发送消息时的RauteKey,也可以模糊匹配
        //详情https://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html
        [CapSubscribe("cap.test.queue")]
        public void HandleMessage(string message)
        {
            Console.Write(DateTime.Now.ToString()+"收到消息:"+message);
        }

Run

启动程序后,首先看到CAP启动成功

紧随其后,消费者也就是我们的订阅方法在RabbitMQ服务器上注册成功。

发送消息,发送成功,如下

发送后,立即在控制台看到了订阅方法输出的结果。

消息的失败重试

在订阅方法中,如果抛出异常,那么CAP就会认为该条消息处理失败,会自动进行重试,重试次数在前方已经进行了配置。

我们把订阅方法做一个改动,打印接收的信息到控制台中,并抛出异常

//"cap.test.queue"为发送消息时的RauteKey,也可以模糊匹配
        //详情https://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html
        [CapSubscribe("cap.test.queue")]
        public void HandleMessage(string message)
        {
            Console.WriteLine(DateTime.Now.ToString()+"收到消息:"+message);
            throw new Exception("测试失败重试");
        }

可以看到,立即进行了三次重试

可是在前面,我们设置的失败重试次数是5次,为什么这里只重试三次吗?是不是要叫晓东过来改BUG了呢

?当然不是。

观察发现,CAP重试的前三次是立即进行的,而后面的重试,是每隔一段时间进行的,当在分布式通讯的过程中,可能出现了问题确实不会立即修复解决,可能过了一定时间,系统就自动恢复了,如网络抖动。

CAP仪表盘

发送成功了五条消息,成功接收处理了三条,两条处理失败,处理失败的任务,我们可以直接在面板中进行重新消费,可谓非常方便。

同时,处理失败的消息,点击消息的编号后,可以查看到消息的内容和异常原因。

CAP如此强大,让消息队列这种高大上产品操作So Easy,学会了CAP,也可以吹牛说,我也懂分布式任务处理啦

感谢晓东开发出如此强大的项目,同时感谢.Net Core Community。

参考 CAP Github wiki

https://github.com/dotnetcore/CAP/wiki

本博客Demo代码

https://github.com/liuzhenyulive/CAP.Demo

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏菩提树下的杨过

局域网与互联网环境下MTU的快速确定方法

MTU即:最大传输单元(Maximum Transmission Unit,MTU)是指一种通信协议的某一层上面所能通过的最大数据报大小(以字节为单位)。 想...

50270
来自专栏吴伟祥

Redis分布式锁的正确实现方式(Java版)

分布式锁一般有三种实现方式:1. 数据库乐观锁;2. 基于Redis的分布式锁;3. 基于ZooKeeper的分布式锁。本篇博客将介绍第二种方式,基于Redis...

24420
来自专栏雨过天晴

转 PHP-redis编译成功

18830
来自专栏码农阿宇

CAP带你轻松玩转Asp.Net Core消息队列

CAP是由我们园子里的杨晓东大神开发出来的一套分布式事务的决绝方案,是.Net Core Community中的第一个千星项目(目前已经1656 Star),具...

23320
来自专栏Core Net

ASP.NET Core 2.0 : 三. 项目结构

47850
来自专栏数据结构笔记

scrapy爬虫框架(三):爬取壁纸保存并命名

首先我们先分析网页结构,打开网址:http://desk.zol.com.cn/dongman/1920x1080/

13520
来自专栏JavaEdge

Cookie常用API

1. JSP中Cookie的读写 Cookie的本质是一个键值对,当浏览器访问web服务器的时候写入在客户端机器上,里面记录一些信息。Cookie还有一些附加信...

37870
来自专栏逸鹏说道

Linux 部署ASP.NET SQLite 应用 的坎坷之旅 附demo及源码

Linux 部署ASP.NET SQLite 应用 的坎坷之旅。文章底部 附示例代码。 有一台闲置的Linux VPS,尝试着部署一下.NET 程序,结果就踏上...

41730
来自专栏DannyHoo的专栏

XML解析

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u010105969/article/details/...

16920
来自专栏Bug生活2048

.net core项目实战之基于Restful API+Swagger项目搭建

然后右击你的项目,在属性中,勾选下生成XML文档文件,Swagger会自动解析对应的XML进行匹配。

14710

扫码关注云+社区

领取腾讯云代金券