前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >MassTransit Get Started->

MassTransit Get Started->

作者头像
李明成
发布2020-09-17 15:53:46
1.4K0
发布2020-09-17 15:53:46
举报
文章被收录于专栏:dotNET知音dotNET知音

MassTransit:是一款.NET的分布式应用程序框架(开源、免费)。通过MassTransit,可以轻松创建利用基于消息的、松耦合异步通信的应用程序和服务,以提高可用性,可靠性和可伸缩性。

MassTransit本身定位轻量级的服务总线,并支持多种传输方式如:RabbitMQ、Azure Service Bus、ActiveMQ、Amazon SQS、Kafka、Azure Event Hub。消息异常处理:重试配置、重新交付、erro管道、死信管道。分布式事务处理:sagas、Courier。容器支持:.NETcore自身的、autofac、castle windsor等、调度支持:Quartz 、hangfire。更多功能参考官网文档。

MassTransit目前已经发布到了第7个版本了,7.0版本新增了对Kafka 的支持,构建仅支持.NET Standard 2.0...其他改动不大。MassTransit社区使用也是很活跃的,对于首次接触的,通过本篇文章(基于rabbitmq)帮你快速入门!

一个应用程序或服务可以使用两种不同的方法来生产消息,主要区别是sent需要指定具体的端点地址,而pub不需要,下面的代码会演示这两种方式。

  • 发布事件(多个接收者)
  • 发送命令(一个接收者)

发布事件(事件消息)

场景假设:在xx项目中,需要与第三方进行交互。比如:订单发货之后,把发货的信息的推送给第三方、把订单的状态变化也推送过去。我们分析下需求,系统要求在发货之后,需要做若干事情。可以解读为,发货这个动作已经发生了,需要做的事情不确定。这不是典型的发布订阅模式嘛!好了,那使用masstransit如何实现呢?

1.创建一个类库项目定义消息体,命名为contract
代码语言:javascript
复制
   public interface OrderShipped
    {
        public Guid OrderId { get; set; }//订单号
    }
2.创建一个api项目作为消息的生产方,命名为Delivery,然后安装nuget包:
代码语言:javascript
复制
Install-Package MassTransit.AspNetCore
Install-Package MassTransit.RabbitMQ
在Startup类的ConfigureServices中,添加以下配置
代码语言:javascript
复制

            services.AddMassTransit(x =>
            {
                x.UsingRabbitMq((context, config) =>
                {

                    config.Host("rabbitmq://localhost:5672", hostConfig =>
                    {

                        hostConfig.Username("Amq");//填写你的用户名
                        hostConfig.Password("mq123");//填写你的用户名
                    });
                });
            });
            services.AddMassTransitHostedService();
在ValueController中,进行发布消息
代码语言:javascript
复制
        readonly IPublishEndpoint _publishEndpoint;
        public ValuesController(IPublishEndpoint publishEndpoint)
        {
            _publishEndpoint = publishEndpoint;
        }

        /// <summary>
        /// 测试发布
        /// </summary>
        /// <returns></returns>
        [HttpPost]
        public async Task<ActionResult> OrderDelivery()
        {
            //其他
            await _publishEndpoint.Publish<OrderShipped>(new
            {
                OrderId = Guid.NewGuid()
            });

            return Ok();
        }

通过使用IPublishEndpoint实例的Publish方法,发布一个事件。

3.创建一个api项目作为消息的消费方,命名为Listener,然后安装nuget包:
代码语言:javascript
复制
Install-Package MassTransit.AspNetCore
Install-Package MassTransit.RabbitMQ
在Startup类的ConfigureServices中,添加以下配置
代码语言:javascript
复制
            services.AddMassTransit(x =>
            {
                x.AddConsumer<DeliveryExpressNotifyConsumer>();
                x.UsingRabbitMq((context, config) =>
                {
                    config.Host("rabbitmq://localhost:5672", hostConfig =>
                    {
                        hostConfig.Username("Amq");
                        hostConfig.Password("mq123");
                    });

                    config.ReceiveEndpoint("Order.Shipped", e =>
                    {
                        e.ConfigureConsumer<DeliveryExpressNotifyConsumer>(context);
                    });

                });
            });

            services.AddMassTransitHostedService();

实现接口IConsumer即可完成消费逻辑的过程。

代码语言:javascript
复制
    public class DeliveryExpressNotifyConsumer : IConsumer<OrderShipped>
    {
        ILogger<DeliveryExpressNotifyConsumer> _logger;

        public DeliveryExpressNotifyConsumer(ILogger<DeliveryExpressNotifyConsumer> logger)
        {
            _logger = logger;
        }
        public async Task Consume(ConsumeContext<OrderShipped> context)
        {
            _logger.LogInformation("订单id: {Value}已发货,通知第三方", context.Message.OrderId);
        }
    }
代码语言:javascript
复制
到此,消息生产方和消费方代码都已经实现了,运行一下,效果如下
发送消息(命令消息)

发送消息适用的场景,常常是一种命令,并且期望消息只被一个接收者或服务实例进行处理。masstransit使用发送消息和发布消息,在消息生产方不同之处,sent消息需要指定目标地址,使用ISendEndpoint的Send方法,消费者代码一样的配置。以下代码演示发送一个创建发货单的指令消息,比较简单直接贴出源码:

1.定义一个消息SubmitShippingOrder
代码语言:javascript
复制
 public class SubmitShippingOrder
    {
        public Guid OrderId { get; set; }
    }
2.sent消息
代码语言:javascript
复制
        readonly IPublishEndpoint _publishEndpoint;
        readonly ISendEndpointProvider _sendEndpointProvider;
        public ValuesController(IPublishEndpoint publishEndpoint, ISendEndpointProvider sendEndpointProvider)
        {
            _publishEndpoint = publishEndpoint;
            _sendEndpointProvider = sendEndpointProvider;
        }
        
        /// <summary>
        /// 测试发送
        /// </summary>
        /// <returns></returns>
        [HttpPost]
        public async Task<ActionResult> Delivery()
        {
            var endpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri("queue:Submit.Shipping.Order")); //获取发送端点
            await endpoint.Send(new SubmitShippingOrder
            {
                OrderId = Guid.NewGuid()
            });
            return Ok();
        }
3.消费
代码语言:javascript
复制
  public class SubmitShippingOrderConsumer : IConsumer<SubmitShippingOrder>
    {
        ILogger<SubmitShippingOrderConsumer> _logger;

        public SubmitShippingOrderConsumer(ILogger<SubmitShippingOrderConsumer> logger)
        {
            _logger = logger;
        }
        public async Task Consume(ConsumeContext<SubmitShippingOrder> context)
        {
            _logger.LogInformation("创建发货单,订单id: {Value}", context.Message.OrderId);
        }
    }

运行一下效果:

官网文档:

http://masstransit-project.com/

https://masstransit-project.com/advanced/courier/

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-09-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 dotNET知音 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 发布事件(事件消息)
    • 发送消息(命令消息)
    相关产品与服务
    容器服务
    腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档