分布式事务是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。例如在大型电商系统中,下单接口通常会扣减库存、减去优惠、生成订单 id, 而订单服务与库存、优惠、订单 id 都是不同的服务,下单接口的成功与否,不仅取决于本地的 db 操作,而且依赖第三方系统的结果,这时候分布式事务就保证这些操作要么全部成功,要么全部失败。本质上来说,分布式事务就是为了保证不同数据库的数据一致性。
目前解决分布式事物的解决方案有seata,lcn 等。
RocketMQ 分布式事物实现
RocketMQ提供了事务消息的功能,采用2PC(两段式协议)+补偿机制(事务回查)的分布式事务功能,通过消息队列 RocketMQ 版事务消息能达到分布式事务的最终一致。
首先,我们要知道什么是半事物消息和消息回查:
事务消息发送步骤如下:
事务消息回查步骤如下:
总体而言RocketMQ事务消息分为两条主线:
源码相关
在本地应用发送事务消息的核心类是TransactionMQProducer,该类通过继承DefaultMQProducer来复用大部分发送消息相关的逻辑,这个类的代码量非常少只有100来行,下面是这个类的sendMessageTransaction方法
这里的transactionListener就是上面所说的消息回查的类,它提供了2个方法:
接着看DefaultMQProducer.sendMessageInTransaction()方法:
该方法主要做了以下事情
Producer 半事务消息发送成功后,会调用transactionListener.executeLocalTransaction方法执行本地事务。只有半消息发送成功后,才会执行本地事务,如果半消息发送失败,则设置回滚。
本地事务执行后,则调用this.endTransaction()方法,根据本地事务执行状态,去提交事务或者回滚事务。 如果半消息发送失败或本地事务执行失败告诉服务端是删除半消息,半消息发送成功且本地事务执行成功则告诉服务端生效半消息
Broker端通过SendMessageProcessor.processRequest()方法接收处理 Producer 发送的消息 最后会调用到SendMessageProcessor.sendMessage(),判断消息类型,进行消息存储。
存储半消息
代码 prepareMessage(msgInner) :
在这一步,备份消息的原主题名称与原队列ID,然后取消事务消息的消息标签,重新设置消息的主题为:RMQ_SYS_TRANS_HALF_TOPIC,队列ID固定为0。与其他普通消息区分开,然后完成消息持久化。 到这里,Broker 就初步处理完了 Producer 发送的事务半消息。
两段式协议发送与提交回滚消息,执行完本地事务消息的状态为UNKNOW时,结束事务不做任何操作。通过事务状态定时回查得到发送端的事务状态是rollback或commit。 通过TransactionalMessageCheckService线程定时去检测RMQ_SYS_TRANS_HALF_TOPIC主题中的消息,回查消息的事务状态。
当Producer或者回查定时任务提交/回滚事务的时候,Broker如何处理事务消息提交、回滚命令的?其核心实现如下:
整体实现流程
如果消费端消费失败了怎么办?
如果有消息消费失败了,则将失败的消息回传给broker,即重新写入commitLog文件,消费者将重新消费;如果消息回传的时候,consumer和broker之间网络断开,则consumer会调用submitConsumeRequestLater()方法,在consumer端进行重新消费,如果仍然消费失败,会不断重试直到达到默认的16次,你可以使用msg.getReconsumeTimes()方法来获取当前重试次数,如果重试次数足够多之后仍然无法消费成功,必须通过工单、日志等方式进行人工干预以让producer事务进行回退处理。
可能由于网络或者mq故障,导致 Producer 订单系统 发送半消息(prepare)失败。 这时订单系统可以执行回滚操作,比如“订单关闭”等,走逆向流程退款给用户。
如果订单系统发送的半消息成功了,但是执行本地事务失败了,如更新订单状态为“已完成”。 这种情况下,执行本地事务失败后,会返回rollback给 MQ,MQ会删除之前发送的半消息。 也就不会调用优惠券系统了。
假如订单系统发送半消息成功后,没有收到MQ返回的响应。 这个时候可能是因为网络问题,或者其他异常报错,订单系统误以为发送MQ半消息失败,执行了逆向回滚流程。 但这个时候其实mq已经保存半消息成功了,那这个消息怎么处理? 这个时候MQ的后台消息回查定时任务TransactionalMessageCheckService会每隔1分钟扫描一次半消息队列,判断是否需要消息回查,然后回查订单系统的本地事务,这时MQ就会发现订单已经变成“已关闭”,此时就要发送rollback请求给mq,删除之前的半消息。
这个其实也是通过定时任务TransactionalMessageCheckService,它会发现这个消息超过一定时间还没有进行二阶段处理,就会回查本地事务。
小结
消息队列RocketMQ分布式事务消息不仅可以实现应用之间的解耦,又能保证数据的最终一致性。同时,传统的大事务可以被拆分为小事务,不仅能提升效率,还不会因为某一个关联应用的不可用导致整体回滚,从而最大限度保证核心系统的可用性。在极端情况下,如果关联的某一个应用始终无法处理成功,也只需对当前应用进行补偿或数据订正处理,而无需对整体业务进行回滚。
从RocketMQ事务型消息链路体现了面向失败的设计思路,也体现了事务型系统的严谨性,在第二阶段的消息没有送达的时候,broker会主动请求producer端去做check,producer做完check后会将事务的状态再次返回。虽然说实现最终一致的方案有很多,但是事务型消息是比较优雅实现方式之一。