在系统拆分,微服务化后,一个看似简单的功能,内部可能需要调用多个服务并操作多个数据库实现,服务调用的分布式事务问题变的非常突出。这里我们就简要的理一下关于分布式事务的处理。
分布式系统需要满足的CAP理论与Base理论
2000年7月,加州大学伯克利分校的Eric Brewer教授在ACM PODC会议上提出CAP猜想。2年后,麻省理工学院的Seth Gilbert和Nancy Lynch从理论上证明了CAP。之后,CAP理论正式成为分布式计算领域的公认定理。一个分布式系统最多只能同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance)这三项中的两项。
eBay的架构师Dan Pritchett源于对大规模分布式系统的实践总结,在ACM上发表文章提出BASE理论,BASE理论是对CAP理论的延伸,核心思想是即使无法做到强一致性(Strong Consistency,CAP的一致性就是强一致性),但应用可以采用适合的方式达到最终一致性(Eventual Consitency)。BASE是指基本可用(Basically Available)、软状态( Soft State)、最终一致性( Eventual Consistency)。
详细内容参考:https://nicky-chen.github.io/2018/04/25/cap-base-flp/
XA是一个分布式事务协议,由Tuxedo提出。XA中大致分为两部分:事务管理器和本地资源管理器。其中本地资源管理器往往由数据库实现,比如Oracle、DB2这些商业数据库都实现了XA接口,而事务管理器作为全局的调度者,负责各个本地资源的提交和回滚。XA是一个规范、协议,它只是定义了一系列的接口,只是目前大多数实现XA的都是数据库或者MQ,所以提起XA往往多指基于资源层的底层分布式事务解决方案。其实现在也有些数据分片框架或者中间件也支持XA协议,毕竟它的兼容性、普遍性更好。
基于数据库的XA实现
XA {START|BEGIN} xid [JOIN|RESUME]
XA END xid [SUSPEND [FOR MIGRATE]]
XA PREPARE xid
XA COMMIT xid [ONE PHASE]
XA ROLLBACK xid
XA RECOVER [CONVERT XID]
可见步骤很多,很复杂。
mysql提供的XA:https://dev.mysql.com/doc/refman/5.7/en/xa.html。关于XA的详细介绍见:https://blog.csdn.net/soonfly/article/details/70677138
由XA协议衍生而来。为了解决这种分布式一致性问题,前人在性能和数据一致性的反反复复权衡过程中总结了许多典型的协议和算法。其中比较著名的有二阶提交协议(Two Phase Commitment Protocol)、三阶提交协议(Three Phase Commitment Protocol)和Paxos算法。
二阶段提交(Two-phaseCommit)是指,在计算机网络以及数据库领域内,为了使基于分布式系统架构下的所有节点在进行事务提交时保持一致性而设计的一种算法(Algorithm)。通常,二阶段提交也被称为是一种协议(Protocol))。在分布式系统中,每个节点虽然可以知晓自己的操作时成功或者失败,却无法知道其他节点的操作的成功或失败。当一个事务跨越多个节点时,为了保持事务的ACID特性,需要引入一个作为协调者的组件来统一掌控所有节点(称作参与者)的操作结果并 最终指示这些节点是否要把操作结果进行真正的提交(比如将更新后的数据写入磁盘等等)。
两阶段主要是指请求阶段和提交阶段:
1)协调者节点向所有参与者节点询问是否可以执行提交操作(vote),并开始等待各参与者节点的响应。
2)参与者节点执行询问发起为止的所有事务操作,并将Undo信息和Redo信息写入日志。(注意:若成功这里其实每个参与者已经执行了事务操作)
3)各参与者节点响应协调者节点发起的询问。如果参与者节点的事务操作实际执行成功,则它返回一个”同意”消息;如果参与者节点的事务操作实际执行失败,则它返回一个”中止”消息。
当协调者节点从所有参与者节点获得的相应消息都为”同意”时进行提交,提交的交互为:
1)协调者节点向所有参与者节点发出”正式提交(commit)”的请求。
2)参与者节点正式完成操作,并释放在整个事务期间内占用的资源。
3)参与者节点向协调者节点发送”完成”消息。
4)协调者节点受到所有参与者节点反馈的”完成”消息后,完成事务。
如果任一参与者节点在第一阶段返回的响应消息为”中止”,或者 协调者节点在第一阶段的询问超时之前无法获取所有参与者节点的响应消息时需要回滚,交互流程为:
1)协调者节点向所有参与者节点发出”回滚操作(rollback)”的请求。
2)参与者节点利用之前写入的Undo信息执行回滚,并释放在整个事务期间内占用的资源。
3)参与者节点向协调者节点发送”回滚完成”消息。
4)协调者节点受到所有参与者节点反馈的”回滚完成”消息后,取消事务。
2pc的缺点: 阶段提交存在着诸如同步阻塞、单点问题、脑裂等缺陷。
与2pc的异同点:
优缺点: 相对于2PC,3PC主要解决的单点故障问题,并减少阻塞,因为一旦参与者无法及时收到来自协调者的信息之后,他会默认执行commit。而不会一直持有事务资源并处于阻塞状态。但是这种机制也会导致数据一致性问题,因为,由于网络原因,协调者发送的abort响应没有及时被参与者接收到,那么参与者在等待超时之后执行了commit操作。这样就和其他接到abort命令并执行回滚的参与者之间存在数据不一致的情况。
关于2pc和3pc的详细说明见:http://www.hollischuang.com/archives/681
基于数据库的XA协议本质上就是两阶段提交,但由于性能原因在互联网高并发场景下并不适用。如果数据库只能保证本地ACID时,那么其中出现交易异常后,如何实现整个交易原子性A,从而保证一致性C呢?另外在处理过程中如何保证隔离性呢?最直接的方法就是按照逻辑依次调用服务,但出现异常怎么办?那就对那些已经成功的进行补偿,补偿成功就一致了,这种朴素的模型就是Saga。但Saga这种方式并不能保证隔离性,于是出现了TCC。在实际交易逻辑前先做业务检查、对涉及到的业务资源进行“预留”,或者说是一种“中间状态”,如果都预留成功则完成这些预留资源的真正业务处理,典型的如票务座位等场景。
Saga的执行顺序有两种:
T1, T2, T3, ..., Tn
T1, T2, ..., Tj, Cj,..., C2, C1,其中0 < j < n
Saga定义了两种恢复策略:
Saga的核心就是补偿,一阶段就是服务的正常顺序调用(数据库事务正常提交),如果都执行成功,则第二阶段则什么都不做;但如果其中有执行发生异常,则依次调用其补偿服务(一般多逆序调用未已执行服务的反交易)来保证整个交易的一致性。应用实施成本一般。
saga参考链接:https://www.cs.cornell.edu/andru/cs711/2002fa/reading/sagas.pdf和https://www.jianshu.com/p/e4b662407c66?from=timeline&isappinstalled=0有兴趣的可以去学习一下,这里不展开描述。
TCC(Try-Confirm-Cancel)又称补偿事务。其核心思想是:"针对每个操作都要注册一个与其对应的确认和补偿(撤销操作)"。它分为三个操作:
TCC事务的处理流程与2PC两阶段提交类似,不过2PC通常都是在跨库的DB层面,而TCC本质上就是一个应用层面的2PC,需要通过业务逻辑来实现。这种分布式事务的实现方式的优势在于,可以让应用自己定义数据库操作的粒度,使得降低锁冲突、提高吞吐量成为可能。
而不足之处则在于对应用的侵入性非常强,业务逻辑的每个分支都需要实现try、confirm、cancel三个操作。此外,其实现难度也比较大,需要按照网络状态、系统故障等不同的失败原因实现不同的回滚策略。为了满足一致性的要求,confirm和cancel接口还必须实现幂等。
TCC的特点在于业务资源检查与加锁,一阶段进行校验,资源锁定,如果第一阶段都成功,二阶段对锁定资源进行交易逻辑,否则,对锁定资源进行释放。应用实施成本较高。
RocketMQ的最新版本(4.3.0+)中基于分布式事务消息的实现:
1. Producer send half message to MQ server.
2. After send half message succeed, execute local transaction.
3. Send commit or rollback message to MQ Server based on local transaction results.
4. If commit/rollback message missed or producer pended during the execution of local transaction,MQ server will send check message to each producers in the same group to obtain transaction status.
5. Producer reply commit/rollback message based on local transaction status.
6. Committed message will be delivered to consumer but rolled back message will be discarded by MQ server.
1.Producer 向Mq服务器 发送half message消息。
2.Mq服务器收到half message 并持久化成功之后,会向 Producer确认首次ACK,此时消息处于 HalfMessage 状态,并未发送给对应的Consumer。Producer 开始执行本地事务逻辑。
3. 根据事务执行结果,Producer 向Mq服务器提交二次确认(commit 或者 rollback)。Mq Server 收到 Commit 状态则将半消息标记为可投递,Consumer 最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,Consumer 将不会接受该消息。
4. 在断网或者应用重启的情况下,二次ACK未成功的发给Mq Server,Mq Server会主动向 Producer 启动消息回查(Message Status Check),
5. Producer 根据事务执行结果,对消息回查返回对应的结果。
6. Mq Server根据返回结果,决定继续投递消息或者丢弃消息(重复第4步操作)。
Half(Prepare) Message 是指暂时不能被投递的消息。Producer已经把消息发送给Mq 服务器,但是Mq服务器尚未收到生产者的第二次Ack,这个时候消息会被标记为"temporarily undeliverable",目前消息的状态为 HalfMessage。Message Status Check是指网络断开或者Producer应用重启会导致Mq服务器无法从Producer获取第二次ACK,当Mq服务器发现一个消息长时间处于 HalfMessage 状态时(默认为60S,可配置),它会主动请求Producer,查询消息Id对应的最新状态(commit 或者 rollback)。
It can be thought of as a two-phase commit message implementation to ensure eventual consistency in distributed system. Transactional message ensures that the execution of local transaction and the sending of message can be performed atomically.
事务消息可以被看作是2PC消息的实现,用来保证分布式环境下的最终一致性。事务消息能够确保本地事务的执行,并且它发送消息的环节框架会自动操作。
使用限制:
(1) Messages of the transactional have no schedule and batch support.
(2) In order to avoid a single message being checked too many times and lead to half queue message accumulation, we limited the number of checks for a single message to 15 times by default, but users can change this limit by change the “transactionCheckMax” parameter in the configuration of the broker, if one message has been checked over “transactionCheckMax” times, broker will discard this message and print an error log at the same time by default. Users can change this behavior by override the “AbstractTransactionCheckListener” class.
(3) A transactional message will be checked after a certain period of time that determined by parameter “transactionTimeout” in the configuration of the broker. And users also can change this limit by set user property “CHECK_IMMUNITY_TIME_IN_SECONDS” when sending transactional message, this parameter takes precedence over the “transactionMsgTimeout” parameter.
(4) A transactional message maybe checked or consumed more than once.
(5) Committed message reput to the user’s target topic may fail. Currently, it depends on the log record. High availability is ensured by the high availability mechanism of RocketMQ itself. If you want to ensure that the transactional message isn’t lost and the transaction integrity is guaranteed, it is recommended to use synchronous double write. mechanism.
(6) Producer IDs of transactional messages cannot be shared with producer IDs of other types of messages. Unlike other types of message, transactional messages allow backward queries. MQ Server query clients by their Producer IDs.
1. 事务消息不支持 delay 或者 batch操作。
2. 为了避免一个 Half Message的消息被检查多次 或者 消息积压,默认对每个消息最多进行15次消息回查,可以通过修改broker的 transactionCheckMax 参数来指定次数。如果一个Half Message状态的消息检查次数超过了 transactionCheckMax,默认情况下会直接丢弃掉并且打印错误日志,可以通过覆盖 AbstractTransactionCheckListener 类来修改这个行为。
3. 通过 transactionMsgTimeout参数可以指定消息回查(Message Status Check)间隔。
4. 事务消息可能会被check或者consume多次,要在Consumer端做好幂等控制。
5. 提交到目标topic的消息可能会失败。一般来说它依赖日志记录。它的高可用是依赖于RocketMQ自身的高可用来保证的。如果你想确保事务消息不会丢失并且让事务完整性得到保证,推荐你使用异步双写的机制。
6. 事务消息的生产者的id不能和其它类型消息的生产者id共享。不像其他类型的消息,事务消息允许倒过来的查询。MQ服务端通过生产者的id查询客户端。
应用:
1. 事务状态
Transactional status
There are three states for transactional message:
(1) TransactionStatus.CommitTransaction: commit transaction,it means that allow consumers to consume this message.
(2) TransactionStatus.RollbackTransaction: rollback transaction,it means that the message will be deleted and not allowed to consume.
(3) TransactionStatus.Unknown: intermediate state,it means that MQ is needed to check back to determine the status.
1. LocalTransactionState.UNKNOW : 中间状态,意味着Mq server需要稍候再次确认。
2. LocalTransactionState.COMMIT_MESSAGE: 事务完成,意味着消息可以投递给对应的 Consumer。
3. LocalTransactionState.ROLLBACK_MESSAGE: 事务失败,Mq Server会丢弃对应的事务消息,不会投递给对应的Consumer。
2. 开发中使用RocketMQ的分布式事务消息Consumer的代码不需要有什么特别的变化与普通消息Consumer代码一致就可以。
1. 从Mq server获取到消息之后,即开始处理本地事务,处理成功后返回 CONSUME_SUCCESS。
2. 处理失败则返回 RECONSUME_LATER,Mq server会在稍后重新投递这个消息,又进入步骤1。
Consumer 需要做好幂等控制,消息可能会被多次投递到Consumer。
3. 生产者
Use TransactionMQProducer class to create producer client, and specify a unique producerGroup, and you can set up a custom thread pool to process check requests. After executing the local transaction, you need to reply to MQ according to the execution result,and the reply status is described in the above section.
使用TransactionMQProducer类来创建生产者客户端,并且要指定一个唯一的生产者组,可以设置一个客户端线程池去执行确认请求。当执行了本地事务之后,你需要根据执行结果给MQ回复消息,回复的消息状态就是上面提到的状态。
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
...
Implement the TransactionListener interface The “executeLocalTransaction” method is used to execute local transaction when send half message succeed. It returns one of three transaction status mentioned in the previous section. The “checkLocalTransaction” method is used to check the local transaction status and respond to MQ check requests. It also returns one of three transaction status mentioned in the previous section.
实现的TransactionListener接口有两个方法,executeLocalTransaction方法是在当half message发送成功后用来执行本地事务的。它返回三种事务状态中的一种。checkLocalTransaction用来 检查本地事务状态并且向MQ响应检查的请求。它也返回三种事务状态中的一种。
import ...
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
基于可靠消息最终一致,一阶段服务正常调用,同时同事务记录消息表,二阶段则进行消息的投递,消费。应用实施成本较低。
1. 实现:http://rocketmq.apache.org/rocketmq/the-design-of-transactional-message/
2. 示例:https://rocketmq.apache.org/docs/transaction-example/
上面的事务框架,大多参考了DTP模型,DTP(Distributed Transaction Processing)参考链接:http://pubs.opengroup.org/onlinepubs/009680699/toc.pdf。
不同框架在实现时,各组件角色的功能、部署形态会根据需求进行调整,例如TM有的是以jar包形式与应用部署在一起,有的则剥离出来需要单独部署(例如Seata中将TM的主要功能放到一个逻辑上集中的Server上,叫做TC( Transaction Coordinator ))
阿里分布式事务框架GTS开源了一个免费社区版Fescar,后来跟蚂蚁TCC方案整合后改名为Seata。参考链接:https://github.com/seata/seata/wiki Seata 会有 4 种分布式事务解决方案,分别是 AT 模式、TCC 模式、Saga 模式和 XA 模式。有些情况下,应用需要调用第三方系统的接口,而第三方系统没有接入GTS。此时需要用到GTS的MT模式。GTS的MT模式可以等价于TCC模式,用户可以根据自身业务需求自定义每个事务阶段的具体行为。MT模式提供了更多的灵活性,可能性,以达到特殊场景下的自定义优化及特殊功能的实现。
这里主要介绍下seata的AT模式和MT模式:
前提:
基于支持本地 ACID 事务的关系型数据库。
Java 应用,通过 JDBC 访问数据库。
两阶段提交协议的演变:
一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
二阶段:
提交异步化,非常快速地完成。
回滚通过一阶段的回滚日志进行反向补偿。
详细链接在:https://github.com/seata/seata/wiki/AT-Mode
回顾总览中的描述:一个分布式的全局事务,整体是 两阶段提交 的模型。全局事务是由若干分支事务组成的,分支事务要满足 两阶段提交 的模型要求,即需要每个分支事务都具备自己的:
一阶段 prepare 行为
二阶段 commit 或 rollback 行为
AT 模式基于 支持本地 ACID 事务 的 关系型数据库:
相应的,MT 模式,不依赖于底层数据资源的事务支持: