eShopOnContainers 知多少[8]:Ordering microservice

1. 引言

Ordering microservice(订单微服务)就是处理订单的了,它与前面讲到的几个微服务相比要复杂的多。主要涉及以下业务逻辑:

  1. 订单的创建、取消、支付、发货
  2. 库存的扣减

2. 架构模式

如上图所示,该服务基于CQRS 和DDD来实现。

从项目结构来看,主要包括7个项目:

  1. Ordering.API:应用层
  2. Ordering.Domain:领域层
  3. Ordering.Infrastructure:基础设施层
  4. Ordering.BackgroundTasks:后台任务
  5. Ordering.SignalrHub:基于Signalr的消息推送和实时通信
  6. Ordering.FunctionalTests:功能测试项目
  7. Ordering.UnitTests:单元测试项目

从以上的项目定义来看,该微服务的设计并符合DDD经典的四层架构。

核心技术选型:

  1. ASP.NET Core Web API
  2. Entity Framework Core
  3. SQL Server
  4. Swashbuckle(可选)
  5. Autofac
  6. Eventbus
  7. MediatR
  8. SignalR
  9. Dapper
  10. Polly
  11. FluentValidator

3. 简明DDD

领域驱动设计是一种方法论,用于解决软件复杂度问题。它强调以领域为核心驱动设计。主要包括战略和战术设计两大部分,其中战略设计指导我们在宏观层面对问题域进行识别和划分,从而将大问题划分为多个小问题,分而治之。而战术设计从微观层面指导我们如何对领域进行建模。

其中战术设计了引入了很多核心要素,指导我们建模:

  1. 值对象(Value Object)
  2. 实体(Entity)
  3. 领域服务(Domain Service)
  4. 领域事件(Domain Event)
  5. 资源库(Repository)
  6. 工厂(Factory)
  7. 聚合(Aggregate)
  8. 应用服务(Application Service)

其中实体、值对象和领域服务用于表示领域模型,来实现领域逻辑。 聚合用于封装一到多个实体和值对象,确保业务完整性。 领域事件来丰富领域对象之间的交互。 工厂、资源库用于管理领域对象的生命周期。 应用服务是用来表达用例和用户故事。

有了以上的战术设计要素还不够,如果它们糅合在一起,还是会很混乱,因此DDD再通过分层架构来确保关注点分离,即将领域模型相关(实体、值对象、聚合、领域服务、领域事件)放到领域层,将资源库、工厂放到基础设施层,将应用服务放到应用层。以下就是DDD经典的四层架构:

以上相关图片来源于:张逸 · 领域驱动战略设计实践

4. Ordering.Domain:领域层

如果对订单微服务应用DDD,那么要摒弃传统的面向数据库建模的思想,转向领域建模。该项目中主要定义了以下领域对象:

  • Order:订单
  • OrderItem:订单项
  • OrderStatus:订单状态
  • Buyer:买家
  • Address:地址
  • PaymentMethod:支付方式
  • CardType:银行卡片类型

在该示例项目中,定义了两个聚合:订单聚合和买家聚合,其中Order和Buyer分属两个聚合根,其中订单聚合通过持有买家聚合的唯一ID进行关联。如下图所示:

我们依次来看其对实体、值对象、聚合、资源库、领域事件的实现方式。

4.1. 实体、值对象与聚合

实体与值对象最大的区别在于,实体有标识符可变,值对象不可变。为了保证领域的不变性,也就是更好的封装,所有的属性字段都设置为private set,集合都设置为只读的,通过构造函数进行初始化,通过暴露方法供外部调用修改。 从类图中我们可以看出,其主要定义了一个Entity抽象基类,所有的实体通过继承Entity来实现命名约定。这里面有两点需要说明:

  1. 通过Id属性确保唯一标识符
  2. 重写EqualsGetHashCode方法(hash值计算:this.Id.GetHashCode() ^ 31)
  3. 定义DomainEvents来存储实体关联的领域事件(领域事件的发生归根结底是由于领域对象的状态变化引起的,而领域对象[实体、值对象和聚合])中值对象是不可变的,而聚合往往包含多个实体,所以将领域事件关联在实体上最合适不过。)

同样,值对象也是通过继承抽象基类ValueObject来进行约定。其主要也是重载了EqualsGetHashCode和方法。这里面有必要学习其GetHashCode的实现技巧:

// ValueObject.cs
protected abstract IEnumerable<object> GetAtomicValues();
public override int GetHashCode()
{
    return GetAtomicValues()
     .Select(x => x != null ? x.GetHashCode() : 0)
     .Aggregate((x, y) => x ^ y);
}

//Address.cs
protected override IEnumerable<object> GetAtomicValues()
{
    // Using a yield return statement to return each ele
    yield return Street;
    yield return City;
    yield return State;
    yield return Country;
    yield return ZipCode;
}

可以看到,通过在基类定义GetAtomicValues方法,用来要求子类指定需要hash的字段,然后将每个字段取hash值,然后通过异或运算再行聚合得到唯一hash值。

所有对聚合中领域对象的操作都是通过聚合根来维护的。因此我们可以看到聚合根中定义了许多方法来处理领域逻辑。

4.2. 仓储

聚合中的领域对象的持久化借助仓储来完成的。其提供统一的入口来进行聚合内相关领域对象的CRUD,从而完成透明持久化。从图中看出,IRepository定义了一个IUnitOfWork属性,其代表工作单元,主要定义了两个方法SaveChangesAsyncSaveEntitiesAsync,借助事务一次性提交所有更改,以确保数据的完整性和有效性。

4.3. 领域事件

从类图中可以看出一个共同特征,都实现了INotification接口。对MediatR熟悉的肯定一眼就明白了。是的,这个是MediatR中定义的接口。借助MediatR,来实现事件处理管道。通过进程内事件处理管道来驱动命令接收,并将它们(在内存中)路由到正确的事件处理器。 关于MeidatR可以参考我的这篇博文:MediatR 知多少

而关于领域事件的处理,是通过继承INotificationHanlder接口来实现,这样INotificationINotificationHandler通过Ioc容器的服务注册,自动完成事件的订阅。而领域事件的处理其下放到了Ordering.Api中处理了。这里大家可能会有疑惑,既然叫领域事件,那为什么领域事件的处理不放到领域层呢?我们可以这样理解,事件是领域内触发,但对事件的处理,其并非都是业务逻辑的相关处理,比如订单创建成功后发送短信、邮件等就不属于业务逻辑。

eShopOnContainers中领域事件的触发时机并非是即时触发,选择的是延迟触发模式。具体的实现,后面会讲到。

5. Ordering.Infrastructure:基础设施层

基础设施层主要用于提供基础服务,主要是用来实体映射和持久化。

从图中可以看到,主要包含以下业务处理:

  1. 实体类型映射
  2. 幂等性控制器的实现
  3. 仓储的具体实现
  4. 数据库上下文的实现(UnitOfWork的实现)
  5. 领域事件的批量派发

这里着重下第2、4、5点的介绍。

5.1. 幂等性控制器

幂等性是指某个操作多次执行但结果相同,换句话说,多次执行操作而不改变结果。举例来说:我们在写预插脚本时,会添加条件判断,当表中不存在数据时才将数据插入到表中。无论重复运行多少次 SQL 语句,结果一定是相同的,并且结果数据会包含在表中。

那怎样确保幂等性呢?一种方式就是确保操作本身的幂等性,比如可以创建一个表示“将产品价格设置为¥25”而不是“将产品价格增加¥5”的事件。此时可以安全地处理第一条消息,无论处理多少次结果都一样,而第二个消息则完全不同。 但是假设价格是一个时刻在变的,而你当前的操作就是要将产品价格增加¥5怎么办呢?显然这个操作是不能重复执行的。那我如何确保当前的操作只执行一次呢? 一种简便的方法就是记录每次执行的操作。该项目中的Idempotency文件夹就是来做这件事的。

从类图来看很简单,就是每次发送事件时生成一个唯一的Guid,然后构造一个ClientRequest对象实例持久化到数据库中,每次借助MediatR发送消息时都去检测消息是否已经发送。

5.2. UnitOfWork(工作单元的实现)

从代码来看,主要干了两件事:

  1. 在提交变更之前,触发所有的领域事件
  2. 批量提交变更

这里需要解释的一点是,为什么要在持久化之前而不是之后进行领域事件的触发呢? 这种触发就是延迟触发,将领域事件的发布与领域实体的持久化放到一个事务中来达到一致性。 当然这有利有弊,弊端就是当领域事件的处理非常耗时,很有可能会导致事务超时,最终导致提交失败。而避免这一问题,也只有做事务拆分,这时就要考虑最终一致性和相应的补偿措施,显然更复杂。

至此,我们可以总结下聚合、仓储与数据库之间的关系,如下图所示。

6. Ordering.Api:应用层

应用层通过应用服务接口来暴露系统的全部功能。在这里主要涉及到:

  1. 领域事件的处理
  2. 集成事件的处理
  3. CQRS的实现
  4. 服务注册
  5. 认证授权
  6. 集成事件的订阅

6.1. 领域事件和集成事件

对于领域事件和集成事件的处理,我们需要先明白二者的区别。领域事件是发生在领域内的通信(同步或异步均可),而集成事件是基于多个微服务(其他限界上下文)甚至外部系统或应用间的异步通信。 领域事件是借助于MediatR的INotification 和 INotificationHandler的接口来实现。

其中Application/Behaviors文件夹中是实现MediatR中的IPipelineBehavior接口而定义的请求处理管道。

集成事件的发布订阅是借助事件总线来完成的,关于事件总线之前有文章详述,这里不再赘述。在此,仅代码举例其订阅方式。

private void ConfigureEventBus(IApplicationBuilder app)
{
    var eventBus = app.ApplicationServices.GetRequiredService<BuildingBlocks.EventBus.Abstractions.IEventBus>();

    eventBus.Subscribe<UserCheckoutAcceptedIntegrationEvent, IIntegrationEventHandler<UserCheckoutAcceptedIntegrationEvent>>();
// some other code
}

6.2. 基于MediatR实现的CQRS

CQRS(Command Query Responsibility Separation):命令查询职责分离。是一种用来实现数据模型读写分离的架构模式。顾名思义,分为两大职责:

  1. 命令职责
  2. 查询职责

其核心思想是:在客户端就将数据的新增修改删除等动作和查询进行分离,前者称为Command,通过Command Bus对领域模型进行操作,而查询则从另外一条路径直接对数据进行操作,比如报表输出等。

对于命令职责,其是借助于MediatR充当的CommandBus,使用IRequest来定义命令,使用IRequestHandler来定义命令处理程序。我们可以看下CancelOrderCommandCancelOrderCommandHandler的实现。

public class CancelOrderCommand : IRequest<bool>
{

    [DataMember]
    public int OrderNumber { get; private set; }

    public CancelOrderCommand(int orderNumber)
    {
        OrderNumber = orderNumber;
    }
}

public class CancelOrderCommandHandler : IRequestHandler<CancelOrderCommand, bool>
{
    private readonly IOrderRepository _orderRepository;

    public CancelOrderCommandHandler(IOrderRepository orderRepository)
    {
        _orderRepository = orderRepository;
    }

    public async Task<bool> Handle(CancelOrderCommand command, CancellationToken cancellationToken)
    {
        var orderToUpdate = await _orderRepository.GetAsync(command.OrderNumber);
        if(orderToUpdate == null)
        {
            return false;
        }

        orderToUpdate.SetCancelledStatus();
        return await _orderRepository.UnitOfWork.SaveEntitiesAsync();
    }
}

以上代码中,有一点需要指出,就是所有Command中的属性都定义为private set,通过构造函数进行赋值,以确保Command的不变性。

对于查询职责,通过定义查询接口,借助Dapper直接写SQL语句来完成对数据库的直接读取。

而对于定义的命令,为了确保每个命令的合法性,通过引入第三方Nuget包FluentValdiation来进行命令的合法性校验。其代码也很简单,参考下图。

6.3. 服务注册

整个订单微服务中所有服务的注册,都是放到应用层来做的,在Ordering.Api\Infrastructure\AutofacModules文件夹下通过继承Autofac.Module定义了两个Module来进行服务注册:

  • ApplicationModule:自定义接口相关服务的注册
  • MediatorModule:Mediator相关接口服务的注册

将所有的服务注册都放到高层模块来进行注册,有点违背关注点分离,各层应该关注本层的服务注册,所以这中实现方式是有待改进的。而具体如何改进,这里给大家提供一个线索,可参考ABP是如何实现进行服务注册的分离和整合的。

这里顺带提一下Autofac这个Ioc容器的一个限制,就是所有的服务注册必须在程序启动时完成注册,不允许运行时动态注册。

7. Ordering.BackgroundTasks:后台任务

后台任务,顾名思义,后台静默运行的任务,也称计划任务。在.NET Core 中,我们将这些类型的任务称为托管服务,因为它们是在主机/应用程序/微服务中托管的服务/逻辑。请注意,这种情况下托管服务仅简单表示具有后台任务逻辑类。

那我们如何实现托管服务了,一种简单的方式就是使用.NET Core 2.0之后版本中提供了一个名为IHostedService的新接口。当然也可以选择其他的一些后台任务框架,比如HangFire、Quartz。

该示例项目就是基于BackgroundService定义的一个后台任务。该任务主要用于轮询订单表中处于已提交超过1分钟的订单,然后发布集成事件到事件总线,最终用来将订单状态更新为待核验(库存)状态。

public abstract class BackgroundService : IHostedService, IDisposable
{
    protected BackgroundService();

    public virtual void Dispose();
    public virtual Task StartAsync(CancellationToken cancellationToken);
    [AsyncStateMachine(typeof(<StopAsync>d__4))]
    public virtual Task StopAsync(CancellationToken cancellationToken);
    protected abstract Task ExecuteAsync(CancellationToken stoppingToken);
}

BackgroundService的方法申明中我们可以看出仅需实现ExecuteAsync方法即可。

完成后台任务的定义后,将服务注册到Ioc容器中即可。

public IServiceProvider ConfigureServices(IServiceCollection services)
{
 //Other DI registrations;
 // Register Hosted Services
 services.AddSingleton<IHostedService, GracePeriodManagerService>();
 services.AddSingleton<IHostedService, MyHostedServiceB>();
 services.AddSingleton<IHostedService, MyHostedServiceC>();
 //...
}

总之,IHostedService接口为 ASP.NET Core Web 应用程序启动后台任务提供了一种便捷的方法。它的优势主要在于:当主机本身关闭时,可以利用取消令牌来优雅的清理后台任务。

8. Ordering.SignalrHub:即时通信

在订单微服务中,当订单状态变更时,需要实时推送订单状态变更消息给客户端。而这就涉及到实时通信。实时 HTTP 通信意味着,当数据可用时,服务端代码会推送内容到已连接的客户端,而不是服务端等待客户端来请求新数据。

而对于实时通信,ASP.NET Core中SignalR可以满足我们的需求,其支持几种处理实时通信的技术以确保实时通信的可靠传输。

该示例项目的实现思路很简单:

  1. 订阅订单状态变更相关的集成事件
  2. 继承SignalR.Hub定义一个NotificationsHub
  3. 在集成事件处理程序中调用Hub进行消息的实时推送
// 订阅集成事件
private void ConfigureEventBus(IApplicationBuilder app)
{
    var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();  
    eventBus.Subscribe<OrderStatusChangedToAwaitingValidationIntegrationEvent, OrderStatusChangedToAwaitingValidationIntegrationEventHandler>();
    eventBus.Subscribe<OrderStatusChangedToPaidIntegrationEvent, OrderStatusChangedToPaidIntegrationEventHandler>();
    eventBus.Subscribe<OrderStatusChangedToStockConfirmedIntegrationEvent, OrderStatusChangedToStockConfirmedIntegrationEventHandler>();
    eventBus.Subscribe<OrderStatusChangedToShippedIntegrationEvent, OrderStatusChangedToShippedIntegrationEventHandler>();
    eventBus.Subscribe<OrderStatusChangedToCancelledIntegrationEvent, OrderStatusChangedToCancelledIntegrationEventHandler>();
    eventBus.Subscribe<OrderStatusChangedToSubmittedIntegrationEvent, OrderStatusChangedToSubmittedIntegrationEventHandler>();  
}

// 定义SignalR.Hub
[Authorize]
public class NotificationsHub : Hub
{

    public override async Task OnConnectedAsync()
    {
        await Groups.AddToGroupAsync(Context.ConnectionId, Context.User.Identity.Name);
        await base.OnConnectedAsync();
    }

    public override async Task OnDisconnectedAsync(Exception ex)
    {
        await Groups.AddToGroupAsync(Context.ConnectionId, Context.User.Identity.Name);
        await base.OnDisconnectedAsync(ex);
    }
}

// 在集成事件处理器中调用Hub进行消息的实时推送
public class OrderStatusChangedToPaidIntegrationEventHandler : IIntegrationEventHandler<OrderStatusChangedToPaidIntegrationEvent>
{
    private readonly IHubContext<NotificationsHub> _hubContext;

    public OrderStatusChangedToPaidIntegrationEventHandler(IHubContext<NotificationsHub> hubContext)
    {
        _hubContext = hubContext ?? throw new ArgumentNullException(nameof(hubContext));
    }

    public async Task Handle(OrderStatusChangedToPaidIntegrationEvent @event)
    {
        await _hubContext.Clients
            .Group(@event.BuyerName)
            .SendAsync("UpdatedOrderState", new { OrderId = @event.OrderId, Status = @event.OrderStatus });
    }
}

8. 最后

订单微服务在整个eShopOnContainers中属于最复杂的一个微服务了。 通过对DDD的简要介绍,以及对每一层的技术选型以及实现的思路和逻辑的梳理,希望对你有所帮助。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券