前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >DDD落地之事件驱动模型

DDD落地之事件驱动模型

作者头像
柏炎
发布2022-08-23 14:35:01
9330
发布2022-08-23 14:35:01
举报

一.前言

hello,everyone。一日不见,如隔24小时。

周末的时候写了一文带你落地DDD,发现大家对于新的领域与知识都挺感兴趣的。后面将会出几篇DDD系列文章给大家介绍mvc迁移DDD实际要做的一些步骤。

DDD系列博客

  1. 一文带你落地DDD
  2. DDD落地之事件驱动模型
  3. DDD落地之仓储
  4. DDD落地之架构分层

DDD的理念中有一个是贯穿始终的,业务边界与解耦。我最开始不了解DDD的时候,我就觉得事件驱动模型能够非常好的解耦系统功能。当然,这个是我比较菜,在接触DDD之后才开始对事件驱动模型做深度应用与了解。其实无论是在spring的框架中还是在日常MVC代码的编写过程中,巧用事件驱动模型都能很好的提高代码的可维护性。

因此,本文将对DDD中使用事件驱动模型建立与踩坑做一个系统性的介绍。从应用层面出发,帮助大家更好的去进行架构迁移。

我的第一本掘金小册《深入浅出DDD》已经在掘金上线,欢迎大家试读~

DDD的微信群我也已经建好了,由于文章内不能放二维码,大家可以加我微信**baiyan_lou**,备注DDD交流,我拉你进群,欢迎交流共同进步。

二.事件驱动模型

2.1.为什么需要事件驱动模型

一个框架,一门技术,使用之前首先要清楚,什么样的业务场景需要使用这个东西。为什么要用跟怎么样把他用好更加重要。

假设我们现在有一个比较庞大的单体服务的订单系统,有下面一个业务需求:创建订单后,需要下发优惠券,给用户增长积分

先看一下,大多数同学在单体服务内的写法。【假设订单,优惠券,积分均为独立service】

代码语言:javascript
复制
//在orderService内部定义一个放下
@Transactional(rollbackFor = Exception.class)
public void createOrder(CreateOrderCommand command){
  //创建订单
  Long orderId = this.doCreate(command);
  //发送优惠券
  couponService.sendCoupon(command,orderId);
  //增长积分
  integralService.increase(command.getUserId,orderId);
}

上面这样的代码在线上运行会不会有问题?不会!

那为什么要改呢?

原因是,业务需求在不断迭代的过程中,与当前业务非强相关的主流程业务,随时都有可能被替换或者升级。

双11大促,用户下单的同时需要给每个用户赠送几个小礼品,那你又要写一个函数了,拼接在主方法的后面。双11结束,这段要代码要被注释。有一年大促,赠送的东西改变,代码又要加回来。。。。

来来回回的,订单逻辑变得又臭又长,注释的代码逻辑很多还不好阅读与理解。

如果用了事件驱动模型,那么当第一步创建订单成功之后,发布一个创建订单成功的领域事件。优惠券服务,积分服务,赠送礼品等等监听这个事件,对监听到的事件作出相应的处理。

事件驱动模型代码

代码语言:javascript
复制
//在orderService内部定义一个放下
@Transactional(rollbackFor = Exception.class)
public void createOrder(CreateOrderCommand command){
  //创建订单
  Long orderId = this.doCreate(command);
  publish(orderCreateEvent);
}
​
//各个需要监听的服务
public void handlerEvent(OrderCreateEvent event){
//逻辑处理
}

代码解耦,高度符合开闭原则

2.2.事件驱动模型选型

2.2.1.JDK中时间驱动机制

JDK为我们提供的事件驱动(EventListener、EventObject)、观察者模式(Observer)。

JDK不仅提供了Observable类、Observer接口支持观察者模式,而且也提供了EventObjectEventListener接口来支持事件监听模式。

观察者(Observer)相当于事件监听者(监听器) ,被观察者(Observable)相当于事件源和事件,执行逻辑时通知observer即可触发oberver的update,同时可传被观察者和参数。简化了事件-监听模式的实现

代码语言:javascript
复制
// 观察者,实现此接口即可
public interface Observer {
  
  /**
  * 当被观察的对象发生变化时候,这个方法会被调用
  * Observable o:被观察的对象
  * Object arg:传入的参数
  **/
  void update(Observable o, Object arg);
}
​
// 它是一个Class
public class Observable {
​
  // 是否变化,决定了后面是否调用update方法
  private boolean changed = false;
  
  // 用来存放所有`观察自己的对象`的引用,以便逐个调用update方法
  // 需要注意的是:1.8的jdk源码为Vector(线程安全的),有版本的源码是ArrayList的集合实现; 
  private Vector<Observer> obs;
​
  public Observable() {
  obs = new Vector<>();
  }
​
  public synchronized void addObserver(Observer o); //添加一个观察者 注意调用的是addElement方法,   添加到末尾   所以执行时是倒序执行的
  public synchronized void deleteObserver(Observer o);
  public synchronized void deleteObservers(); //删除所有的观察者
​
  // 循环调用所有的观察者的update方法
  public void notifyObservers();
  public void notifyObservers(Object arg);
  public synchronized int countObservers() {
  return obs.size();
  }
​
  // 修改changed的值
  protected synchronized void setChanged() {
    changed = true;
  }
  
  protected synchronized void clearChanged() {
    changed = false;
  }
  
  public synchronized boolean hasChanged() {
    return changed;
  }
}

内部观察者队列啥的都交给Observable去处理了, 并且,它是线程安全的。但是这种方式其实使用起来并不是那么的方便,没有一个消息总线,需要自己单独去维护观察者与被观察者。对于业务系统而言,还需要自己单独去维护每一个观察者的添加。

2.2.2.spring中的事件驱动机制

spring在4.2之后提供了@EventListener注解,让我们更便捷的使用监听。

了解过spring启动流程的同学都知道,Spring容器刷新的时候会发布ContextRefreshedEvent事件,因此若我们需要监听此事件,直接写个监听类即可。

代码语言:javascript
复制
@Slf4j
@Component
public class ApplicationRefreshedEventListener implements   ApplicationListener<ContextRefreshedEvent> {
​
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        //解析这个事件,做你想做的事,嘿嘿
    }
}

同样的我们也可以自己来定义一个事件,通过ApplicationEventPublisher发送。

代码语言:javascript
复制
/**
 * 领域事件基类
 *
 * @author baiyan
 * @date 2021/09/07
 */
@Getter
@Setter
@NoArgsConstructor
public abstract class BaseDomainEvent<T> implements Serializable {
​
    private static final long serialVersionUID = 1465328245048581896L;
​
    /**
     * 领域事件id
     */
    private String demandId;
​
    /**
     * 发生时间
     */
    private LocalDateTime occurredOn;
​
    /**
     * 领域事件数据
     */
    private T data;
​
    public BaseDomainEvent(String demandId, T data) {
        this.demandId = demandId;
        this.data = data;
        this.occurredOn = LocalDateTime.now();
    }
​
}

定义统一的业务总线发送事件

代码语言:javascript
复制
/**
 * 领域事件发布接口
 *
 * @author baiyan
 * @date  2021/09/07
 */
public interface DomainEventPublisher {
​
    /**
     * 发布事件
     *
     * @param event 领域事件
     */
    void publishEvent(BaseDomainEvent event);
​
}
代码语言:javascript
复制
/**
 * 领域事件发布实现类
 *
 * @author baiyan
 * @date  2021/09/07
 */
@Component
@Slf4j
public class DomainEventPublisherImpl implements DomainEventPublisher {
​
    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;
​
    @Override
    public void publishEvent(BaseDomainEvent event) {
        log.debug("发布事件,event:{}", event.toString());
        applicationEventPublisher.publishEvent(event);
    }
​
}

监听事件

代码语言:javascript
复制
@Component
@Slf4j
public class UserEventHandler {
​
    @EventListener
    public void handleEvent(DomainEvent event) {
       //doSomething
    }
​
}

芜湖,起飞~

相比较与JDK提供的观察者模型的事件驱动,spring提供的方式就是yyds。

2.3.事件驱动之事务管理

平时我们在完成某些数据的入库后,发布了一个事件。后续我们进行操作记录在es的记载,但是这时es可能集群响应超时了,操作记录入库失败报错。但是从业务逻辑上来看,操作记录的入库失败,不应该影响到主流程的逻辑执行,需要事务独立。亦或是,如果主流程执行出错了,那么我们需要触发一个事件,发送钉钉消息到群里进行线上业务监控,需要在主方法逻辑中抛出异常再调用此事件。这时,我们如果使用的是@EventListener,上述业务场景的实现就是比较麻烦的逻辑了。

为了解决上述问题,Spring为我们提供了两种方式:

(1)@TransactionalEventListener注解。

(2) 事务同步管理器TransactionSynchronizationManager

本文针对@TransactionalEventListener进行一下解析。

我们可以从命名上直接看出,它就是个EventListener,在Spring4.2+,有一种叫做@TransactionEventListener的方式,能够实现在控制事务的同时,完成对对事件的处理。

代码语言:javascript
复制
//被@EventListener标注,表示它能够监听事件
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EventListener
public @interface TransactionalEventListener {
​
  //表示当前事件跟随消息发送方事务的出发时机,默认为消息发送方事务提交之后才进行处理。
   TransactionPhase phase() default TransactionPhase.AFTER_COMMIT;
​
   //true时不论发送方是否存在事务均出发当前事件处理逻辑
   boolean fallbackExecution() default false;
​
   //监听的事件具体类型,还是建议指定一下,避免监听到子类的一些情况出现
   @AliasFor(annotation = EventListener.class, attribute = "classes")
   Class<?>[] value() default {};
​
   //指向@EventListener对应的值
   @AliasFor(annotation = EventListener.class, attribute = "classes")
   Class<?>[] classes() default {};
​
   //指向@EventListener对应的值
   String condition() default "";
​
}
代码语言:javascript
复制
public enum TransactionPhase {
   // 指定目标方法在事务commit之前执行
   BEFORE_COMMIT,
​
   // 指定目标方法在事务commit之后执行
    AFTER_COMMIT,
​
    // 指定目标方法在事务rollback之后执行
    AFTER_ROLLBACK,
​
   // 指定目标方法在事务完成时执行,这里的完成是指无论事务是成功提交还是事务回滚了
   AFTER_COMPLETION
  }

我们知道,Spring的事件监听机制(发布订阅模型)实际上并不是异步的(默认情况下),而是同步的来将代码进行解耦。而@TransactionEventListener仍是通过这种方式,但是加入了回调的方式来解决,这样就能够在事务进行Commited,Rollback…等时候才去进行Event的处理,来达到事务同步的目的。

三.实践及踩坑

针对是事件驱动模型里面的@TransactionEventListener@EventListener假设两个业务场景。

新增用户,关联角色,增加关联角色赋权操作记录。

1.统一事务:上述三个操作事务一体,无论哪个发生异常,数据统一回滚。

2独立事务:上述三个操作事务独立,事件一旦发布,后续发生任意异常均不影响。

3.1.统一事务

用户新增

代码语言:javascript
复制
@Service
@Slf4j
public class UserServiceImpl implements UserService {
​
    @Autowired
    DomainEventPublisher domainEventPublisher;
​
    @Transactional(rollbackFor = Exception.class)
    public void createUser(){
        //省略非关键代码
        save(user);
        domainEventPublisher.publishEvent(userEvent);
    }
}

用户角色关联

代码语言:javascript
复制
@Component
@Slf4j
public class UserEventHandler {
​
    @Autowired
    DomainEventPublisher domainEventPublisher;
​
    @Autowired
    UserRoleService userRoleService;
​
    @EventListener
    public void handleEvent(UserEvent event) {
        log.info("接受到用户新增事件:"+event.toString());
        //省略部分数据组装与解析逻辑
        userRoleService.save(userRole);
        domainEventPublisher.publishEvent(userRoleEvent);
    }
​
}

用户角色操作记录

代码语言:javascript
复制
@Component
@Slf4j
public class UserRoleEventHandler {
​
    @Autowired
    UserRoleRecordService userRoleRecordService;
​
    @EventListener
    public void handleEvent(UserRoleEvent event) {
        log.info("接受到userRole事件:"+event.toString());
        //省略部分数据组装与解析逻辑
        userRoleRecordService.save(record);
    }
​
}

以上即为同一事务下的一个逻辑,任意方法内抛出异常,所有数据的插入逻辑都会回滚。

给出一下结论,@EventListener标注的方法是被加入在当前事务的执行逻辑里面的,与主方法事务一体。

踩坑1:

严格意义上来说这里不算是把主逻辑从业务中拆分出来了,还是在同步的事务中,当然这个也是有适配场景的,大家为了代码简洁性与函数级逻辑清晰可以这么做。但是这样做其实不是那么DDD,DDD中应用服务的一个方法即为一个用例,里面贯穿了主流程的逻辑,既然是当前系统内强一致性的业务,那就应该在一个应用服务中体现。当然这个是属于业务边界的。举例的场景来看,用户与赋权显然不是强一致性的操作,赋权失败,不应该影响我新增用户,所以这个场景下做DDD改造,不建议使用统一事务。

踩坑2:

listener里面的执行逻辑可能比较耗时,需要做异步化处理,在UserEventHandler方法上标注@Async,那么这里与主逻辑的方法事务就隔离开了,监听器内的事务开始独立,将不会影响到userService内的事务。例如其他代码不变的情况下用户角色服务代码修改如下

代码语言:javascript
复制
@Component
@Slf4j
public class UserEventHandler {
​
    @Autowired
    DomainEventPublisher domainEventPublisher;
​
    @Autowired
    UserRoleService userRoleService;
​
    @EventListener
    @Async
    public void handleEvent(UserEvent event) {
        log.info("接受到用户新增事件:"+event.toString());
        //省略部分数据组装与解析逻辑
        userRoleService.save(userRole);
        domainEventPublisher.publishEvent(userRoleEvent);
        throw new RuntimeException("制造一下异常");
    }
​
}

发现,用户新增了,用户角色关联关系新增了,但是操作记录没有新增。第一个结果好理解,第二个结果就奇怪了把,事件监听里面抛了异常,但是居然数据保存成功了。

这里其实是因为UserEventHandlerhandleEvent方法外层为嵌套@TransactionaluserRoleService.save操作结束,事务就提交了,后续的抛异常也不影响。为了保持事务一致,在方法上加一个@Transactional即可。

3.2.独立事务

@EventListener作为驱动加载业务分散代码管理还挺好的。但是在DDD层面,事务数据被杂糅在一起,除了问题一层层找也麻烦,而且数据捆绑较多,还是比较建议使用@TransactionalEventListene

用户新增

代码语言:javascript
复制
@Service
@Slf4j
public class UserServiceImpl implements UserService {
​
    @Autowired
    DomainEventPublisher domainEventPublisher;
​
    @Transactional(rollbackFor = Exception.class)
    public void createUser(){
        //省略非关键代码
        save(user);
        domainEventPublisher.publishEvent(userEvent);
    }
}

用户角色关联

代码语言:javascript
复制
@Component
@Slf4j
public class UserEventHandler {
​
    @Autowired
    DomainEventPublisher domainEventPublisher;
​
    @Autowired
    UserRoleService userRoleService;
​
    @TransactionalEventListener
    public void handleEvent(UserEvent event) {
        log.info("接受到用户新增事件:"+event.toString());
        //省略部分数据组装与解析逻辑
        userRoleService.save(userRole);
        domainEventPublisher.publishEvent(userRoleEvent);
    }
​
}

用户角色操作记录

代码语言:javascript
复制
@Component
@Slf4j
public class UserRoleEventHandler {
​
    @Autowired
    UserRoleRecordService userRoleRecordService;
​
    @TransactionalEventListener
    public void handleEvent(UserRoleEvent event) {
        log.info("接受到userRole事件:"+event.toString());
        //省略部分数据组装与解析逻辑
        userRoleRecordService.save(record);
    }
​
}

一样的代码,把注解从@EventListener更换为@TransactionalEventListener。执行之后发现了一个神奇的问题,用户角色操作记录数据没有入库!!!

捋一捋逻辑看看,换了个注解,就出现这个问题了,比较一下·两个注解的区别。 @TransactionalEventListener事务独立,且默认注解phase参数值为TransactionPhase.AFTER_COMMIT,即为主逻辑方法事务提交后在执行。而我们知道spring中事务的提交关键代码在AbstractPlatformTransactionManager.commitTransactionAfterReturning

代码语言:javascript
复制
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
   if (txInfo != null &amp;&amp; txInfo.getTransactionStatus() != null) {
      if (logger.isTraceEnabled()) {
         logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
      }
      //断点处
      txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
   }
}

配置文件中添加以下配置

代码语言:javascript
复制
logging:
  level:
    org:
      mybatis: debug

在上述代码的地方打上断点,再次执行逻辑。

发现,第一次userService保存数据进入此断点,然后进入到userRoleService.save逻辑,此处不进入断点,后续的操作记录的事件处理方法也没有进入。

在来看一下日志

代码语言:javascript
复制
- 2021-09-07 19:54:38.166, DEBUG, [,,], [http-nio-8088-exec-6], org.mybatis.spring.SqlSessionUtils - Creating a new SqlSession
- 2021-09-07 19:54:38.166, DEBUG, [,,], [http-nio-8088-exec-6], org.mybatis.spring.SqlSessionUtils - Registering transaction synchronization for SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@77a74846]
- 2021-09-07 19:54:38.167, DEBUG, [,,], [http-nio-8088-exec-6], o.m.s.t.SpringManagedTransaction - JDBC Connection [com.mysql.cj.jdbc.ConnectionImpl@1832a0d9] will be managed by Spring
- 2021-09-07 19:54:38.184, DEBUG, [,,], [http-nio-8088-exec-6], org.mybatis.spring.SqlSessionUtils - Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@77a74846]
- 2021-09-07 19:54:51.423, DEBUG, [,,], [http-nio-8088-exec-6], org.mybatis.spring.SqlSessionUtils - Transaction synchronization committing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@77a74846]
- 2021-09-07 19:54:51.423, DEBUG, [,,], [http-nio-8088-exec-6], org.mybatis.spring.SqlSessionUtils - Transaction synchronization deregistering SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@77a74846]
- 2021-09-07 19:54:51.423, DEBUG, [,,], [http-nio-8088-exec-6], org.mybatis.spring.SqlSessionUtils - Transaction synchronization closing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@77a74846]
- 2021-09-07 19:54:51.430,  INFO, [,,], [http-nio-8088-exec-6], com.examp.event.demo.UserEventHandler - 接受到用户新增事件:com.examp.event.demo.UserEvent@385db2f9
- 2021-09-07 19:54:53.602, DEBUG, [,,], [http-nio-8088-exec-6], org.mybatis.spring.SqlSessionUtils - Creating a new SqlSession
- 2021-09-07 19:54:53.602, DEBUG, [,,], [http-nio-8088-exec-6], org.mybatis.spring.SqlSessionUtils - SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@9af2818] was not registered for synchronization because synchronization is not active
- 2021-09-07 19:54:53.603, DEBUG, [,,], [http-nio-8088-exec-6], o.m.s.t.SpringManagedTransaction - JDBC Connection [com.mysql.cj.jdbc.ConnectionImpl@1832a0d9] will be managed by Spring
- 2021-09-07 19:54:53.622, DEBUG, [,,], [http-nio-8088-exec-6], org.mybatis.spring.SqlSessionUtils - Closing non transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@9af2818]

注意看接受到用户新增事件之后的日志,SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@9af2818] was not registered for synchronization because synchronization is not active说明当前事件是无事务执行的逻辑。再回过头去看一下@TransactionalEventListener,默认配置是在事务提交后才进行事件执行的,但是这里事务都没有,自然也就不会触发事件了。

看图捋一下代码逻辑

那怎么解决上面的问题呢?

其实这个东西还是比较简单的:

1.可以对监听此事件的逻辑无脑标注@TransactionalEventListener(fallbackExecution = true),无论事件发送方是否有事务都会触发事件。

2.在第二个发布事件的上面标注一个@Transactional(propagation = Propagation.REQUIRES_NEW),切记不可直接标注@Transactional,这样因为userService上事务已经提交,而@Transactional默认事务传播机制为Propagation.REQUIRED,如果当前没有事务,就新建一个事务,如果已经存在一个事务,加入到这个事务中。

userService中的事务还存在,只是已经被提交,无法再加入,也就是会导致操作记录仍旧无法被插入。

将配置修改为

代码语言:javascript
复制
logging:
  level:
    org: debug

可以看到日志

代码语言:javascript
复制
- 2021-09-07 20:26:29.900, DEBUG, [,,], [http-nio-8088-exec-2], o.s.j.d.DataSourceTransactionManager - Cannot register Spring after-completion synchronization with existing transaction - processing Spring after-completion callbacks immediately, with outcome status 'unknown'

四.DDD中的事件驱动应用

理清楚spring中事件驱动模型之后,我们所要做的就是开始解耦业务逻辑。

通过事件风暴理清楚业务用例,设计完成聚合根【ps:其实我觉得设计聚合根是最难的,业务边界是需要团队成员达成共识的地方,不是研发说了算的】,划分好业务领域边界,将原先杂糅在service里面的各个逻辑根据聚合根进行:

  1. 对于聚合的每次命令操作,都至少一个领域事 件发布出去,表示操作的执行结果
  2. 每一个领域事件都将被保存到事件存储中
  3. 从资源库获取聚合时,将根据发生在聚合上的 事件来重建聚合,事件的重放顺序与其产生顺序相同
  4. 聚合快照:将聚合的某一事件发生时的状态快 照序列化存储下来。五.总结

本文着重介绍了事件驱动模型的概念与应用,并对实际可能出现的业务逻辑做了分析与避坑。最后对于DDD中如何进行以上事件驱动模型进行了分析。

当然我觉得到这里大家应该对事件模型有了一个清晰的认知了,但是对于DDD中应用还是有些模糊。千言万语汇成一句话:与聚合核心逻辑有关的,走应用服务编排,与核心逻辑无关的,走事件驱动模型,采用独立事务模式。至于数据一致性,就根据大家自己相关的业务来决定了,方法与踩坑都告诉了大家了。

你我都是架构师!!!

六.引用及参考

@TransactionalEventListener的使用和实现原理

【小家Spring】从Spring中的(ApplicationEvent)事件驱动机制出发,聊聊【观察者模式】【监听者模式】【发布订阅模式】【消息队列MQ】【EventSourcing】...

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-09-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一.前言
  • 二.事件驱动模型
    • 2.1.为什么需要事件驱动模型
      • 2.2.事件驱动模型选型
        • 2.2.1.JDK中时间驱动机制
        • 2.2.2.spring中的事件驱动机制
      • 2.3.事件驱动之事务管理
      • 三.实践及踩坑
        • 3.1.统一事务
          • 3.2.独立事务
          • 四.DDD中的事件驱动应用
          • 六.引用及参考
          相关产品与服务
          消息队列 CMQ
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档