Java技术分享,点击上方蓝字关注我吧。
RocketMQ系列第三篇。 前两篇介绍了消息队列及RocketMQ的基本使用,本次来聊一下基于RocketMQ的分布式事务解决方案。
现在很多大公司的项目都拆分为为服务器架构的了,通常每个服务只处理一件事情,部署在一个服务器节点上,不同的服务部署在不同的机器上,这就存在服务之间的相互通信问题。
比如订单服务和支付服务,这里举一个简单的业务流程,创建一个订单之后,向MQ发送消息,支付服务消费消息,调起支付,然后订单服务进行修改订单状态,发货。
如果用户已经支付完成了,但是在处理订单状态环节出现了问题,该怎么办?这个时候消费者方(支付服务)已经把消息消费了,无法回滚了。
所以这两个服务,从创建订单到支付到更新订单状态等一系列的操作必须是原子性的。
这就是分布式系统中涉及到的分布式事务问题。
Two-phase Commit,简称2PC,两阶段提交。从字面意思就能想到,提交事务时分两个阶段来完成最终事务的提交。
该方案通过引入一个第三方协调者(Coordinator)来协调参与者的行为,并最终决定这些参与者是否要真正执行事务。
协调者询问参与者事务是否执行成功,参与者发回事务执行结果。
如果事务在每个参与者上都执行成功,事务协调者发送通知让参与者提交事务;否则,协调者发送通知让参与者回滚事务。
Tip:在准备阶段,参与者执行了事务,但是还未提交。只有在提交阶段接收到协调者发来的通知后,才进行提交或者回滚。
Try-Confirm-Cancel,TCC,采用的是补偿机制。其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。
TCC是对2PC的一个改进,try阶段通过预留资源的方式避免了同步阻塞资源的情况;
但是TCC编程需要业务自己实现try,confirm,cancel,对业务入侵太大,实现起来也比较复杂。
RocketMQ支持分布式事务功能,通过RocketMQ事务消息能达到分布式事务的最终一致。
当Broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中,它暂时不会被Consumer消费。
Broker会开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,每次执行任务会向Producer确认事务执行状态(提交、回滚、未知),如果是未知,等待下一次回调。
有一种场景,如果发送预备消息成功,执行本地事务成功,但发送确认消息失败;那么问题就来了,因为Producer的业务都已经处理完毕了,就剩下Consumer消费了,但是你commit失败了,Consumer消费不到,这里就出现了数据不一致。
RocketMQ采用消息状态回查来解决这种问题,RocketMQ会定时遍历commitlog中的预备消息。
因为预备消息最终肯定会变为Commit消息或Rollback消息,所以遍历预备消息去回查本地业务的执行状态,如果发现本地业务没有执行成功就Rollback,如果执行成功就发送Commit消息。
如果超过回查次数,默认回滚消息。
RocketMQ的分布式事务,需要生产者发送事务性消息,使用TransactionMQProducer
类创建生产者,并指定唯一的ProducerGroup
,就可以设置自定义线程池来处理这些检查请求。
执行本地事务后,需要根据执行结果对消息队列进行回复。
生成TransactionMQProducer实例
:
TransactionMQProducer producer = new TransactionMQProducer("laopo");
producer.setNamesrvAddr("192.168.2.110:9876");
//处理检查请求的线程池
ExecutorService executorService = new ThreadPoolExecutor(2,
5,
100,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
设置监听事务的接口TransactionListener
:当发送半消息成功时,使用executeLocalTransaction
方法来执行本地事务,返回前文所述的三种状态之一:提交、回滚、未知。
checkLocalTransaction
方法用于检查本地事务状态,并回应消息队列的检查请求,该方法也返回提交、回滚、未知三种状态之一。
//设置回查
producer.setTransactionListener(new TransactionListener() {
private AtomicInteger transactionIndex = new AtomicInteger(0);
//用来保存事务的状态
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
//半消息发送成功触发此方法来执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(message.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
//broker将发送检查消息来检查事务状态,并将调用此方法来获取本地事务状态
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
Integer status = localTrans.get(messageExt.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
});
调用sendMessageInTransaction
来发送消息:
//生产并发送消息
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
Message msg =
new Message("girl", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//发送事务消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
//关闭生产者实例
producer.shutdown();
System.out.printf("%s", "已关闭生产者实例");
运行结果:
之前生产消息生产了TagA、B到TagE的消息,我们这里顺便再验证一下TAG过滤消费,就消费TagB的吧:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("laopo-consumer");
consumer.setNamesrvAddr("192.168.2.110:9876");
//订阅topic,消费TagB的消息
consumer.subscribe("girl", "TagB");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), list);
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动Consumer实例
consumer.start();
System.out.println("consumer started.");
成功消费了TagB的消息。
完整代码位于 GitHub github.com/xblzer/JavaJourney
本次导航结束,以上。
首发公众号 行百里er ,欢迎老铁们关注阅读指正。代码仓库 GitHub github.com/xblzer/JavaJourney