A Brief Analysis of How RocketMQ Transactional Messages Work

白少年
Published 2023-01-13 17:15:09

Part 0. Business scenario

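In real projects we often need data persistence and message sending to succeed or fail together. For example, when a user places an order in the trading system, the shopping cart consumes the order message to clear the items that were added, the points system adjusts the user's points, and the SMS platform sends the buyer a reminder. The trading system must keep writing the order to the DB consistent with sending the order message. It must not happen that the local transaction rolls back and no order exists, yet an order-created message has gone out and downstream systems now hold dirty data. Nor may the order exist while downstream systems never learn of it, so that the order cannot be fulfilled and the user experience suffers.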
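We could build this ourselves. One way is to create a message table in the business database and write the business data and the message data in the same transaction, so that the database transaction makes the two writes atomic. A scheduled job then scans the message table and sends the pending messages.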
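The message-table approach can be sketched as follows. This is a minimal in-memory model, not a real persistence layer: the class `OutboxDemo`, its lists, and the `failMidway` flag are all hypothetical names used for illustration, and the snapshot-and-restore logic only stands in for a database transaction.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory stand-in for a business database that holds an order table
// and a message table. All names are hypothetical.
class OutboxDemo {
    final List<String> orders = new ArrayList<>();
    final List<String> outbox = new ArrayList<>(); // messages waiting to be sent
    final List<String> sent = new ArrayList<>();   // stand-in for the message queue

    // Writes the order row and the message row as one unit: both or neither.
    synchronized boolean createOrder(String orderId, boolean failMidway) {
        List<String> ordersBefore = new ArrayList<>(orders);
        List<String> outboxBefore = new ArrayList<>(outbox);
        try {
            orders.add(orderId);
            if (failMidway) {
                throw new IllegalStateException("simulated failure");
            }
            outbox.add("ORDER_CREATED:" + orderId);
            return true;
        } catch (RuntimeException e) {
            // Restore both tables, as a database rollback would.
            orders.clear();
            orders.addAll(ordersBefore);
            outbox.clear();
            outbox.addAll(outboxBefore);
            return false;
        }
    }

    // Scheduled relay: publish everything still pending in the message table.
    synchronized int relayPending() {
        int published = outbox.size();
        sent.addAll(outbox);
        outbox.clear();
        return published;
    }

    public static void main(String[] args) {
        OutboxDemo db = new OutboxDemo();
        db.createOrder("A1", false);
        db.createOrder("A2", true); // rolled back, leaves no order and no message
        db.relayPending();
        System.out.println(db.orders + " " + db.sent);
    }
}
```

The point of the design is that the relay only ever sees messages whose business write committed, at the cost of maintaining the table and the scanning job yourself.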
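There is also a ready-made solution: RocketMQ has supported transactional messages since version 4.3.0. We only need to implement the local transaction method executeLocalTransaction and the method that checks its result, checkLocalTransaction; RocketMQ invokes the local transaction for us. If the local transaction succeeds, downstream consumers can consume the message; if it fails, they never see it.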
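To make the division of labour between the two callbacks concrete, here is a small sketch that compiles without the RocketMQ client. The enum `TxState` is a hypothetical stand-in for `LocalTransactionState`, and `OrderTxListener` with its `execute` and `check` methods only mirrors the shape of executeLocalTransaction and checkLocalTransaction; the map stands in for an order table keyed by transaction id.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for RocketMQ's LocalTransactionState.
enum TxState { COMMIT, ROLLBACK, UNKNOWN }

class OrderTxListener {
    // Stand-in for an order table keyed by transaction id.
    private final Map<String, String> orderTable = new ConcurrentHashMap<>();

    // Counterpart of executeLocalTransaction: do the business write and report the outcome.
    TxState execute(String txId, String orderId) {
        try {
            if (orderId == null || orderId.isEmpty()) {
                throw new IllegalArgumentException("invalid order");
            }
            orderTable.put(txId, orderId); // the "local transaction"
            return TxState.COMMIT;
        } catch (RuntimeException e) {
            return TxState.ROLLBACK;
        }
    }

    // Counterpart of checkLocalTransaction: answer only from what was actually persisted.
    TxState check(String txId) {
        return orderTable.containsKey(txId) ? TxState.COMMIT : TxState.UNKNOWN;
    }

    public static void main(String[] args) {
        OrderTxListener listener = new OrderTxListener();
        System.out.println(listener.execute("tx-1", "order-1"));
        System.out.println(listener.check("tx-1"));
    }
}
```

Returning UNKNOWN when no row is found is a deliberate choice in this sketch: the write may still be in flight, so the check leaves the decision to a later round instead of rolling back early.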

Part 1. Usage

With RocketMQ, only the sending side differs from sending a normal message. See the official example:
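// TransactionProducer.java
import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class TransactionProducer {
    // Placeholder values: replace them with your own group, name server address and topic
    public static final String PRODUCER_GROUP = "please_rename_unique_group_name";
    public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";
    public static final String TOPIC = "TopicTest";
    public static final int MESSAGE_COUNT = 10;

    public static void main(String[] args) throws MQClientException, InterruptedException {
        // A custom TransactionListener runs the local transaction (executeLocalTransaction)
        // and answers check requests (checkLocalTransaction); its code follows this class
        TransactionListener transactionListener = new TransactionListenerImpl();
        // Producer dedicated to transactional messages
        TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);
        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
        // Thread pool that handles the Broker's checks of the local transaction state;
        // if none is set, RocketMQ creates one automatically
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> {
            Thread thread = new Thread(r);
            thread.setName("client-transaction-msg-check-thread");
            return thread;
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            try {
                Message msg =
                    new Message(TOPIC, tags[i % tags.length], "KEY" + i,
                                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // Send the transactional message
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        // Keep the process alive long enough to answer the Broker's checks,
        // then call producer.shutdown()
    }
}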
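
// TransactionListenerImpl.java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class TransactionListenerImpl implements TransactionListener {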
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    /**
     * Run the local transaction
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        // The sample always answers UNKNOW here, so every message is resolved by checkLocalTransaction
        return LocalTransactionState.UNKNOW;
    }

    /**
     * Answer the Broker's check of the local transaction state
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
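
As a rough illustration of what the sample listener does, the sketch below counts how the first few messages end up. It assumes a single producer instance whose messages reach executeLocalTransaction in send order, so that the counter value equals the message index; the class name `SampleListenerOutcome` and the count of 10 are hypothetical.

```java
// Standalone illustration; it does not use the RocketMQ client.
class SampleListenerOutcome {
    // Returns {unknown, commit, rollback} counts for the first messageCount messages.
    static int[] count(int messageCount) {
        int[] counts = new int[3];
        for (int index = 0; index < messageCount; index++) {
            // Same mapping as the switch in checkLocalTransaction:
            // 0 stays unknown, 1 commits, 2 rolls back.
            counts[index % 3]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] c = count(10);
        System.out.println("unknown=" + c[0] + " commit=" + c[1] + " rollback=" + c[2]);
    }
}
```

Under those assumptions, messages whose index is a multiple of 3 keep answering UNKNOW and are eventually given up on by the Broker, while the rest are committed or rolled back at their first check.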
The local transaction can report one of three states:
  • LocalTransactionState.COMMIT_MESSAGE: the transaction succeeded; the Broker makes the message available to downstream consumers
  • LocalTransactionState.ROLLBACK_MESSAGE: the transaction was rolled back; the Broker deletes the message and downstream consumers never see it
  • LocalTransactionState.UNKNOW: the outcome is not known yet, for example because the transaction is still running; the Broker keeps checking back until the maximum time or the maximum number of checks is exceeded

Part 2. How it works

0. Overall flow

[Figure: overall flow of a transactional message]

  1. The Producer sends a transactional message
  2. On the Broker, SendMessageProcessor receives the message. If it is a transactional message, the Broker saves the original topic and queue id in the message properties, sets the topic to RMQ_SYS_TRANS_HALF_TOPIC, stores the message, and then acknowledges the Producer
  3. Once the Broker confirms that the send succeeded, the Producer runs the local transaction
  4. When the local transaction finishes, the Producer reports its state to the Broker
  5. On the Broker, EndTransactionProcessor receives the state and loads the message from RMQ_SYS_TRANS_HALF_TOPIC. On commit, it restores the original topic and queue id from the properties, stores the message under the real topic and queue, and writes a record to RMQ_SYS_TRANS_OP_HALF_TOPIC; on rollback, it only writes the record to RMQ_SYS_TRANS_OP_HALF_TOPIC
  6. If the Broker never receives the state from the Producer, TransactionalMessageCheckService on the Broker periodically scans RMQ_SYS_TRANS_HALF_TOPIC for messages that need a check
  7. For each message that needs one, the Broker sends a check request to the Producer
  8. The Producer calls checkLocalTransaction on its TransactionListener to look up the transaction state
  9. With the result in hand, the Producer repeats steps 4 and 5

1. Producer sends the message

// Implement TransactionListener to run the local transaction and answer check requests
TransactionListener transactionListener = new TransactionListenerImpl();
// TransactionMQProducer is the producer dedicated to transactional messages
TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);
producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
// Thread pool that asynchronously handles check requests coming from the Broker;
// if none is set, the client creates a default one
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> {
  Thread thread = new Thread(r);
  thread.setName("client-transaction-msg-check-thread");
  return thread;
});

producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();

String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < MESSAGE_COUNT; i++) {
  try {
    Message msg =
      new Message(TOPIC, tags[i % tags.length], "KEY" + i,
                  ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
    // Send the transactional message
    SendResult sendResult = producer.sendMessageInTransaction(msg, null);
    System.out.printf("%s%n", sendResult);

    Thread.sleep(10);
  } catch (MQClientException | UnsupportedEncodingException e) {
    e.printStackTrace();
  }
}

// Send a transactional message
public TransactionSendResult sendMessageInTransaction(final Message msg,
    final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
    // A TransactionListener is mandatory for sending transactional messages
    TransactionListener transactionListener = getCheckListener();
    if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }

    // ignore DelayTimeLevel parameter
    if (msg.getDelayTimeLevel() != 0) {
        // Transactional messages do not support delayed delivery
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    }

    Validators.checkMessage(msg, this.defaultMQProducer);

    SendResult sendResult = null;
    // Mark the message as prepared; the Broker uses this property to recognize a transactional message
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    // Record the producer group, so that a later check of the local transaction state
    // can go to any producer picked from that group
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    try {
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }
    // ......
    // Handling of the send result is covered below
}

2. Broker receives the transactional message

// asyncSendMessage method
CompletableFuture<PutMessageResult> putMessageResult = null;
// MessageConst.PROPERTY_TRANSACTION_PREPARED tells whether this is a transactional message
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (Boolean.parseBoolean(transFlag)) {
    // Transactional message
    if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(
            "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                + "] sending transaction message is forbidden");
        return CompletableFuture.completedFuture(response);
    }
    // Store the transactional (half) message
    putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
    // Store a normal message
    putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    // Back up the original topic and queue
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
        String.valueOf(msgInner.getQueueId()));
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    // Switch the topic to RMQ_SYS_TRANS_HALF_TOPIC
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    // The queue id is hard-coded: there is a single queue, so half messages are processed in order
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}

3. Producer handles the send result

// sendMessageInTransaction method
try {
    // Result of sending the transactional message
    sendResult = this.send(msg);
} catch (Exception e) {
    throw new MQClientException("send message Exception", e);
}

LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
// Act on the send result
switch (sendResult.getSendStatus()) {
    // The send succeeded
    case SEND_OK: {
        try {
            if (sendResult.getTransactionId() != null) {
                msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
            }
            String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
            if (null != transactionId && !"".equals(transactionId)) {
                msg.setTransactionId(transactionId);
            }
            if (null != localTransactionExecuter) {
                // Deprecated path; it is not taken when a TransactionListener is used
                localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
            } else if (transactionListener != null) {
                // The local transaction runs here
                log.debug("Used new transaction API");
                localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
            }
            if (null == localTransactionState) {
                localTransactionState = LocalTransactionState.UNKNOW;
            }

            if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                log.info("executeLocalTransactionBranch return {}", localTransactionState);
                log.info(msg.toString());
            }
        } catch (Throwable e) {
            // The catch covers the case where the local transaction throws
            log.info("executeLocalTransactionBranch exception", e);
            log.info(msg.toString());
            localException = e;
        }
    }
    break;
    case FLUSH_DISK_TIMEOUT:
    case FLUSH_SLAVE_TIMEOUT:
    case SLAVE_NOT_AVAILABLE:
        localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
        break;
    default:
        break;
}

4. Producer reports the transaction state to the Broker

try {
    // Finish the transactional message: tell the Broker what to do with it
    this.endTransaction(msg, sendResult, localTransactionState, localException);
} catch (Exception e) {
    log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}

public void endTransaction(
    final Message msg,
    final SendResult sendResult,
    final LocalTransactionState localTransactionState,
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    String transactionId = sendResult.getTransactionId();
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    // Translate the local transaction state into the request flag
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }

    doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
    // Send the request to the Broker
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
        this.defaultMQProducer.getSendMsgTimeout());
}

public void endTransactionOneway(
    final String addr,
    final EndTransactionRequestHeader requestHeader,
    final String remark,
    final long timeoutMillis
) throws RemotingException, InterruptedException {
    // RequestCode.END_TRANSACTION routes the request to EndTransactionProcessor on the Broker
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);

    request.setRemark(remark);
    // One-way request: the result is ignored,
    // so the request can be lost; if it is, the Broker checks back later
    this.remotingClient.invokeOneway(addr, request, timeoutMillis);
}

5. Broker handles the transaction state notification

// processRequest method
// The branch above this one handles the answer to a Broker-initiated check; it is covered later
else {
    // Request sent by the Producer right after the half message
    switch (requestHeader.getCommitOrRollback()) {
        // Neither commit nor rollback: return without running the logic below
        case MessageSysFlag.TRANSACTION_NOT_TYPE: {
            LOGGER.warn("The producer[{}] end transaction in sending message,  and it's pending status."
                    + "RequestHeader: {} Remark: {}",
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                requestHeader.toString(),
                request.getRemark());
            return null;
        }
        // Commit: continue with the logic below
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
            break;
        }
        // Rollback: log a warning, then continue with the logic below
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
            LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."
                    + "RequestHeader: {} Remark: {}",
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                requestHeader.toString(),
                request.getRemark());
            break;
        }
        default:
            return null;
    }
}
OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
    // Load the message from RMQ_SYS_TRANS_HALF_TOPIC by its offset
    // The Broker returned that offset to the Producer in step 2, and the Producer passes it back here
    result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
    // The transaction state is commit
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            // Rebuild the real message from the original topic and queue saved in the properties
            MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
            msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
            msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
            msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
            msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
            // Clear the transactional marker so the message is not processed as a half message again
            MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
            // Store the real message
            RemotingCommand sendResult = sendFinalMessage(msgInner);
            if (sendResult.getCode() == ResponseCode.SUCCESS) {
                // "Delete" the message from RMQ_SYS_TRANS_HALF_TOPIC
                // In practice a record is written to RMQ_SYS_TRANS_OP_HALF_TOPIC to mark it as deleted
                // It is not deleted directly because the record is needed later, for example to detect repeated processing
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return sendResult;
        }
        return res;
    }
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
    // Rollback: mark the half message as deleted by writing a record to RMQ_SYS_TRANS_OP_HALF_TOPIC
    // Compared with commit, the delivery to the real topic is skipped
    result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
        }
        return res;
    }
}

6. Broker scans for pending messages

TransactionalMessageCheckService implements Runnable. When the Broker starts, BrokerController.start() is called, which in turn calls TransactionalMessageCheckService.start() to launch the thread. The run method loops until the service is stopped and wakes up once per transactionCheckInterval. In the 4.x BrokerConfig that interval defaults to 60 seconds; the 6-second default belongs to transactionTimeOut.

public void run() {
    log.info("Start transaction check service thread!");
    long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
    while (!this.isStopped()) {
        this.waitForRunning(checkInterval);
    }
    log.info("End transaction check service thread!");
}

The method that actually runs in each round is this one:

protected void onWaitEnd() {
    // Transaction timeout: a half message becomes eligible for a check only after it has existed
    // longer than this; younger messages are left for a later round
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    // Maximum number of checks: once it is exceeded without learning the transaction state,
    // the message is no longer checked and is discarded, which amounts to a rollback
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    long begin = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", begin);
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
    log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}

public void check(long transactionTimeout, int transactionCheckMax,
    AbstractTransactionalMessageCheckListener listener) {
    try {
        // Fetch every queue under the half-message topic and process them one by one
        String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
        Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
        if (msgQueues == null || msgQueues.size() == 0) {
            log.warn("The queue of topic is empty :" + topic);
            return;
        }
        log.debug("Check topic={}, queues={}", topic, msgQueues);
        for (MessageQueue messageQueue : msgQueues) {
            long startTime = System.currentTimeMillis();
            MessageQueue opQueue = getOpQueue(messageQueue);
            // Consume progress of RMQ_SYS_TRANS_HALF_TOPIC
            long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
            // Consume progress of RMQ_SYS_TRANS_OP_HALF_TOPIC, which is written when a commit or rollback request arrives
            long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
            log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
            if (halfOffset < 0 || opOffset < 0) {
                log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
                    halfOffset, opOffset);
                continue;
            }

            List<Long> doneOpOffset = new ArrayList<>();
            HashMap<Long, Long> removeMap = new HashMap<>();
            // Starting from the current progress, pull 32 messages from the processed queue RMQ_SYS_TRANS_OP_HALF_TOPIC
            PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
            if (null == pullResult) {
                log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
                    messageQueue, halfOffset, opOffset);
                continue;
            }
            // single thread
            // Number of times an empty message was fetched
            int getMessageNullCount = 1;
            // Progress of half-message processing
            long newOffset = halfOffset;
            // Queue offset of the message currently being processed
            long i = halfOffset;
            while (true) {
                // Time budget used up: wait for the next scheduling round
                if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
                    log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
                    break;
                }
                // Already committed or rolled back: move on to the next message
                if (removeMap.containsKey(i)) {
                    log.debug("Half offset {} has been committed/rolled back", i);
                    Long removedOpOffset = removeMap.remove(i);
                    doneOpOffset.add(removedOpOffset);
                } else {
                    // Fetch the half message
                    GetResult getResult = getHalfMsg(messageQueue, i);
                    MessageExt msgExt = getResult.getMsg();
                    if (msgExt == null) {
                        // Too many empty fetches: stop checking this queue
                        if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
                            break;
                        }
                        // No new message: stop checking this queue
                        if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
                            log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
                                messageQueue, getMessageNullCount, getResult.getPullResult());
                            break;
                        } else {
                            log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
                                i, messageQueue, getMessageNullCount, getResult.getPullResult());
                            // Any other reason: pull again from the next valid offset
                            i = getResult.getPullResult().getNextBeginOffset();
                            newOffset = i;
                            continue;
                        }
                    }

                    // needDiscard: the message has been checked more often than allowed, so it is discarded and the
                    // transaction counts as failed; each check increments a counter in the message properties
                    // (transactionCheckMax defaults to 15)
                    // needSkip: the message is older than the file retention time (72 hours by default), so it is skipped
                    if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
                        listener.resolveDiscardMsg(msgExt);
                        newOffset = i + 1;
                        i++;
                        continue;
                    }
                    if (msgExt.getStoreTimestamp() >= startTime) {
                        log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
                            new Date(msgExt.getStoreTimestamp()));
                        break;
                    }

                    // Age of the message: current time minus the timestamp at which the Producer created it
                    long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
                    // checkImmunityTime: the window in which a message is exempt from checks. After sending a
                    // transactional message the application does not commit right away, so this is the time the
                    // application is assumed to need. Within it RocketMQ treats the transaction as not yet
                    // committed and does not send a check request
                    // transactionTimeout: timeout of a transactional message. It is compared with the gap between the
                    // timestamp of the last message pulled from the OP queue and the start of check(); if the gap
                    // exceeds transactionTimeout, a check is sent even though the message is younger than checkImmunityTime
                    long checkImmunityTime = transactionTimeout;
                    // PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS: optional per-message immunity time, in seconds,
                    // that the sender can set on the message; null by default
                    String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
                    if (null != checkImmunityTimeStr) {
                        checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
                        if (valueOfCurrentMinusBorn < checkImmunityTime) {
                            if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
                                newOffset = i + 1;
                                i++;
                                continue;
                            }
                        }
                    } else {
                        // Still inside the time the application is given to finish its transaction: stop this round
                        if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
                            log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
                                checkImmunityTime, new Date(msgExt.getBornTimestamp()));
                            break;
                        }
                    }
                    List<MessageExt> opMsg = pullResult.getMsgFoundList();
                    // A check is needed when the OP queue has no processed messages and the immunity time has passed,
                    // or when the OP queue is not empty and the timestamp of its last message is more than
                    // transactionTimeout beyond startTime, or when the born timestamp lies in the future
                    boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
                        || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
                        || (valueOfCurrentMinusBorn <= -1);

                    if (isNeedCheck) {
                        // The check is asynchronous, so the half message is first put back into the queue
                        // to be examined again in a later round
                        if (!putBackHalfMsgQueue(msgExt, i)) {
                            continue;
                        }
                        // Send the check
                        listener.resolveHalfMsg(msgExt);
                    } else {
                        // Cannot decide yet: load more processed (OP) messages and try again
                        pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
                        log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
                            messageQueue, pullResult);
                        continue;
                    }
                }
                newOffset = i + 1;
                i++;
            }
            if (newOffset != halfOffset) {
                // Save the check progress of the half queue
                transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
            }
            long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
            if (newOpOffset != opOffset) {
                // Save the progress of the OP queue
                transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
            }
        }
    } catch (Throwable e) {
        log.error("Check error", e);
    }
}

7. Broker checks the transaction state with the Producer

public void resolveHalfMsg(final MessageExt msgExt) {
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            try {
                sendCheckMessage(msgExt);
            } catch (Exception e) {
                LOGGER.error("Send check message error!", e);
            }
        }
    });
}

public void sendCheckMessage(MessageExt msgExt) throws Exception {
    CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
    checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
    checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
    checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
    checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
    checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
    msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
    msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
    msgExt.setStoreSize(0);
    String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
    // Pick any Producer from the same producer group for the check,
    // so a crash or a redeploy of some instances in the group does not block it
    Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
    if (channel != null) {
        brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
    } else {
        LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
    }
}

public void checkProducerTransactionState(
    final String group,
    final Channel channel,
    final CheckTransactionStateRequestHeader requestHeader,
    final MessageExt messageExt) throws Exception {
    // The request sent to the Producer carries the code CHECK_TRANSACTION_STATE
    RemotingCommand request =
        RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
    request.setBody(MessageDecoder.encode(messageExt, false));
    try {
        this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
    } catch (Exception e) {
        log.error("Check transaction failed because invoke producer exception. group={}, msgId={}, error={}",
            group, messageExt.getMsgId(), e.toString());
    }
}

8. Producer checks the local transaction

public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    switch (request.getCode()) {
        case RequestCode.CHECK_TRANSACTION_STATE:
            // A check from the Broker: look up the local transaction state
            return this.checkTransactionState(ctx, request);
        // ...... part of the code omitted
    }
    return null;
}

public void checkTransactionState(final String addr, final MessageExt msg,
    final CheckTransactionStateRequestHeader header) {
    Runnable request = new Runnable() {
        // ... part of the code omitted; it is shown next
    };
    // Runs asynchronously on the thread pool configured when the TransactionMQProducer was created
    this.checkExecutor.submit(request);
}

public void run() {
    TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
    TransactionListener transactionListener = getCheckListener();
    if (transactionCheckListener != null || transactionListener != null) {
        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable exception = null;
        try {
            if (transactionCheckListener != null) {
                localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
            } else if (transactionListener != null) {
                // Check the local transaction
                log.debug("Used new check API in transaction message");
                localTransactionState = transactionListener.checkLocalTransaction(message);
            } else {
                log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
            }
        } catch (Throwable e) {
            log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
            exception = e;
        }

        // Report the local transaction state to the Broker
        // Like the first notification in step 4, this is a one-way send and can be lost
        this.processTransactionState(
            localTransactionState,
            group,
            exception);
    } else {
        log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
    }
}
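
The Broker-side bookkeeping can be condensed into a toy model. This is only a sketch under simplifying assumptions, not the Broker's actual data structures: `ToyBroker`, its fields and its methods are hypothetical, a list stands in for RMQ_SYS_TRANS_HALF_TOPIC, a set of offsets stands in for RMQ_SYS_TRANS_OP_HALF_TOPIC, and timeouts and check limits are left out.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of half-message handling. All names are hypothetical.
class ToyBroker {
    final List<String> halfQueue = new ArrayList<>(); // stand-in for RMQ_SYS_TRANS_HALF_TOPIC
    final Set<Integer> opOffsets = new HashSet<>();   // stand-in for RMQ_SYS_TRANS_OP_HALF_TOPIC
    final List<String> realTopic = new ArrayList<>(); // what consumers can see

    // Step 2: store the half message and return its queue offset.
    int prepare(String body) {
        halfQueue.add(body);
        return halfQueue.size() - 1;
    }

    // Step 5: a commit copies the message to the real topic;
    // both commit and rollback record the offset as resolved.
    void endTransaction(int offset, boolean commit) {
        if (opOffsets.contains(offset)) {
            return; // already resolved, ignore the repeated notification
        }
        if (commit) {
            realTopic.add(halfQueue.get(offset));
        }
        opOffsets.add(offset);
    }

    // Step 6: half messages without an op record are candidates for a check.
    List<Integer> pendingChecks() {
        List<Integer> pending = new ArrayList<>();
        for (int i = 0; i < halfQueue.size(); i++) {
            if (!opOffsets.contains(i)) {
                pending.add(i);
            }
        }
        return pending;
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        int first = broker.prepare("order-1");
        int second = broker.prepare("order-2");
        broker.prepare("order-3"); // its notification is "lost"
        broker.endTransaction(first, true);
        broker.endTransaction(second, false);
        System.out.println(broker.realTopic + " pending=" + broker.pendingChecks());
    }
}
```

In this model a half message is never removed; it is resolved by an entry on the op side, which is why a lost notification simply shows up as a pending offset in the next scan.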

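Original-work statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.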

In case of infringement, contact cloudcommunity@tencent.com to have the content removed.