专栏首页Java架构沉思录彻底看懂RocketMQ事务实现原理

彻底看懂RocketMQ事务实现原理

面试中经常会问到比如RocketMQ的事务是如何实现的呢?学习框架,我们不仅要熟练使用,更要掌握设计及原理,才算熟悉一个框架。

1 RocketMQ 事务使用案例

public class CreateOrderService {

  @Autowired
  private OrderDao orderDao;
  @Autowired
  private ExecutorService executorService;

  private TransactionMQProducer producer;

  // 初始化transactionListener 和 producer
  @Init
  public void init() throws MQClientException {
    TransactionListener transactionListener = createTransactionListener();
    producer = new TransactionMQProducer("myGroup");
    producer.setExecutorService(executorService);
    producer.setTransactionListener(transactionListener);
    producer.start();
  }

  // 创建订单服务的请求入口
  @PUT
  @RequestMapping(...)
  public boolean createOrder(@RequestBody CreateOrderRequest request) {
    // 根据创建订单请求创建一条消息
    Message msg = createMessage(request);
    // 发送事务消息
    SendResult sendResult = producer.sendMessageInTransaction(msg, request);
    // 返回:事务是否成功
    return sendResult.getSendStatus() == SendStatus.SEND_OK;
  }

  private TransactionListener createTransactionListener() {
    return new TransactionListener() {
      @Override
      public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        CreateOrderRequest request = (CreateOrderRequest ) arg;
        try {
          // 执行本地事务创建订单
          orderDao.createOrderInDB(request);
          // 如果没抛异常说明执行成功,提交事务消息
          return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Throwable t) {
          // 失败则直接回滚事务消息
          return LocalTransactionState.ROLLBACK_MESSAGE;
        }
      }
      
      // 反查本地事务
      @Override
      public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 从消息中获得订单ID
        String orderId = msg.getUserProperty("orderId");

        // 去db查询订单号是否存在,若存在则提交事务
        // 若不存在,可能是本地事务失败了,也可能是本地事务还在执行,所以返回UNKNOW
        return orderDao.isOrderIdExistsInDB(orderId)?
                LocalTransactionState.COMMIT_MESSAGE: LocalTransactionState.UNKNOW;
      }
    };
  }
}

如上案例展示了一个订单创建服务,即往db插一条订单记录,并发一条创建订单的消息,要求写db和发消息俩个操作在一个事务内执行。 首先在init()方法中初始化了transactionListener和发生RocketMQ事务消息的变量producer。

  • createOrder() 真正提供创建订单服务的方法,根据请求的参数创建一条消息,然后调用 producer发事务消息,并返回事务执行结果。
  • createTransactionListener() 在init()方法中调用,构造实现RocketMQ的TransactionListener接口的匿名类,该接口需要实现如下两个方法:
    • executeLocalTransaction:执行本地事务,在这里我们直接把订单数据插入到数据库中,并返回本地事务的执行结果。
    • checkLocalTransaction:反查本地事务,上述流程中是在db中查询订单号是否存在,若存在则提交事务,若不存在,可能本地事务失败了,也可能本地事务还在执行,所以返回UNKNOW

这样便使用RocketMQ的事务简单实现了一个创建订单的分布式事务。

2 RocketMQ事务消息实现原理

2.1 Pro端如何发事务消息?

DefaultMQProducerImpl#sendMessageInTransaction

public TransactionSendResult sendMessageInTransaction(final Message msg,
    final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
    TransactionListener transactionListener = getCheckListener();
    if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }

    // ignore DelayTimeLevel parameter
    if (msg.getDelayTimeLevel() != 0) {
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    }

    Validators.checkMessage(msg, this.defaultMQProducer);

    SendResult sendResult = null;
    // 给待发送消息添加属性,表明是一个事务消息(即半消息)
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    // 像发送普通消息一样,把这条事务消息发往Broker
    try {
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }

    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    switch (sendResult.getSendStatus()) {
        // 事务消息发送成功
        case SEND_OK: {
            try {
                if (sendResult.getTransactionId() != null) {
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                if (null != transactionId && !"".equals(transactionId)) {
                    msg.setTransactionId(transactionId);
                }
                if (null != localTransactionExecuter) {
                    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                } else if (transactionListener != null) {
                    log.debug("Used new transaction API");
                    // 开始执行本地事务
                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                }
                if (null == localTransactionState) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }

                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                    log.info("executeLocalTransactionBranch return {}", localTransactionState);
                    log.info(msg.toString());
                }
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
        break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }

    // 事务过程的最后,给Broker发送提交或回滚事务的RPC请求。
    try {
        this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }

    TransactionSendResult transactionSendResult = new TransactionSendResult();
    transactionSendResult.setSendStatus(sendResult.getSendStatus());
    transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
    transactionSendResult.setMsgId(sendResult.getMsgId());
    transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
    transactionSendResult.setTransactionId(sendResult.getTransactionId());
    transactionSendResult.setLocalTransactionState(localTransactionState);
    return transactionSendResult;
}

有事务反查机制作兜底,该RPC请求即使失败或丢失,也不会影响事务最终的结果。最后构建事务消息的发送结果,并返回。

2.2 Broker端如何处理事务消息?

SendMessageProcessor#asyncSendMessage

跟进去看看真正处理半消息的业务逻辑,这段处理逻辑在类

TransactionalMessageBridge

  • putHalfMessage
  • parseHalfMessageInner

RocketMQ并非将事务消息保存至消息中 client 指定的 queue,而是记录了原始的 topic 和 queue 后,把这个事务消息保存在

设计思想

  • 特殊的内部 topic:RMQ_SYS_TRANS_HALF_TOPIC
  • 序号为 0 的 queue

这套 topic 和 queue 对消费者不可见,因此里面的消息也永远不会被消费。这就保证在事务提交成功之前,这个事务消息对 Consumer 是消费不到的。

2.3 Broker端如何事务反查?

在Broker的TransactionalMessageCheckService服务中启动了一个定时器,定时从事务消息queue中读出所有待反查的事务消息。

AbstractTransactionalMessageCheckListener#resolveHalfMsg

  • 针对每个需要反查的半消息,Broker会给对应的Producer发一个要求执行事务状态反查的RPC请求
  • AbstractTransactionalMessageCheckListener#sendCheckMessage
  • Broker2Client#checkProducerTransactionState

根据RPC返回响应中的反查结果,来决定这个半消息是需要提交还是回滚,或者后续继续来反查。

最后,提交或者回滚事务。首先把半消息标记为已处理

  1. 如果是提交事务,就把半消息从半消息队列中复制到该消息真正的topic和queue中
  2. 如果是回滚事务,什么都不做

EndTransactionProcessor#processRequest

最后结束该事务。

3 总结

  • 整体实现流程

RocketMQ是基于两阶段提交来实现的事务,把这些事务消息暂存在一个特殊的queue中,待事务提交后再移动到业务队列中。最后,RocketMQ的事务适用于解决本地事务和发消息的数据一致性问题。

参考

  • https://juejin.im/post/6844904193526857742

本文分享自微信公众号 - Java架构沉思录(code-thinker)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-09-24

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 从一笔金币充值去思考分布式事务

    考虑支付重构的时候,自然想到原本属于一个本地事务中的处理,现在要跨应用了要怎么处理。拿充值订单举个栗子吧,假设:原本订单模块和账户模块是放在一起的,现在需要做服...

    黄泽杰
  • 一文读懂数据库事务

    根据维基百科的定义,一个数据库事务通常包含了一个序列的对数据库的读/写操作。它的存在包含有以下两个目的:1)为数据库操作序列提供了一个从失败中恢复到正常状态的方...

    黄泽杰
  • 一文读懂数据库事务

    什么是事务 根据维基百科的定义,一个数据库事务通常包含了一个序列的对数据库的读/写操作。它的存在包含有以下两个目的:1)为数据库操作序列提供了一个从失败中恢复...

    黄泽杰
  • 数字IC验证系列之层次化TLM连接

    (题目改为事务级与信号级的分工协作) UVM验证平台一直在努力降低组件之间的耦合度,拆分那些处理多项任务的组件,只处理单个任务,使各个组件更容易调试和复用。

    AsicWonder
  • 猿思考系列6——事务也就那么回事儿

    看完上一个章节,相信你已经充分的掌握代理的套路,猿人工厂君也知道,内容对于新手而言,理解起来还是比较很吃力的,文中提到的其他方式,大家可以去尝试实现,猿人工厂君...

    山旮旯的胖子
  • 使用Python扩展lldb

    Xcode集成了LLDB,进一步简化了程序调试流程。虽然LLDB很强大,但是它的命令很有限。所幸的是,lldb包含了对python的支持,使得lldb的拓展成为...

    py3study
  • 哪里才是中国最热的火炉城市?

    对于全球国土面积居世界前列、拥有至少五种气候类型(18种细分类型)的中国而言,这个问题确实不好回答。

    华章科技
  • 归并排序 O(nLogn)

    归并排序的思想是分治法+回溯,将一个无序的数组先按照原来的一半进行拆分,一直拆分到最后一个元素,然后开始回溯,排序开始的过程是再回溯时开始排序的。

    g小志
  • PyCharm快捷键大全(windows+mac)

    Pycharm有着丰富且强大的快捷键组合,如果能熟练掌握常见快捷键的使用,那么绝对能提高你代码的编写效率和质量。

    吾非同
  • Call to undefined function Workerman\posix_getpid

    workerman 在centos下报PHP Fatal error: Call to undefined function Workerman\posix_g...

    双面人

扫码关注云+社区

领取腾讯云代金券