前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >.NET Core开发实战(第35课:MediatR:让领域事件处理更加优雅)--学习笔记

.NET Core开发实战(第35课:MediatR:让领域事件处理更加优雅)--学习笔记

作者头像
郑子铭
发布2021-01-13 15:34:43
8300
发布2021-01-13 15:34:43
举报

35 | MediatR:让领域事件处理更加优雅

核心对象

IMediator

INotification

INotificationHandler

这两个与之前的 Request 的行为是不一样的,接下来看一下代码

代码语言:javascript
复制
internal class MyEvent : INotification
{
    public string EventName { get; set; }
}

internal class MyEventHandler : INotificationHandler<MyEvent>
{
    public Task Handle(MyEvent notification, CancellationToken cancellationToken)
    {
        Console.WriteLine($"MyEventHandler执行:{notification.EventName}");
        return Task.CompletedTask;
    }
}

internal class MyEventHandlerV2 : INotificationHandler<MyEvent>
{
    public Task Handle(MyEvent notification, CancellationToken cancellationToken)
    {
        Console.WriteLine($"MyEventHandlerV2执行:{notification.EventName}");
        return Task.CompletedTask;
    }
}
代码语言:javascript
复制
//await mediator.Send(new MyCommand { CommandName = "cmd01" });
await mediator.Publish(new MyEvent { EventName = "event01" });

之前 mediator 使用了 Send 的方式来处理 Command,它还有一个方法 Publish,这个方法的入参是一个 INotification

启动程序,输出如下:

代码语言:javascript
复制
MyEventHandler执行:event01
MyEventHandlerV2执行:event01

与之前的 IRequest 不同的是,INotification 是可以注册多个 Handler 的,它是一个一对多的关系,借助它就可以对领域事件定义多个处理器来处理

接着看一下之前云服务的代码

代码语言:javascript
复制
public async Task<bool> SaveEntitiesAsync(CancellationToken cancellationToken = default)
{
    var result = await base.SaveChangesAsync(cancellationToken);
    await _mediator.DispatchDomainEventsAsync(this);
    return true;
}

之前在 IUnitOfWork 定义的时候讲过一个发送领域事件的方法 DispatchDomainEventsAsync,看一下这个方法的定义

代码语言:javascript
复制
static class MediatorExtension
{
    public static async Task DispatchDomainEventsAsync(this IMediator mediator, DbContext ctx)
    {
        var domainEntities = ctx.ChangeTracker
            .Entries<Entity>()
            .Where(x => x.Entity.DomainEvents != null && x.Entity.DomainEvents.Any());

        var domainEvents = domainEntities
            .SelectMany(x => x.Entity.DomainEvents)
            .ToList();

        domainEntities.ToList()
            .ForEach(entity => entity.Entity.ClearDomainEvents());

        foreach (var domainEvent in domainEvents)
            await mediator.Publish(domainEvent);
    }
}

可以看到这里是将所有的实体内的领域事件全部都查找出来,然后通过 mediator 的 Publish 发送领域事件,具体的领域事件的处理注册在 mediator 里面,这里定义了一个 OrderCreatedDomainEventHandler

代码语言:javascript
复制
public class OrderCreatedDomainEventHandler : IDomainEventHandler<OrderCreatedDomainEvent>
{
    ICapPublisher _capPublisher;
    public OrderCreatedDomainEventHandler(ICapPublisher capPublisher)
    {
        _capPublisher = capPublisher;
    }

    public async Task Handle(OrderCreatedDomainEvent notification, CancellationToken cancellationToken)
    {
        await _capPublisher.PublishAsync("OrderCreated", new OrderCreatedIntegrationEvent(notification.Order.Id));
    }
}

它继承自 IDomainEventHandler,而 IDomainEventHandler 继承自 INotificationHandler

代码语言:javascript
复制
public interface IDomainEventHandler<TDomainEvent> : INotificationHandler<TDomainEvent>
    where TDomainEvent : IDomainEvent
{
    //这里我们使用了INotificationHandler的Handle方法来作为处理方法的定义
    Task Handle(TDomainEvent domainEvent, CancellationToken cancellationToken);
}

这也就是为什么 IDomainEventHandler 会识别到 DomainEvent 并且进行处理,同样的在定义 DomainEvent 的时候,也需要标识它是一个 DomainEvent

代码语言:javascript
复制
public class OrderCreatedDomainEvent : IDomainEvent
{
    public Order Order { get; private set; }
    public OrderCreatedDomainEvent(Order order)
    {
        this.Order = order;
    }
}

而 DomainEvent 实际上也是继承自 INotification

代码语言:javascript
复制
public interface IDomainEvent : INotification
{
}

这也就意味着 EventHandler 可以正确的识别到对应的 Event 并且进行处理,这都是 MediatR 的核心能力

领域事件都是定义在 event 目录下,与领域模型定义在一起,所有的领域事件都继承 DomainEvent,分布于这个目录

领域事件的处理 Handler 都定义在 Application 应用层的 Application 下面的 DomainEventHandlers 目录下面

这样的好处是事件的定义与事件的处理是分开的,并且非常的明确知道有哪些领域事件,有哪些领域事件的处理程序

关于 MediatR 再补充一部分内容,在 TransactionBehavior 内可以看到这个类实际上继承自 IPipelineBehavior

代码语言:javascript
复制
namespace MediatR
{
    public interface IPipelineBehavior<in TRequest, TResponse>
    {
        Task<TResponse> Handle(TRequest request, CancellationToken cancellationToken, RequestHandlerDelegate<TResponse> next);
    }
}

这个接口的作用是在命令或者事件处理的之前或者之后插入逻辑,它的执行的方式有点像中间件的方式,在 Handler 的入参里面有一个 next 的参数,就是指 CommandHandler 或者 EventHandler 的执行的逻辑,在这里就可以决定 Handler 的具体执行之前或者之后,插入一些逻辑

代码语言:javascript
复制
public async Task<TResponse> Handle(TRequest request, CancellationToken cancellationToken, RequestHandlerDelegate<TResponse> next)
{
    var response = default(TResponse);
    var typeName = request.GetGenericTypeName();

    try
    {
        // 首先判断当前是否有开启事务
        if (_dbContext.HasActiveTransaction)
        {
            return await next();
        }

        // 定义了一个数据库操作执行的策略,比如说可以在里面嵌入一些重试的逻辑,这里创建了一个默认的策略
        var strategy = _dbContext.Database.CreateExecutionStrategy();

        await strategy.ExecuteAsync(async () =>
        {
            Guid transactionId;
            using (var transaction = await _dbContext.BeginTransactionAsync())
            using (_logger.BeginScope("TransactionContext:{TransactionId}", transaction.TransactionId))
            {
                _logger.LogInformation("----- 开始事务 {TransactionId} ({@Command})", transaction.TransactionId, typeName, request);

                response = await next();// next 实际上是指我们的后续操作,这里的模式有点像之前讲的中间件模式

                _logger.LogInformation("----- 提交事务 {TransactionId} {CommandName}", transaction.TransactionId, typeName);


                await _dbContext.CommitTransactionAsync(transaction);

                transactionId = transaction.TransactionId;
            }
        });

        return response;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "处理事务出错 {CommandName} ({@Command})", typeName, request);

        throw;
    }
}

这里实现里在执行命令之前判断事务是否开启,如果事务开启的话继续执行后面的逻辑,如果事务没有开启,先开启事务,再执行后面的逻辑

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-04-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 DotNet NB 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 35 | MediatR:让领域事件处理更加优雅
相关产品与服务
消息队列 TDMQ
消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档