
RocketMQ Transactional Messages

GreizLiao
Published 2021-08-13 11:14:22
Collected in the column: 足球是圆的

Definition of RocketMQ Transactional Messages

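A RocketMQ transactional message places an application's local transaction and the send-message operation inside one global transaction: either both succeed or both fail. RocketMQ transactional messages provide distributed-transaction functionality similar to X/Open XA, and using them lets a distributed transaction reach eventual consistency.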

RocketMQ Transactional Messages and the Business Flow

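  1. The sender sends a half (prepared) transactional message.
  2. The Broker stores the half message and returns the result.
  3. The sender of the half message executes its local transaction.
  4. The sender reports the result of the local transaction to the Broker as a message.
  5. If the Broker has not received the confirmation from step 4 in time, its periodic check task (every 60 seconds by default) sends a check-back message to the sender.
  6. On receiving the check-back message, the sender looks up the result of its local transaction.
  7. The sender reports the check-back result to the Broker.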

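Steps 1-4 are synchronous calls; steps 5-7 are asynchronous. RocketMQ transactional messages guarantee eventual consistency by combining 2PC with an after-the-fact compensation mechanism (the check-back).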
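
To make the seven steps concrete, here is a minimal in-memory sketch of the protocol. It is not RocketMQ code: `TransactionFlowSketch`, `TxState`, and `HalfStatus` are hypothetical names made up for illustration, and the "broker" is just a map. It only shows how a half message stays invisible until a COMMIT arrives, and how the check-back settles messages whose result was UNKNOWN or never arrived.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical in-memory model of the seven steps; none of these names are RocketMQ APIs.
class TransactionFlowSketch {
    enum TxState { COMMIT, ROLLBACK, UNKNOWN }
    enum HalfStatus { PENDING, DELIVERED, DISCARDED }

    // Half messages held by the "broker", keyed by transaction id.
    private final Map<String, HalfStatus> halfMessages = new LinkedHashMap<>();

    // Steps 1-2: store the half message; consumers cannot see it yet.
    void sendHalf(String txId) {
        halfMessages.put(txId, HalfStatus.PENDING);
    }

    // Steps 4 and 7: apply the sender's verdict. UNKNOWN leaves the message pending.
    void endTransaction(String txId, TxState state) {
        if (halfMessages.get(txId) != HalfStatus.PENDING) {
            return; // already resolved, or never sent
        }
        if (state == TxState.COMMIT) {
            halfMessages.put(txId, HalfStatus.DELIVERED);
        } else if (state == TxState.ROLLBACK) {
            halfMessages.put(txId, HalfStatus.DISCARDED);
        }
    }

    // Steps 5-7: ask the sender about every still-pending half message and apply the answer.
    void checkBack(Function<String, TxState> senderCheck) {
        for (String txId : new ArrayList<>(halfMessages.keySet())) {
            if (halfMessages.get(txId) == HalfStatus.PENDING) {
                endTransaction(txId, senderCheck.apply(txId));
            }
        }
    }

    HalfStatus statusOf(String txId) {
        return halfMessages.get(txId);
    }
}
```

Running `checkBack` again and again is what provides the eventual consistency in this model: a transaction left pending after step 4 is settled as soon as the sender can give a definite answer.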

How RocketMQ Transactional Messages Work

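  1. The message sender uses TransactionMQProducer to send a business message to the target topic (for example Griez_Topic). TransactionMQProducer adds the key-value pair TRAN_MSG=true to the Message's properties, marking it as a transactional message.
  2. When the Broker receives the message, its storage logic checks the TRAN_MSG property to decide whether this is a transactional message. If it is true, the Broker saves the original topic name in the REAL_TOPIC property, replaces the message's topic with RMQ_SYS_TRANS_HALF_TOPIC, stores the message, and returns the result. This completes the sending of the half message.
  3. After receiving the send result, the business side executes its local transaction.
  4. It puts the result of step 3 into an EndTransactionRequestHeader and sends an END_TRANSACTION command to the Broker.
  5. On receiving END_TRANSACTION, the Broker fetches the corresponding message (the mutated message) from RMQ_SYS_TRANS_HALF_TOPIC. If the local transaction was COMMIT, the Broker restores the original message (topic back to Griez_Topic) and stores it in Griez_Topic; only now can consumers consume it. Once that store succeeds, the Broker writes a record of the mutated message to RMQ_SYS_TRANS_OP_HALF_TOPIC to mark the transactional message as processed. If the local transaction was ROLLBACK, nothing is written to Griez_Topic; the Broker only writes the record to RMQ_SYS_TRANS_OP_HALF_TOPIC to mark the message as processed.
  6. At startup the Broker launches a periodic check-back service (every 60 seconds). It compares the half-message topic RMQ_SYS_TRANS_HALF_TOPIC with the processed-message topic RMQ_SYS_TRANS_OP_HALF_TOPIC; the difference identifies the half messages that still need a check-back, and for each of them the Broker sends a CHECK_TRANSACTION_STATE command to the business side to query the local transaction result.
  7. On receiving CHECK_TRANSACTION_STATE, the business side runs its check-back interface and then handles the result exactly as in step 4.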
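
The topic swap in steps 2 and 5 can be sketched with a few collections. This is only an illustration under simplifying assumptions: `HalfTopicSketch` and `Msg` are hypothetical stand-ins for the Broker's storage, and only the topic names and the TRAN_MSG / REAL_TOPIC property names are taken from the description above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for the broker's storage; not the real CommitLog or ConsumeQueue.
class HalfTopicSketch {
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    static final String OP_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC";

    static class Msg {
        String topic;
        final String body;
        final Map<String, String> properties = new HashMap<>();

        Msg(String topic, String body) {
            this.topic = topic;
            this.body = body;
        }
    }

    // topic name -> messages stored under that topic
    private final Map<String, List<Msg>> store = new HashMap<>();

    private void append(String topic, Msg msg) {
        store.computeIfAbsent(topic, t -> new ArrayList<>()).add(msg);
    }

    // Step 2: a TRAN_MSG=true message is parked in the half topic and remembers its real topic.
    void put(Msg msg) {
        if ("true".equals(msg.properties.get("TRAN_MSG"))) {
            msg.properties.put("REAL_TOPIC", msg.topic);
            msg.topic = HALF_TOPIC;
        }
        append(msg.topic, msg);
    }

    // Step 5: on COMMIT, restore the original topic and store the message there;
    // for both COMMIT and ROLLBACK, record an OP entry that marks the half message as processed.
    void end(Msg half, boolean commit) {
        if (commit) {
            Msg restored = new Msg(half.properties.get("REAL_TOPIC"), half.body);
            append(restored.topic, restored);
        }
        append(OP_TOPIC, new Msg(OP_TOPIC, half.body));
    }

    int count(String topic) {
        List<Msg> messages = store.get(topic);
        return messages == null ? 0 : messages.size();
    }
}
```

Because consumers subscribe to Griez_Topic and never to the half topic, the message is simply not there for them until `end(half, true)` has run.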

Source Code Walkthrough of RocketMQ Transactional Messages

Producer: Sending the Transactional Message and the Local-Transaction Confirmation

Implementing TransactionListener

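import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class TransactionListenerImpl implements TransactionListener {
    // Counter used to cycle the demo through the three possible outcomes
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    // Outcome recorded per transaction ID, consulted again during check-back
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();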

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        switch (status) {
            case 0:
                return LocalTransactionState.UNKNOW;
            case 1:
                return LocalTransactionState.ROLLBACK_MESSAGE;
            case 2:
                return LocalTransactionState.COMMIT_MESSAGE;
            default:
                return LocalTransactionState.COMMIT_MESSAGE;
        }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
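                case 1:
                    // Same mapping as executeLocalTransaction: 1 = rollback
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                case 2:
                    // Same mapping as executeLocalTransaction: 2 = commit
                    return LocalTransactionState.COMMIT_MESSAGE;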
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

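Business code must implement the TransactionListener interface. The executeLocalTransaction method carries the actual business logic (the local transaction); the checkLocalTransaction method is what the Broker's check-back calls to query the state of the local transaction.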
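
The demo listener cycles through outcomes artificially. In a real service, a common approach (my suggestion, not something the demo does) is to record the outcome of the local transaction under the transaction ID so that the check-back can answer from facts. The sketch below uses a stand-in `State` enum and a hypothetical `OrderTransactionSketch` class instead of the RocketMQ types so that it compiles on its own; the in-memory map stands in for a transaction-log table that would be written in the same database transaction as the business data.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical listener body; in a real project the two methods would implement TransactionListener.
class OrderTransactionSketch {
    // Stand-in for LocalTransactionState so the sketch needs no RocketMQ dependency.
    enum State { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Stand-in for a transaction-log table: transaction id -> did the local transaction commit?
    private final Map<String, Boolean> txLog = new ConcurrentHashMap<>();

    // Counterpart of executeLocalTransaction: run the business logic and record the outcome.
    State execute(String transactionId, Runnable businessLogic) {
        try {
            businessLogic.run();
            txLog.put(transactionId, Boolean.TRUE);
            return State.COMMIT_MESSAGE;
        } catch (RuntimeException e) {
            txLog.put(transactionId, Boolean.FALSE);
            return State.ROLLBACK_MESSAGE;
        }
    }

    // Counterpart of checkLocalTransaction: answer from the recorded outcome only.
    State check(String transactionId) {
        Boolean committed = txLog.get(transactionId);
        if (committed == null) {
            return State.UNKNOW; // nothing recorded yet; let the broker ask again later
        }
        return committed ? State.COMMIT_MESSAGE : State.ROLLBACK_MESSAGE;
    }
}
```

Returning UNKNOW for an unrecorded transaction is a deliberate choice in this sketch: it avoids committing a message whose business data may never have been written.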

Using TransactionMQProducer

public static void main(String[] args) throws MQClientException, InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);
    TransactionListener transactionListener = new TransactionListenerImpl(); 
    TransactionMQProducer producer = new TransactionMQProducer("griez_test"); // @1
    ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("client-transaction-msg-check-thread");
            return thread;
        }
    });

    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.setExecutorService(executorService); // thread pool that runs the check-back callbacks
    producer.setTransactionListener(transactionListener); // @2
    producer.start();

    for (int i = 0; i < 10; i++) {
        try {
            Message msg = new Message("Griez_Topic",  ("Hello Griez " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.sendMessageInTransaction(msg, null); // @3
            Thread.sleep(1000);
        } catch (MQClientException | UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
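    // Block the main thread so the producer stays alive to answer check-backs
    // (the latch is never counted down in this demo)
    latch.await();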
    producer.shutdown();
}

  1. Create the transactional message producer, TransactionMQProducer.
  2. Register the TransactionListener with the TransactionMQProducer.
  3. Send the message.

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction

public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
    TransactionListener transactionListener = getCheckListener();
    if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }
    Validators.checkMessage(msg, this.defaultMQProducer);

    SendResult sendResult = null;
    // Mark the message as a transactional (half) message
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); // @1
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    try {
        // Send the half message
        sendResult = this.send(msg); // @2
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }

    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    switch (sendResult.getSendStatus()) {
        case SEND_OK: {
            try {
                if (sendResult.getTransactionId() != null) {
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                if (null != transactionId && !"".equals(transactionId)) {
                    msg.setTransactionId(transactionId);
                }
                if (null != localTransactionExecuter) {
                    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                } else if (transactionListener != null) {
                    // Execute the local transaction
                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg); // @3
                }
                if (null == localTransactionState) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }

                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                    log.info("executeLocalTransactionBranch return {}", localTransactionState);
                    log.info(msg.toString());
                }
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
        break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }

    try {
        // Report the local transaction result to the broker
        this.endTransaction(sendResult, localTransactionState, localException); // @4
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }

    TransactionSendResult transactionSendResult = new TransactionSendResult();
    transactionSendResult.setSendStatus(sendResult.getSendStatus());
    transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
    transactionSendResult.setMsgId(sendResult.getMsgId());
    transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
    transactionSendResult.setTransactionId(sendResult.getTransactionId());
    transactionSendResult.setLocalTransactionState(localTransactionState);
    return transactionSendResult;
}

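  1. Set the property TRAN_MSG to true to mark the message as a half message, and attach the producer group so the broker can find a client to check back with.
  2. Send the message (the same send path as a normal message) and wait for the broker's result.
  3. If the broker returns SEND_OK, call the TransactionListener's executeLocalTransaction method to run the local transaction and obtain its result.
  4. Send the local transaction result to the broker as a command.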
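
The decision made between steps 2 and 4 can be condensed into one small function. The sketch below mirrors the switch in the listing above, but with stand-in enums and a hypothetical `SendDecisionSketch` class so that it runs without RocketMQ; it also catches only `RuntimeException`, whereas the real method catches `Throwable`.

```java
import java.util.function.Supplier;

// Hypothetical condensation of the switch in sendMessageInTransaction.
class SendDecisionSketch {
    enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }
    enum State { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // Only SEND_OK runs the local transaction; the other statuses roll the half message back.
    static State decide(SendStatus status, Supplier<State> localTransaction) {
        switch (status) {
            case SEND_OK:
                try {
                    State result = localTransaction.get();
                    return result != null ? result : State.UNKNOW; // null is treated as UNKNOW
                } catch (RuntimeException e) {
                    return State.UNKNOW; // state stays UNKNOW; a later check-back decides
                }
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                return State.ROLLBACK_MESSAGE; // half message not safely stored
            default:
                return State.UNKNOW;
        }
    }
}
```

The point worth noticing is that an exception in the local transaction does not produce a rollback here: the state remains UNKNOW, so the final answer is deferred to the check-back.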

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#endTransaction

public void endTransaction(
    final SendResult sendResult,
    final LocalTransactionState localTransactionState,
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    String transactionId = sendResult.getTransactionId();
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset());
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }

    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout());
}
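
The `commitOrRollback` values set in the switch above are two bits inside the message's sysFlag. The sketch below reproduces that bit layout in a hypothetical `SysFlagSketch` class. The constant values are what I recall from `MessageSysFlag` in the 4.x source, so treat them as assumptions and check them against the version you run.

```java
// Stand-alone illustration of the transaction bits in sysFlag; not the real MessageSysFlag class.
class SysFlagSketch {
    // Assumed values (as recalled from the 4.x source): bits 2 and 3 carry the transaction type.
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;

    // ROLLBACK sets both transaction bits, so it doubles as the mask.
    static int getTransactionValue(int flag) {
        return flag & TRANSACTION_ROLLBACK_TYPE;
    }

    // Clear the two transaction bits, then set the new type; other flag bits are kept.
    static int resetTransactionValue(int flag, int type) {
        return (flag & ~TRANSACTION_ROLLBACK_TYPE) | type;
    }
}
```

Under these assumptions, UNKNOW maps to `TRANSACTION_NOT_TYPE`, which is why the Broker neither commits nor rolls back such a request and the half message stays pending.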

That covers how a transactional message is delivered. Next is how the Broker receives and processes it.

Broker: Handling the Transaction Result Message

org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest

public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
    RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final EndTransactionRequestHeader requestHeader = (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
    // ~~~~~~ unrelated code omitted
    OperationResult result = new OperationResult();
    if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {    // @1 commit
        // Look up the half message in RMQ_SYS_TRANS_HALF_TOPIC (similar to a consumer reading it)
        result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader); // @2
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            // Validate the half message against the request
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); // @3
            if (res.getCode() == ResponseCode.SUCCESS) {
                // Restore the message with its original topic
                MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage()); // @4
                msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                // Store the message in its original topic; once stored, consumers can consume it
                RemotingCommand sendResult = sendFinalMessage(msgInner);  // @5
                if (sendResult.getCode() == ResponseCode.SUCCESS) {
                    // Once the original-topic message is stored, append a record to RMQ_SYS_TRANS_OP_HALF_TOPIC to mark the transaction as finished
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());// @6
                }
                return sendResult;
            }
            return res;
        }
    } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) { //  @7 rollback
        // Look up the half message in RMQ_SYS_TRANS_HALF_TOPIC (similar to a consumer reading it)
        result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);// @8
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            if (res.getCode() == ResponseCode.SUCCESS) {
                // Append a record to RMQ_SYS_TRANS_OP_HALF_TOPIC to mark the transaction as finished
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());// @9
            }
            return res;
        }
    }
    response.setCode(result.getResponseCode());
    response.setRemark(result.getResponseRemark());
    return response;
}

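  1. The command type is transaction commit.
  2. Look up the message in topic RMQ_SYS_TRANS_HALF_TOPIC; the method name (commitMessage) is a little misleading, since it only reads the half message.
  3. Validate the half message named in the command against the request parameters.
  4. Restore the original topic information (Griez_Topic).
  5. Store the message in the original topic; only after this step can business consumers consume it.
  6. Save a record in topic RMQ_SYS_TRANS_OP_HALF_TOPIC to mark the transactional message as processed.
  7. The command type is transaction rollback.
  8. Look up the message in topic RMQ_SYS_TRANS_HALF_TOPIC; the method name (rollbackMessage) is again a little misleading.
  9. Save a record in topic RMQ_SYS_TRANS_OP_HALF_TOPIC to mark the transactional message as processed.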

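This subsection covered how the Broker handles the local-transaction result command. The next one covers how the Broker asynchronously sends check-back commands to the business side.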

Broker Check-Back Handling

When the Broker starts, it launches a series of scheduled tasks. The call chain is: org.apache.rocketmq.broker.BrokerStartup#start -> org.apache.rocketmq.broker.BrokerController#start -> org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService#start -> org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService#run -> org.apache.rocketmq.common.ServiceThread#waitForRunning -> org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService#onWaitEnd

Key Broker-side code

org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService#onWaitEnd

@Override
protected void onWaitEnd() {
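    // Minimum age before a half message is eligible for a check-back (transactionTimeOut).
    // Note: the BrokerConfig default appears to be 6 seconds; the 60-second figure is the check interval.
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    // Maximum number of check-backs per message (15 by default)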
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    long begin = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", begin);
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
    log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}

org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#check

@Override
public void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener) {
    try {
        String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
        Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
        if (msgQueues == null || msgQueues.size() == 0) {
            log.warn("The queue of topic is empty :" + topic);
            return;
        }
        log.info("Check topic={}, queues={}", topic, msgQueues);
        for (MessageQueue messageQueue : msgQueues) {
            long startTime = System.currentTimeMillis();
            // Get the matching queue of RMQ_SYS_TRANS_OP_HALF_TOPIC
            MessageQueue opQueue = getOpQueue(messageQueue);
            // Fetch the consume offsets
            long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
            long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
            log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
            if (halfOffset < 0 || opOffset < 0) {
                log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue, halfOffset, opOffset);
                continue;
            }

            // Offsets of OP messages that are already done
            List<Long> doneOpOffset = new ArrayList<>();
            // Half-message offsets that have already been committed or rolled back
            HashMap<Long, Long> removeMap = new HashMap<>();
            PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
            if (null == pullResult) {
                log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null", messageQueue, halfOffset, opOffset);
                continue;
            }
            // single thread
            int getMessageNullCount = 1;
            long newOffset = halfOffset;
            long i = halfOffset;
            while (true) {
                // Could there be a problem here?
                if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
                    log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
                    break;
                }
                if (removeMap.containsKey(i)) { // 3
                    log.info("Half offset {} has been committed/rolled back", i);
                    removeMap.remove(i);
                } else {
                    // Read the message at offset i from the RMQ_SYS_TRANS_HALF_TOPIC queue
                    GetResult getResult = getHalfMsg(messageQueue, i);
                    MessageExt msgExt = getResult.getMsg();
                    if (msgExt == null) {
                        // An empty result is retried once
                        if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
                            break;
                        }
                        if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
                            log.info("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i, messageQueue, getMessageNullCount, getResult.getPullResult());
                            break;
                        } else {
                            log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}", i, messageQueue, getMessageNullCount, getResult.getPullResult());
                            i = getResult.getPullResult().getNextBeginOffset();
                            newOffset = i;
                            continue;
                        }
                    }

                    // Discard after more than 15 check-backs || skip when the message is older than the file retention time (72 hours)
                    if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) { // 6
                        listener.resolveDiscardMsg(msgExt);
                        newOffset = i + 1;
                        i++;
                        continue;
                    }
                    if (msgExt.getStoreTimestamp() >= startTime) {
                        log.info("Fresh stored. the miss offset={}, check it later, store={}", i, new Date(msgExt.getStoreTimestamp()));
                        break;
                    }

                    // Time elapsed since the message was born
                    long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
                    long checkImmunityTime = transactionTimeout;
                    // Check-immunity time specified on the message itself
                    String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
                    if (null != checkImmunityTimeStr) { // 8
                        checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
                        if (valueOfCurrentMinusBorn < checkImmunityTime) {
                            if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
                                newOffset = i + 1;
                                i++;
                                continue;
                            }
                        }
                    } else {
                        if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
                            log.info("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i, checkImmunityTime, new Date(msgExt.getBornTimestamp()));
                            break;
                        }
                    }
                    List<MessageExt> opMsg = pullResult.getMsgFoundList();
                    // Whether a check-back is needed
                    boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
                        || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
                        || (valueOfCurrentMinusBorn <= -1);
                    if (isNeedCheck) {
                        // Write the message back into RMQ_SYS_TRANS_HALF_TOPIC
                        if (!putBackHalfMsgQueue(msgExt, i)) {
                            continue;
                        }
                        // Key step: the broker acts as the sender and issues CHECK_TRANSACTION_STATE to the client
                        listener.resolveHalfMsg(msgExt);
                    } else {
                        // Load more OP messages
                        pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
                        log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i, messageQueue, pullResult);
                        continue;
                    }
                }
                newOffset = i + 1;
                i++;
            }
            if (newOffset != halfOffset) {
                // Update the consume offset of the half queue
                transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
            }
            long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
            if (newOpOffset != opOffset) {
                // Update the consume offset of the OP queue
                transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
        log.error("Check error", e);
    }

}
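
Stripped of retries, discard rules, and OP-message paging, the core of `check` is a scan over the half queue that skips offsets already resolved and stops at the first message that is still too young. The sketch below is a deliberately simplified model of that idea; `CheckScanSketch`, `ScanResult`, and the parameter names are hypothetical, and the real method additionally re-appends each checked message to the half queue before advancing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Simplified model of the scan in TransactionalMessageServiceImpl#check; not the real algorithm.
class CheckScanSketch {
    // Outcome of one scan: offsets that need a check-back, and the new consume offset of the half queue.
    static class ScanResult {
        final List<Long> toCheck = new ArrayList<>();
        long newOffset;
    }

    /**
     * @param bornTimes    born timestamp of the half message at offset halfOffset + index
     * @param halfOffset   consume offset of the half queue before the scan
     * @param processed    half offsets already committed or rolled back (the role of removeMap)
     * @param now          current time in milliseconds
     * @param immunityTime minimum age in milliseconds before a message may be checked
     */
    static ScanResult scan(long[] bornTimes, long halfOffset, Set<Long> processed, long now, long immunityTime) {
        ScanResult result = new ScanResult();
        long i = halfOffset;
        for (; i < halfOffset + bornTimes.length; i++) {
            if (processed.contains(i)) {
                continue; // an OP record exists, nothing left to do for this half message
            }
            long age = now - bornTimes[(int) (i - halfOffset)];
            if (age < immunityTime) {
                break; // too young: stop here and look again in the next round
            }
            result.toCheck.add(i); // old enough and unresolved: ask the producer
        }
        result.newOffset = i;
        return result;
    }
}
```

Stopping at the first young message, instead of skipping it, keeps the consume offset from moving past a half message that has not been examined yet.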

Key code on the business side for receiving the check-back command:

org.apache.rocketmq.client.impl.ClientRemotingProcessor#processRequest

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    switch (request.getCode()) {
        // Check-back request from the broker
        case RequestCode.CHECK_TRANSACTION_STATE:
            return this.checkTransactionState(ctx, request);
        case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:
            return this.notifyConsumerIdsChanged(ctx, request);
        case RequestCode.RESET_CONSUMER_CLIENT_OFFSET:
            return this.resetOffset(ctx, request);
        case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT:
            return this.getConsumeStatus(ctx, request);
        case RequestCode.GET_CONSUMER_RUNNING_INFO:
            return this.getConsumerRunningInfo(ctx, request);
        case RequestCode.CONSUME_MESSAGE_DIRECTLY:
            return this.consumeMessageDirectly(ctx, request);
        default:
            break;
    }
    return null;
}

org.apache.rocketmq.client.impl.ClientRemotingProcessor#checkTransactionState

public RemotingCommand checkTransactionState(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    final CheckTransactionStateRequestHeader requestHeader =
        (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
    final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
    final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
    if (messageExt != null) {
        String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
        if (null != transactionId && !"".equals(transactionId)) {
            messageExt.setTransactionId(transactionId);
        }
        final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
        if (group != null) {
            MQProducerInner producer = this.mqClientFactory.selectProducer(group);
            if (producer != null) {
                final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                producer.checkTransactionState(addr, messageExt, requestHeader);
            } else {
                log.debug("checkTransactionState, pick producer by group[{}] failed", group);
            }
        } else {
            log.warn("checkTransactionState, pick producer group failed");
        }
    } else {
        log.warn("checkTransactionState, decode message failed");
    }

    return null;
}

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#checkTransactionState

@Override
public void checkTransactionState(final String addr, final MessageExt msg, final CheckTransactionStateRequestHeader header) {
    Runnable request = new Runnable() {
        private final String brokerAddr = addr;
        private final MessageExt message = msg;
        private final CheckTransactionStateRequestHeader checkRequestHeader = header;
        private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();

        @Override
        public void run() {
            TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
            TransactionListener transactionListener = getCheckListener();
            if (transactionCheckListener != null || transactionListener != null) {
                LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
                Throwable exception = null;
                try {
                    if (transactionCheckListener != null) {
                        localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
                    } else if (transactionListener != null) {
                        log.debug("Used new check API in transaction message");
                        localTransactionState = transactionListener.checkLocalTransaction(message);
                    } else {
                        log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
                    }
                } catch (Throwable e) {
                    log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
                    exception = e;
                }

                this.processTransactionState(
                    localTransactionState,
                    group,
                    exception);
            } else {
                log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
            }
        }

        private void processTransactionState(
            final LocalTransactionState localTransactionState,
            final String producerGroup,
            final Throwable exception) {
            final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
            thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
            thisHeader.setProducerGroup(producerGroup);
            thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
            thisHeader.setFromTransactionCheck(true);

            String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
            if (uniqueKey == null) {
                uniqueKey = message.getMsgId();
            }
            thisHeader.setMsgId(uniqueKey);
            thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
            switch (localTransactionState) {
                case COMMIT_MESSAGE:
                    thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                    break;
                case ROLLBACK_MESSAGE:
                    thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                    log.warn("when broker check, client rollback this transaction, {}", thisHeader);
                    break;
                case UNKNOW:
                    thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                    log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
                    break;
                default:
                    break;
            }

            String remark = null;
            if (exception != null) {
                remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
            }

            try {
                DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
                    3000);
            } catch (Exception e) {
                log.error("endTransactionOneway exception", e);
            }
        }
    };

    this.checkExecutor.submit(request);
}

Originally published 2021-08-11 on the author's personal site/blog.
