前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >浅析分布式事务及解决方案

浅析分布式事务及解决方案

作者头像
山行AI
发布2019-10-08 16:14:02
5090
发布2019-10-08 16:14:02
举报
文章被收录于专栏:山行AI山行AI

在系统拆分,微服务化后,一个看似简单的功能,内部可能需要调用多个服务并操作多个数据库实现,服务调用的分布式事务问题变的非常突出。这里我们就简要的理一下关于分布式事务的处理。

分布式理论

分布式系统需要满足的CAP理论与Base理论

分布式系统的CAP理论

2000年7月,加州大学伯克利分校的Eric Brewer教授在ACM PODC会议上提出CAP猜想。2年后,麻省理工学院的Seth Gilbert和Nancy Lynch从理论上证明了CAP。之后,CAP理论正式成为分布式计算领域的公认定理。一个分布式系统最多只能同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance)这三项中的两项。

  • 数据一致性(consistency):如果系统对一个写操作返回成功,那么之后的读请求都必须读到这个新数据;如果返回失败,那么所有读操作都不能读到这个数据,对调用者而言数据具有强一致性(strong consistency) (又叫原子性 atomic、线性一致性 linearizable consistency)[5]
  • 服务可用性(availability):所有读写请求在一定时间内得到响应,可终止、不会一直等待
  • 分区容错性(partition-tolerance):在存在网络分区的情况下,被分隔的节点仍能正常对外服务

Base理论

eBay的架构师Dan Pritchett源于对大规模分布式系统的实践总结,在ACM上发表文章提出BASE理论,BASE理论是对CAP理论的延伸,核心思想是即使无法做到强一致性(Strong Consistency,CAP的一致性就是强一致性),但应用可以采用适合的方式达到最终一致性(Eventual Consitency)。BASE是指基本可用(Basically Available)、软状态( Soft State)、最终一致性( Eventual Consistency)。

  • 基本可用(Basically Available):基本可用是指分布式系统在出现故障的时候,允许损失部分可用性,即保证核心可用。电商大促时,为了应对访问量激增,部分用户可能会被引导到降级页面,服务层也可能只提供降级服务。这就是损失部分可用性的体现。
  • 软状态( Soft State):软状态是指允许系统存在中间状态,而该中间状态不会影响系统整体可用性。分布式存储中一般一份数据至少会有三个副本,允许不同节点间副本同步的延时就是软状态的体现。mysql replication的异步复制也是一种体现。
  • 最终一致性( Eventual Consistency):最终一致性是指系统中的所有数据副本经过一定时间后,最终能够达到一致的状态。弱一致性和强一致性相反,最终一致性是弱一致性的一种特殊情况。

ACID

  • 包括原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)、持久性(Durability)。
  • ACID是传统数据库常用的设计理念,追求强一致性模型。BASE支持的是大型分布式系统,提出通过牺牲强一致性获得高可用性。ACID和BASE代表了两种截然相反的设计哲学。在分布式系统设计的场景中,系统组件对一致性要求是不同的,因此ACID和BASE又会结合使用。

详细内容参考:https://nicky-chen.github.io/2018/04/25/cap-base-flp/

分布式事务

XA协议

XA是一个分布式事务协议,由Tuxedo提出。XA中大致分为两部分:事务管理器和本地资源管理器。其中本地资源管理器往往由数据库实现,比如Oracle、DB2这些商业数据库都实现了XA接口,而事务管理器作为全局的调度者,负责各个本地资源的提交和回滚。XA是一个规范、协议,它只是定义了一系列的接口,只是目前大多数实现XA的都是数据库或者MQ,所以提起XA往往多指基于资源层的底层分布式事务解决方案。其实现在也有些数据分片框架或者中间件也支持XA协议,毕竟它的兼容性、普遍性更好。

传统XA

基于数据库的XA实现

介绍
  • 传统的XA是基于开启数据库的XA事务来实现的,XA事务又是通过二阶段提交(Two-phase Commit)实现的。
  • mysql在执行分布式事务(外部XA)的时候,mysql服务器相当于xa事务资源管理器,与mysql链接的客户端相当于事务管理器。
  • XA事务执行步骤,以mysql为例:
代码语言:javascript
复制
XA {START|BEGIN} xid [JOIN|RESUME]

XA END xid [SUSPEND [FOR MIGRATE]]

XA PREPARE xid

XA COMMIT xid [ONE PHASE]

XA ROLLBACK xid

XA RECOVER [CONVERT XID]

可见步骤很多,很复杂。

  • XA事务的明显问题是timeout问题,比如当一个RM(资源管理器)出问题了(例如数据库正好在做备份,这样数据库就会很慢),那么整个事务只能处于等待状态。这样可以会连锁反应,导致整个系统都很慢,最终不可用(连接耗尽)。
  • XA的性能很低。一个数据库的事务和多个数据库间的XA事务性能对比可发现,性能差10倍左右。因此要尽量避免XA事务,例如可以将数据写入本地,用高性能的消息系统分发数据。或使用数据库复制等技术。只有在这些都无法实现,且性能不是瓶颈时才应该使用XA。
  • XA又分为内部XA和外部XA等,这里不再展开介绍。
为什么不用
  • 性能(阻塞性协议,增加响应时间、锁时间、死锁)
  • 数据库支持完善度(MySQL 5.7之前都有缺陷)
  • 协调者依赖独立的J2EE中间件(早期重量级Weblogic、Jboss、后期轻量级Atomikos、Narayana和Bitronix)
  • 运维复杂,DBA缺少这方面经验
  • 并不是所有资源都支持XA协议
  • 大厂懂所以不使用,小公司不懂所以不敢用

mysql提供的XA:https://dev.mysql.com/doc/refman/5.7/en/xa.html。关于XA的详细介绍见:https://blog.csdn.net/soonfly/article/details/70677138

2PC和3PC

由XA协议衍生而来。为了解决这种分布式一致性问题,前人在性能和数据一致性的反反复复权衡过程中总结了许多典型的协议和算法。其中比较著名的有二阶提交协议(Two Phase Commitment Protocol)、三阶提交协议(Three Phase Commitment Protocol)和Paxos算法。

2PC

二阶段提交(Two-phaseCommit)是指,在计算机网络以及数据库领域内,为了使基于分布式系统架构下的所有节点在进行事务提交时保持一致性而设计的一种算法(Algorithm)。通常,二阶段提交也被称为是一种协议(Protocol))。在分布式系统中,每个节点虽然可以知晓自己的操作时成功或者失败,却无法知道其他节点的操作的成功或失败。当一个事务跨越多个节点时,为了保持事务的ACID特性,需要引入一个作为协调者的组件来统一掌控所有节点(称作参与者)的操作结果并 最终指示这些节点是否要把操作结果进行真正的提交(比如将更新后的数据写入磁盘等等)。

两阶段主要是指请求阶段和提交阶段:

请求阶段(commit-request phase,或称表决阶段,voting phase):

1)协调者节点向所有参与者节点询问是否可以执行提交操作(vote),并开始等待各参与者节点的响应。

2)参与者节点执行询问发起为止的所有事务操作,并将Undo信息和Redo信息写入日志。(注意:若成功这里其实每个参与者已经执行了事务操作)

3)各参与者节点响应协调者节点发起的询问。如果参与者节点的事务操作实际执行成功,则它返回一个”同意”消息;如果参与者节点的事务操作实际执行失败,则它返回一个”中止”消息。

提交阶段:

当协调者节点从所有参与者节点获得的相应消息都为”同意”时进行提交,提交的交互为:

1)协调者节点向所有参与者节点发出”正式提交(commit)”的请求。

2)参与者节点正式完成操作,并释放在整个事务期间内占用的资源。

3)参与者节点向协调者节点发送”完成”消息。

4)协调者节点受到所有参与者节点反馈的”完成”消息后,完成事务。

如果任一参与者节点在第一阶段返回的响应消息为”中止”,或者 协调者节点在第一阶段的询问超时之前无法获取所有参与者节点的响应消息时需要回滚,交互流程为:

1)协调者节点向所有参与者节点发出”回滚操作(rollback)”的请求。

2)参与者节点利用之前写入的Undo信息执行回滚,并释放在整个事务期间内占用的资源。

3)参与者节点向协调者节点发送”回滚完成”消息。

4)协调者节点受到所有参与者节点反馈的”回滚完成”消息后,取消事务。

2pc的缺点: 阶段提交存在着诸如同步阻塞、单点问题、脑裂等缺陷。

  • 同步阻塞问题:执行过程中,所有参与节点都是事务阻塞型的。当参与者占有公共资源时,其他第三方节点访问公共资源不得不处于阻塞状态。
  • 单点问题:由于协调者的重要性,一旦协调者发生故障。参与者会一直阻塞下去。尤其在第二阶段,协调者发生故障,那么所有的参与者还都处于锁定事务资源的状态中,而无法继续完成事务操作。(如果是协调者挂掉,可以重新选举一个协调者,但是无法解决因为协调者宕机导致的参与者处于阻塞状态的问题)。
  • 脑裂:协调者再发出commit消息之后宕机,而唯一接收到这条消息的参与者同时也宕机了。那么即使协调者通过选举协议产生了新的协调者,这条事务的状态也是不确定的,没人知道事务是否被已经提交。
3PC

与2pc的异同点:

  • 三阶段提交(Three-phase commit),也叫三阶段提交协议(Three-phase commit protocol),是二阶段提交(2PC)的改进版本。
  • 与两阶段提交不同的是,三阶段提交是“非阻塞”协议。三阶段提交在两阶段提交的第一阶段与第二阶段之间插入了一个准备阶段,使得原先在两阶段提交中,参与者在投票之后,由于协调者发生崩溃或错误,而导致参与者处于无法知晓是否提交或者 中止的“不确定状态”所产生的可能相当长的延时的问题[1]得以解决。
  • 举例来说,假设有一个决策小组由一个主持人负责与多位组员以电话联络方式协调是否通过一个提案,以两阶段提交来说,主持人收到一个提案请求,打电话跟每个组员询问是否通过并统计回复, 然后将最后决定打电话通知各组员。要是主持人在跟第一位组员通完电话后失忆,而第一位组员在得知结果并执行后老人痴呆,那么即使重新选出主持人,也没人知道最后的提案决定是什么,也许是通过,也许是驳回,不管大家选择哪一种决定,都有可能与第一位组员已执行过的真实决定不一致, 老板就会不开心认为决策小组沟通有问题而解雇。三阶段提交即是引入了另一个步骤,主持人打电话跟组员通知请准备通过提案,以避免没人知道真实决定而造成决定不一致的失业危机。为什么能够解决二阶段提交的问题呢?回到刚刚提到的状况,在主持人通知完第一位组员请准备通过后两人意外失忆, 即使没人知道全体在第一阶段的决定为何,全体决策组员仍可以重新协调过程或直接否决,不会有不一致决定而失业。那么当主持人通知完全体组员请准备通过并得到大家的再次确定后进入第三阶段,当主持人通知第一位组员请通过提案后两人意外失忆,这时候其他组员再重新选出主持人后,仍可以知道目前至少是处于准备通过提案阶段, 表示第一阶段大家都已经决定要通过了,此时便可以直接通过。
  • 引入了超时机制,在doCommit阶段,如果参与者无法及时接收到来自协调者的doCommit或者rebort请求时,会在等待超时之后,会继续进行事务的提交。

优缺点: 相对于2PC,3PC主要解决的单点故障问题,并减少阻塞,因为一旦参与者无法及时收到来自协调者的信息之后,他会默认执行commit。而不会一直持有事务资源并处于阻塞状态。但是这种机制也会导致数据一致性问题,因为,由于网络原因,协调者发送的abort响应没有及时被参与者接收到,那么参与者在等待超时之后执行了commit操作。这样就和其他接到abort命令并执行回滚的参与者之间存在数据不一致的情况。

关于2pc和3pc的详细说明见:http://www.hollischuang.com/archives/681

saga、TCC与基于消息队列的最终一致性

基于数据库的XA协议本质上就是两阶段提交,但由于性能原因在互联网高并发场景下并不适用。如果数据库只能保证本地ACID时,那么其中出现交易异常后,如何实现整个交易原子性A,从而保证一致性C呢?另外在处理过程中如何保证隔离性呢?最直接的方法就是按照逻辑依次调用服务,但出现异常怎么办?那就对那些已经成功的进行补偿,补偿成功就一致了,这种朴素的模型就是Saga。但Saga这种方式并不能保证隔离性,于是出现了TCC。在实际交易逻辑前先做业务检查、对涉及到的业务资源进行“预留”,或者说是一种“中间状态”,如果都预留成功则完成这些预留资源的真正业务处理,典型的如票务座位等场景。

saga

Saga的执行顺序有两种:

代码语言:javascript
复制
T1, T2, T3, ..., Tn
T1, T2, ..., Tj, Cj,..., C2, C1,其中0 < j < n

Saga定义了两种恢复策略:

  • backward recovery,向后恢复,补偿所有已完成的事务,如果任一子事务失败。即上面提到的第二种执行顺序,其中j是发生错误的sub-transaction,这种做法的效果是撤销掉之前所有成功的sub-transation,使得整个Saga的执行结果撤销。
  • forward recovery,向前恢复,重试失败的事务,假设每个子事务最终都会成功。适用于必须要成功的场景,执行顺序是类似于这样的:T1, T2, ..., Tj(失败), Tj(重试),..., Tn,其中j是发生错误的sub-transaction。该情况下不需要Ci。显然,向前恢复没有必要提供补偿事务,如果你的业务中,子事务(最终)总会成功,或补偿事务难以定义或不可能,向前恢复更符合你的需求。理论上补偿事务永不失败,然而,在分布式世界中,服务器可能会宕机,网络可能会失败,甚至数据中心也可能会停电。在这种情况下我们能做些什么?最后的手段是提供回退措施,比如人工干预。

Saga的核心就是补偿,一阶段就是服务的正常顺序调用(数据库事务正常提交),如果都执行成功,则第二阶段则什么都不做;但如果其中有执行发生异常,则依次调用其补偿服务(一般多逆序调用未已执行服务的反交易)来保证整个交易的一致性。应用实施成本一般。

saga参考链接:https://www.cs.cornell.edu/andru/cs711/2002fa/reading/sagas.pdf和https://www.jianshu.com/p/e4b662407c66?from=timeline&isappinstalled=0有兴趣的可以去学习一下,这里不展开描述。

TCC

TCC(Try-Confirm-Cancel)又称补偿事务。其核心思想是:"针对每个操作都要注册一个与其对应的确认和补偿(撤销操作)"。它分为三个操作:

  • Try阶段:主要是对业务系统做检测及资源预留。
  • Confirm阶段:确认执行业务操作。
  • Cancel阶段:取消执行业务操作。

TCC事务的处理流程与2PC两阶段提交类似,不过2PC通常都是在跨库的DB层面,而TCC本质上就是一个应用层面的2PC,需要通过业务逻辑来实现。这种分布式事务的实现方式的优势在于,可以让应用自己定义数据库操作的粒度,使得降低锁冲突、提高吞吐量成为可能。

而不足之处则在于对应用的侵入性非常强,业务逻辑的每个分支都需要实现try、confirm、cancel三个操作。此外,其实现难度也比较大,需要按照网络状态、系统故障等不同的失败原因实现不同的回滚策略。为了满足一致性的要求,confirm和cancel接口还必须实现幂等。

TCC的特点在于业务资源检查与加锁,一阶段进行校验,资源锁定,如果第一阶段都成功,二阶段对锁定资源进行交易逻辑,否则,对锁定资源进行释放。应用实施成本较高。

基于消息的分布式事务处理

RocketMQ的最新版本(4.3.0+)中基于分布式事务消息的实现:

流程:
代码语言:javascript
复制
1. Producer send half message to MQ server.
2. After send half message succeed, execute local transaction.
3. Send commit or rollback message to MQ Server based on local transaction results.
4. If commit/rollback message missed or producer pended during the execution of local transaction,MQ server will send check message to each producers in the same group to obtain transaction status.
5. Producer reply commit/rollback message based on local transaction status.
6. Committed message will be delivered to consumer but rolled back message will be discarded by MQ server.

1.Producer 向Mq服务器 发送half message消息。

2.Mq服务器收到half message 并持久化成功之后,会向 Producer确认首次ACK,此时消息处于 HalfMessage 状态,并未发送给对应的Consumer。Producer 开始执行本地事务逻辑。

3. 根据事务执行结果,Producer 向Mq服务器提交二次确认(commit 或者 rollback)。Mq Server 收到 Commit 状态则将半消息标记为可投递,Consumer 最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,Consumer 将不会接受该消息。

4. 在断网或者应用重启的情况下,二次ACK未成功的发给Mq Server,Mq Server会主动向 Producer 启动消息回查(Message Status Check),

5. Producer 根据事务执行结果,对消息回查返回对应的结果。

6. Mq Server根据返回结果,决定继续投递消息或者丢弃消息(重复第4步操作)。

Half(Prepare) Message 是指暂时不能被投递的消息。Producer已经把消息发送给Mq 服务器,但是Mq服务器尚未收到生产者的第二次Ack,这个时候消息会被标记为"temporarily undeliverable",目前消息的状态为 HalfMessage。Message Status Check是指网络断开或者Producer应用重启会导致Mq服务器无法从Producer获取第二次ACK,当Mq服务器发现一个消息长时间处于 HalfMessage 状态时(默认为60S,可配置),它会主动请求Producer,查询消息Id对应的最新状态(commit 或者 rollback)。

事务消息
代码语言:javascript
复制
It can be thought of as a two-phase commit message implementation to ensure eventual consistency in distributed system. Transactional message ensures that the execution of local transaction and the sending of message can be performed atomically.

事务消息可以被看作是2PC消息的实现,用来保证分布式环境下的最终一致性。事务消息能够确保本地事务的执行,并且它发送消息的环节框架会自动操作。

使用限制:

代码语言:javascript
复制
(1) Messages of the transactional have no schedule and batch support.
(2) In order to avoid a single message being checked too many times and lead to half queue message accumulation, we limited the number of checks for a single message to 15 times by default, but users can change this limit by change the “transactionCheckMax” parameter in the configuration of the broker, if one message has been checked over “transactionCheckMax” times, broker will discard this message and print an error log at the same time by default. Users can change this behavior by override the “AbstractTransactionCheckListener” class.
(3) A transactional message will be checked after a certain period of time that determined by parameter “transactionTimeout” in the configuration of the broker. And users also can change this limit by set user property “CHECK_IMMUNITY_TIME_IN_SECONDS” when sending transactional message, this parameter takes precedence over the “transactionMsgTimeout” parameter.
(4) A transactional message maybe checked or consumed more than once.
(5) Committed message reput to the user’s target topic may fail. Currently, it depends on the log record. High availability is ensured by the high availability mechanism of RocketMQ itself. If you want to ensure that the transactional message isn’t lost and the transaction integrity is guaranteed, it is recommended to use synchronous double write. mechanism.
(6) Producer IDs of transactional messages cannot be shared with producer IDs of other types of messages. Unlike other types of message, transactional messages allow backward queries. MQ Server query clients by their Producer IDs.

1. 事务消息不支持 delay 或者 batch操作。

2. 为了避免一个 Half Message的消息被检查多次 或者 消息积压,默认对每个消息最多进行15次消息回查,可以通过修改broker的 transactionCheckMax 参数来指定次数。如果一个Half Message状态的消息检查次数超过了 transactionCheckMax,默认情况下会直接丢弃掉并且打印错误日志,可以通过覆盖 AbstractTransactionCheckListener 类来修改这个行为。

3. 通过 transactionMsgTimeout参数可以指定消息回查(Message Status Check)间隔。

4. 事务消息可能会被check或者consume多次,要在Consumer端做好幂等控制。

5. 提交到目标topic的消息可能会失败。一般来说它依赖日志记录。它的高可用是依赖于RocketMQ自身的高可用来保证的。如果你想确保事务消息不会丢失并且让事务完整性得到保证,推荐你使用异步双写的机制。

6. 事务消息的生产者的id不能和其它类型消息的生产者id共享。不像其他类型的消息,事务消息允许倒过来的查询。MQ服务端通过生产者的id查询客户端。

应用:

1. 事务状态

代码语言:javascript
复制
Transactional status

There are three states for transactional message:
(1) TransactionStatus.CommitTransaction: commit transaction,it means that allow consumers to consume this message.
(2) TransactionStatus.RollbackTransaction: rollback transaction,it means that the message will be deleted and not allowed to consume.
(3) TransactionStatus.Unknown: intermediate state,it means that MQ is needed to check back to determine the status.

1. LocalTransactionState.UNKNOW : 中间状态,意味着Mq server需要稍候再次确认。

2. LocalTransactionState.COMMIT_MESSAGE: 事务完成,意味着消息可以投递给对应的 Consumer。

3. LocalTransactionState.ROLLBACK_MESSAGE: 事务失败,Mq Server会丢弃对应的事务消息,不会投递给对应的Consumer。

2. 开发中使用RocketMQ的分布式事务消息Consumer的代码不需要有什么特别的变化与普通消息Consumer代码一致就可以。

代码语言:javascript
复制
1. 从Mq server获取到消息之后,即开始处理本地事务,处理成功后返回 CONSUME_SUCCESS。

2. 处理失败则返回 RECONSUME_LATER,Mq server会在稍后重新投递这个消息,又进入步骤1。

Consumer 需要做好幂等控制,消息可能会被多次投递到Consumer。

3. 生产者

Use TransactionMQProducer class to create producer client, and specify a unique producerGroup, and you can set up a custom thread pool to process check requests. After executing the local transaction, you need to reply to MQ according to the execution result,and the reply status is described in the above section.

使用TransactionMQProducer类来创建生产者客户端,并且要指定一个唯一的生产者组,可以设置一个客户端线程池去执行确认请求。当执行了本地事务之后,你需要根据执行结果给MQ回复消息,回复的消息状态就是上面提到的状态。

代码语言:javascript
复制
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}
...

Implement the TransactionListener interface The “executeLocalTransaction” method is used to execute local transaction when send half message succeed. It returns one of three transaction status mentioned in the previous section. The “checkLocalTransaction” method is used to check the local transaction status and respond to MQ check requests. It also returns one of three transaction status mentioned in the previous section.

实现的TransactionListener接口有两个方法,executeLocalTransaction方法是在当half message发送成功后用来执行本地事务的。它返回三种事务状态中的一种。checkLocalTransaction用来 检查本地事务状态并且向MQ响应检查的请求。它也返回三种事务状态中的一种。

代码语言:javascript
复制
import ...

   public class TransactionListenerImpl implements TransactionListener {
       private AtomicInteger transactionIndex = new AtomicInteger(0);

       private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

       @Override
       public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
           int value = transactionIndex.getAndIncrement();
           int status = value % 3;
           localTrans.put(msg.getTransactionId(), status);
           return LocalTransactionState.UNKNOW;
       }

       @Override
       public LocalTransactionState checkLocalTransaction(MessageExt msg) {
           Integer status = localTrans.get(msg.getTransactionId());
           if (null != status) {
               switch (status) {
                   case 0:
                       return LocalTransactionState.UNKNOW;
                   case 1:
                       return LocalTransactionState.COMMIT_MESSAGE;
                   case 2:
                       return LocalTransactionState.ROLLBACK_MESSAGE;
               }
           }
           return LocalTransactionState.COMMIT_MESSAGE;
       }
   }

基于可靠消息最终一致,一阶段服务正常调用,同时同事务记录消息表,二阶段则进行消息的投递,消费。应用实施成本较低。

代码语言:javascript
复制
1. 实现:http://rocketmq.apache.org/rocketmq/the-design-of-transactional-message/

2. 示例:https://rocketmq.apache.org/docs/transaction-example/
DTP(Distributed Transaction Processing)模型

上面的事务框架,大多参考了DTP模型,DTP(Distributed Transaction Processing)参考链接:http://pubs.opengroup.org/onlinepubs/009680699/toc.pdf。

  • RM负责本地事务的提交,同时完成分支事务的注册、锁的判定,扮演事务参与者角色。
  • TM负责整体事务的提交与回滚的指令的触发,扮演事务的总体协调者角色。

不同框架在实现时,各组件角色的功能、部署形态会根据需求进行调整,例如TM有的是以jar包形式与应用部署在一起,有的则剥离出来需要单独部署(例如Seata中将TM的主要功能放到一个逻辑上集中的Server上,叫做TC( Transaction Coordinator ))

Seata架构

阿里分布式事务框架GTS开源了一个免费社区版Fescar,后来跟蚂蚁TCC方案整合后改名为Seata。参考链接:https://github.com/seata/seata/wiki Seata 会有 4 种分布式事务解决方案,分别是 AT 模式、TCC 模式、Saga 模式和 XA 模式。有些情况下,应用需要调用第三方系统的接口,而第三方系统没有接入GTS。此时需要用到GTS的MT模式。GTS的MT模式可以等价于TCC模式,用户可以根据自身业务需求自定义每个事务阶段的具体行为。MT模式提供了更多的灵活性,可能性,以达到特殊场景下的自定义优化及特殊功能的实现。

这里主要介绍下seata的AT模式和MT模式:

  • AT模式
代码语言:javascript
复制
前提:

基于支持本地 ACID 事务的关系型数据库。
Java 应用,通过 JDBC 访问数据库。

两阶段提交协议的演变:
一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
二阶段:
提交异步化,非常快速地完成。
回滚通过一阶段的回滚日志进行反向补偿。

详细链接在:https://github.com/seata/seata/wiki/AT-Mode

  • MT模式:

回顾总览中的描述:一个分布式的全局事务,整体是 两阶段提交 的模型。全局事务是由若干分支事务组成的,分支事务要满足 两阶段提交 的模型要求,即需要每个分支事务都具备自己的:

代码语言:javascript
复制
一阶段 prepare 行为
二阶段 commit 或 rollback 行为
  • 区别: 根据两阶段行为模式的不同,我们将分支事务划分为 Automatic (Branch) Transaction Mode 和 Manual (Branch) Transaction Mode.

AT 模式基于 支持本地 ACID 事务 的 关系型数据库:

  • 一阶段 prepare 行为:在本地事务中,一并提交业务数据更新和相应回滚日志记录。
  • 二阶段 commit 行为:马上成功结束,自动 异步批量清理回滚日志。
  • 二阶段 rollback 行为:通过回滚日志,自动 生成补偿操作,完成数据回滚。

相应的,MT 模式,不依赖于底层数据资源的事务支持:

  • 一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。
  • 二阶段 commit 行为:调用 自定义 的 commit 逻辑。
  • 二阶段 rollback 行为:调用 自定义 的 rollback 逻辑。所谓 MT 模式,是指支持把 自定义 的分支事务纳入到全局事务的管理中。详细链接在:https://github.com/seata/seata/wiki/MT-Mode

参考

  • https://www.cnblogs.com/barrywxx/p/8533363.html
  • http://www.sohu.com/a/336224977_673711
  • http://rocketmq.apache.org/rocketmq/the-design-of-transactional-message/
  • https://rocketmq.apache.org/docs/transaction-example/
  • https://github.com/seata/seata/wiki/AT-Mode
  • https://github.com/seata/seata/wiki
  • https://github.com/seata/seata/wiki/MT-Mode
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-09-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发架构二三事 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 分布式理论
    • 分布式系统的CAP理论
      • Base理论
        • ACID
        • 分布式事务
          • XA协议
            • 传统XA
            • 2PC和3PC
            • saga、TCC与基于消息队列的最终一致性
        • 参考
        相关产品与服务
        数据库
        云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档