专栏首页领域驱动设计DDD实战进阶微服务实战(二):落地微服务架构到直销系统(构建消息总线框架接口)

微服务实战(二):落地微服务架构到直销系统(构建消息总线框架接口)

从上一篇文章大家可以看出,实现一个自己的消息总线框架是非常重要的内容,消息总线可以将界限上下文之间进行解耦,也可以为大并发访问提供必要的支持。

消息总线的作用:

1.界限上下文解耦:在DDD第一波文章中,当更新了订单信息后,我们通过调用经销商界限上下文的领域模型和仓储,进行了经销商信息的更新,这造成了耦合。通过一个消息总线,可以在订单界限上下文的WebApi服务(来源微服务-生产者)更新了订单信息后,发布一个事件消息到消息总线的某个队列中,经销商界限上下文的WebApi服务(消费者)订阅这个事件消息,然后交给自己的Handler进行消息处理,更新自己的经销商信息。这样就实现了订单界限上下文与经销商界限上下文解耦。

2.大并发支持:可以通过消息总线进一步提升下单的性能。我们可以将用户下单的操作直接交给一个下单命令WebApi接收,下单命令WebApi接收到命令后,直接丢给一个消息总线的队列,然后立即给前端返回下单结果。这样用户就不用等待后续的复杂订单业务逻辑,加快速度。后续订单的一系列处理交给消息的Handler进行后续的处理与消息的进一步投递。

消息总线设计重点:

1.定义消息(事件)的接口:所有需要投递与处理的消息,都从这个消息接口继承,因为需要约束消息中必须包含的内容,比如消息的ID、消息产生的时间等。

public interface IEvent
    {
        Guid Id { get; set; }
        DateTime CreateDate { get; set; }
    }

2.定义消息(事件)处理器接口:当消息投递到消息总线队列中后,一定有消费者WebApi接收并处理这个消息,具体的处理方法逻辑在订阅方处理器中实现,这里先需要定义处理器的接口,便于在消息总线框架中使用。

public interface IEventHandler
    {
        Task<bool> HandleAsync<TEvent>(TEvent @event) where TEvent : IEvent;
    }

从上面代码可以看出,消息(事件)处理器处理的类型就是从IEvent接口继承的消息类。

3.定义消息(事件)与消息(事件)处理器关联接口:一种类型的消息被投递后,一定要在订阅方找到这种消息的处理器进行处理,所以一定要定义二者的关联接口,这样才能将消息与消息处理器对应起来,才能实现消息被订阅后的处理。

 public interface IEventHandlerExecutionContext
    {
        void RegisterEventHandler<TEvent, TEventHandler>() where TEvent : IEvent
            where TEventHandler : IEventHandler;
        bool IsRegisterEventHandler<TEvent, TEventHandler>() where TEvent : IEvent
            where TEventHandler : IEventHandler;
        Task HandleAsync<TEvent>(TEvent @event) where TEvent : IEvent;
    }

RegisterEventHandler方法就是建立消息与消息处理器的关联,这个方法其实是在订阅方使用,订阅方告诉消息总线,什么样的消息应该交给我的哪个处理器进行处理。

IsRegisterEventHandler方法是判断消息与处理器之间是否已经存在关联。

HandleAsync方法是通过查找到消息对应的处理器后,然后调用处理器自己的Handle方法进行消息的处理.

4.定义消息发布、订阅与消息总线接口:消息总线至少要支持两个功能,一个是生产者能够发布消息到我的消息总线,另一个是订阅方需要能够从我这个消息总线订阅消息。

 public interface IEventPublisher
    {
        void Publish<TEvent>(TEvent @event) where TEvent : IEvent;
    }

从上面代码可以看出,生产者发布的消息仍然要从IEvent继承的类型。

 public interface IEventSubscriber
    {
        void Subscribe<TEvent, TEventHandler>() where TEvent : IEvent
            where TEventHandler : IEventHandler;
    }

上面代码是订阅方用于从消息总线订阅消息,从代码中可以看出,它的最终的实现其实就是建立消息与处理器之间的关联。

 public interface IEventBus:IEventPublisher,IEventSubscriber
    {
    }

消息(事件)总线从两个接口继承下来,同时支持消息的发布与消息的订阅。

5.实现事件基类:上面已经订阅了消息(事件)的接口,这里来实现事件的基类,其实就是实现消息ID与产生的时间:

  public class BaseEvent : IEvent
    {
        public Guid Id { get; set; }
        public DateTime CreateDate { get; set; }
        public BaseEvent()
        {
            this.Id = Guid.NewGuid();
            this.CreateDate = DateTime.Now;
        }
    }

6.实现消息总线基类:消息总线底层的依赖可以是各种消息代理产品,比如RabbitMq、Kafaka或第三方云平台提供的消息代理产品,通常我们要封装这些消息代理产品。在封装之前,我们需要定义顶层的消息总线基类实现,主要的目的是未来依赖于它的具体实现可替换,另外也将消息与消息处理器的关联接口传递进来,便于订阅方使用。

public abstract class BaseEventBus : IEventBus
    {
        protected readonly IEventHandlerExecutionContext eventHandlerExecutionContext;
        protected BaseEventBus(IEventHandlerExecutionContext eventHandlerExecutionContext)
        {
            this.eventHandlerExecutionContext = eventHandlerExecutionContext;
        }
        public abstract void Publish<TEvent>(TEvent @event)
            where TEvent : IEvent;
        public abstract void Subscribe<TEvent, TEventHandler>()
            where TEvent : IEvent
            where TEventHandler : IEventHandler;
    }

7.实现消息与处理器关联:消息必须与处理器关联,订阅方收到特定类型的消息后,才知道交给哪个处理器处理。

 public class EventHandlerExecutionContext : IEventHandlerExecutionContext
    {
        private readonly IServiceCollection registry;
        private readonly IServiceProvider serviceprovider;
        private Dictionary<Type, List<Type>> registrations = new Dictionary<Type, List<Type>>();
        public EventHandlerExecutionContext(IServiceCollection registry,Func<IServiceCollection,
            IServiceProvider> serviceProviderFactory = null)
        {
            this.registry = registry;
            this.serviceprovider = this.registry.BuildServiceProvider();
        }

       //查找消息关联的处理器,然后调用处理器的处理方法
        public async Task HandleAsync<TEvent>(TEvent @event) where TEvent : IEvent
        {
            var eventtype = @event.GetType();
            if(registrations.TryGetValue(eventtype,out List<Type> handlertypes) && handlertypes.Count > 0)
            {
                using(var childscope = this.serviceprovider.CreateScope())
                {
                    foreach(var handlertype in handlertypes)
                    {
                        var handler = Activator.CreateInstance(handlertype) as IEventHandler;
                        await handler.HandleAsync(@event);
                    }
                }
            }
        }

       //判断消息与处理器之间是否有关联
        public bool IsRegisterEventHandler<TEvent, TEventHandler>()
            where TEvent : IEvent
            where TEventHandler : IEventHandler
        {
            if(registrations.TryGetValue(typeof(TEvent),out List<Type> handlertypelist))
            {
                return handlertypelist != null && handlertypelist.Contains(typeof(IEventHandler));
            }
            return false;
        }
       
      //将消息与处理器关联起来,可以在内存中建立关联,也可以建立在数据库单独表中
        public void RegisterEventHandler<TEvent, TEventHandler>()
            where TEvent : IEvent
            where TEventHandler : IEventHandler
        {
            Utils.DictionaryRegister(typeof(TEvent), typeof(TEventHandler), registrations);
        }
    }

上面我们基本上就将消息总线的架子搭建起来了,也实现了基本的功能,下一章我们基于它来实现RabbitMq的消息总线。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • DDD实战进阶第一波(六):开发一般业务的大健康行业直销系统(实现产品上下文仓储与应用服务层)

    用户1910585
  • DDD实战进阶第一波(六):开发一般业务的大健康行业直销系统(实现产品上下文仓储与应用服务层)

    前一篇文章我们完成了产品上下文的领域层,我们已经有了关于产品方面的简单领域逻辑,我们接着来实现产品上下文关于仓储持久化与应用层的用例如何来协调 领域逻辑与仓储持...

    用户1910585
  • 领域驱动设计之关联设计

    用户1910585
  • golang-101-hacks(22)——Types

    Go语言中的数据类型可分为两类:已命名和未命名。除了预先已声明的类型(如“int”、“rune”等),还可以自己定义命名类型。例如:

    羊羽shine
  • 28个Java开发常用规范技巧总结

    例如:UserService,但是以下情景例外:DO / BO / PO / DTO / VO。

    用户1516716
  • 让函数的导数可视化

    WolframChina
  • 从一个LocalDateTime引发的疑问

    公司有同事部署出错,然后查日志,找时间,从k8s得到的时间是  2017-06-16T09:38:48.580 +0000,然后他就纳闷了,因为他根本不会在9点...

    ydymz
  • Spring Boot 定义系统启动任务,你会几种方式?

    在 Servlet/Jsp 项目中,如果涉及到系统任务,例如在项目启动阶段要做一些数据初始化操作,这些操作有一个共同的特点,只在项目启动时进行,以后都不再执行,...

    江南一点雨
  • 线性表——数组描述(Qt5.1测试代码 for windows)

    青木
  • Spring Boot2 系列教程(十五)定义系统启动任务的两种方式

    在 Servlet/Jsp 项目中,如果涉及到系统任务,例如在项目启动阶段要做一些数据初始化操作,这些操作有一个共同的特点,只在项目启动时进行,以后都不再执行,...

    江南一点雨

扫码关注云+社区

领取腾讯云代金券