前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >RocketMQ事务消息实现的底层原理是什么

RocketMQ事务消息实现的底层原理是什么

原创
作者头像
潋湄
修改2025-01-20 10:45:49
修改2025-01-20 10:45:49
1670
举报
文章被收录于专栏:消息队列消息队列

在我们熟知的Kafka、RabbitMQ等消费队列中,都没有实现事务消息这个功能,RocketMQ是唯一实现了这一功能的消息队列中间件,今天我们来从底层看一下RocketMQ如何实现的事务消息

事务消息介绍

我们先来介绍一下事务消息,在我们实际进行业务开发时,一般会在发送消息后操作数据库,因此数据库操作与消息执行就是两个关联的行为,因此它们的执行顺序有以下两种:

先执行数据库操作,后投递消息:在这种情况下,如果一开始数据库操作成功,但是投递消息失败,那么会导致操作无法回滚,也许你想到可以使用数据库事务解决,但是数据库事务只针对数据库操作失败的场景,一旦数据库事务提交后,就无法再进行回滚了,这时可能导致数据库数据的错误

先投递消息,再执行数据库操作:与上面的情况一样,如果投递消息成功,但是数据库操作失败,这时已经投递到Broker的消息无法进行回滚,如果消息对业务有影响,也会导致业务数据错误

因此可以看出,无论是上面两种情况的哪一种,都会导致业务数据的错误,因此我们核心要解决:将数据库事务与消息投递当成一个整体进行操作,如果有一个失败,则将所有已经进行的操作回滚,因此事务消息也就应运而生了,事务消息就将数据库事务与投递消息当做一个整体处理,要么执行成功,要么数据库事务失败,将投递到Broker的消息也进行回滚,那么Consumer就不会消费到这条消息

事务消息大致流程

在追究源码之前,我们先来了解一下事务消息的流程,大致分为两个阶段:

第一阶段:和一般投递Message一样,发送消息,但是这里的Message与正常的Producer投递的消息不一样,这里的Message为Half Message,即半事务消息,半事务消息是不会被Consumer消费到的,它只是投递消息过程中的一个中间消息状态

一般来说,如果发送消息成功后,maxOffset(投递消息后最新消息在MessageQueue的偏移量+1)会有一个不为0的值,但是在投递半事务消息后,maxOffset始终为0,因此也能证明它并不是最终Consumer要进行消费的消息

第二阶段:半事务消息投递成功后,会执行本地事务,这里主要分为以下三种情况:

1)本地事务执行成功,会向Broker发送commit消息,commit后的Message才能被Consumer消费

2)本地事务执行失败,会想Broker发送rollback消息,Broker会将投递的半事务消息删除,保证数据一开始的一致性

3)第三种情况比较特殊,如果Producer实例或者网络发生了波动,即此时Producer在将本地事务执行结果通知给Broker的时候发生了抖动,Broker会进行扫描,如果发现某条Message长时间处于半事务消息状态,则Broker会从被动变为主动,主动地向Producer询问这条Message对应的事务状态

这就是事务消息的大致流程,接下来我们来看一下源码:

事务消息的源码实现

初始化事务消息投递环境

负责投递事务消息的Producer由TransactionMQProducer负责,它继承了DefaultMQProducer,因此其中大部分方法与一般的发送Message的Producer一样,一般投递消息都会在启动方法start后执行,而TransactionMQProducer的run方法与正常Producer不同的是,会在投递消息前会调用initTransactionEnv准备事务消息的所需环境:

代码语言:java
复制
/**
    包名:org.apache.rocketmq.client.producer
    行数:59
*/
public class TransactionMQProducer extends DefaultMQProducer {
    @Override
    public void start() throws MQClientException {
        this.defaultMQProducerImpl.initTransactionEnv();
        super.start();
    }
}

正常投递消息的流程与普通的Producer一致,都会初始化一些必要信息,之后将信息注册到NameServer中,与Broker保持心跳连接更新实时信息,这里我们着重看一下initTransactionEnv方法:

代码语言:java
复制
/**
    包名:org.apache.rocketmq.client.impl.producer
    行数:153
*/
public class DefaultMQProducerImpl implements MQProducerInner {
    public void initTransactionEnv() {
        TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
        if (producer.getExecutorService() != null) {
            this.checkExecutor = producer.getExecutorService();
        } else {
            this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
            this.checkExecutor = new ThreadPoolExecutor(
                producer.getCheckThreadPoolMinSize(),
                producer.getCheckThreadPoolMaxSize(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.checkRequestQueue);
        }
    }
}

这个方法的主要作用是判断用户有没有自己定义一个线程池服务,如果有定义那么会使用用户定义的线程池服务,否则会自己进行定义

投递事务消息

投递事务消息由方法sendMessageInTransaction完成:

代码语言:java
复制
/**
    包名:org.apache.rocketmq.client.producer
    行数:87
*/
public class TransactionMQProducer extends DefaultMQProducer {
    @Override
    public TransactionSendResult sendMessageInTransaction(final Message msg,
        final Object arg) throws MQClientException {
        if (null == this.transactionListener) {
            throw new MQClientException("TransactionListener is null", null);
        }

        msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
        return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
    }
}

首先会判断transactionListener是否为空,如果为空那么会抛出异常,这是因为transactionListener主要负责执行本地事务执行以及回查事务消息,没有它事务消息无法执行

继续向下进入sendMessageInTransaction方法,我们首先能看到清空延迟等级操作:

代码语言:java
复制
/**
    包名:org.apache.rocketmq.client.impl.producer
    行数:1220
*/
public class DefaultMQProducerImpl implements MQProducerInner {
    public TransactionSendResult sendMessageInTransaction(final Message msg,
            final LocalTransactionExecuter localTransactionExecuter, final Object arg)
            throws MQClientException {
            TransactionListener transactionListener = getCheckListener();
            if (null == localTransactionExecuter && null == transactionListener) {
                throw new MQClientException("tranExecutor is null", null);
            }

            // ignore DelayTimeLevel parameter
            if (msg.getDelayTimeLevel() != 0) {
                MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
            }

            ......
            }
}
}

由于延迟消息需要反复查看投递消息,而事务消息有可能需要进行回滚,因此事务消息并不支持延迟消息

接下来会将本条消息打上事务消息的标记:

代码语言:java
复制
public static final String PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";
/**
    包名:org.apache.rocketmq.client.impl.producer
    行数:1236
*/
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());

由于消息还没有投递到Broker中进行确认,因此这里的消息还处于Half Message状态,即半事务消息状态

而接下来就会投递该消息存储到Broker上,这里与一般的Message进行的是同样操作,最后都会通过sendKernelImpl投递:

代码语言:java
复制
/**
    包名:org.apache.rocketmq.client.impl.producer
    行数:537
*/
public class DefaultMQProducerImpl implements MQProducerInner {
    private SendResult sendDefaultImpl(
            Message msg,
            final CommunicationMode communicationMode,
            final SendCallback sendCallback,
            final long timeout
        ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
    }
}

Broker对半事务消息的处理

在投递到Broker后,我们知道消息并不会被Consumer消费,但是消息确实存储到了Broker上,那么实际处理方法是怎样的呢,我们来一探究竟:

Broker对于消息的处理在SendMessageProcessor类中,而对于半事务消息的处理如下:

代码语言:java
复制
/**
    包名:package org.apache.rocketmq.broker.processor;
    行数:316
*/
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
    private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
        SendMessageContext mqtraceContext,
        SendMessageRequestHeader requestHeader) {

            String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (Boolean.parseBoolean(transFlag)) {
                if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                    response.setCode(ResponseCode.NO_PERMISSION);
                    response.setRemark(
                            "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                                    + "] sending transaction message is forbidden");
                    return CompletableFuture.completedFuture(response);
                }
                putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
            }

    } else {
            putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
    }
}

在这里获取之前设置的PROPERTY_TRANSACTION_PREPARED属性进行判断,如果为true那么又会调用asyncPrepareMessage方法进行处理,而如果是一般消息则会调用asyncPutMessage进行投递,关于这个方法的具体细节,我有在之前的Broker落盘代码文章https://cloud.tencent.com/developer/article/2485686中讲解,感兴趣的小伙伴可以去了解一下

虽然表面上事务消息与普通消息走的是不同的处理流程,但是实际上却还是用相同的处理逻辑:

代码语言:java
复制
@Override
public CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner) {
    return transactionalMessageBridge.asyncPutHalfMessage(messageInner);
}

public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
    return store.asyncPutMessage(parseHalfMessageInner(messageInner));
}

我们可以看到,最终还是会调用asyncPutMessage方法进行处理,只不过会在传递参数时,通过parseHalfMessageInner对消息进行特殊处理

那么我们来看一下parseHalfMessageInner具体实现:

代码语言:java
复制
/**
    包名:org.apache.rocketmq.broker.transaction.queue
    行数:216
*/
public class TransactionalMessageBridge {
    private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
            String.valueOf(msgInner.getQueueId()));
        msgInner.setSysFlag(
            MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
        msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
        msgInner.setQueueId(0);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        return msgInner;
    }
}

    public static String buildHalfTopic() {
        return TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
    }

我们可以看到,这个方法会保存原本消息的真实topicPROPERTY_REAL_TOPIC与真实入队idPROPERTY_REAL_QUEUE_ID,而这个半事务消息会被暂时投递到RMQ_SYS_TRANS_HALF_TOPIC对应的0号MessageQueue中,由于每个队列中的消息必须按需消费,这也就保证了投递到Broker中的事务消息不会发生消费顺序错乱的问题

Producer在半事务消息投递后继续处理

在大致流程中我们介绍,只有在半事务消息投递到Broker后才能进入第二阶段,而在投递得到结果后,Producer便会调用完成本地事务:

代码语言:java
复制
/**
    包名:org.apache.rocketmq.client.impl.producer
    行数:1237
*/
public class DefaultMQProducerImpl implements MQProducerInner {
        try {
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }

        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) {
            case SEND_OK: {
                try {
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    if (null != transactionId && !"".equals(transactionId)) {
                        msg.setTransactionId(transactionId);
                    }
                    if (null != localTransactionExecuter) {
                        localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                    } else if (transactionListener != null) {
                        log.debug("Used new transaction API");
                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                    }
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }

                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }
}

这里会根据半事务消息的投递情况执行不同操作,如果投递结果为SEND_OK那么我们就可以通过

LocalTransactionState获取本地事务id,并通过execute执行事务,反之如果投递结果不为SEND_OK那么就会设置事务执行结果为ROLLBACK

代码语言:java
复制
if (null == localTransactionState) {
    localTransactionState = LocalTransactionState.UNKNOW;
}

如果事务对应的状态还没有设置,说明投递发生了抖动,因此设置为UNKNOWN,而在完成对于事物状态的设置后,Producer端会调用end_transaction,完成对于事务消息的最终处理

代码语言:java
复制
/**
    包名:org.apache.rocketmq.client.impl.producer
    行数:1237
*/
public class DefaultMQProducerImpl implements MQProducerInner {
    try {
        this.endTransaction(msg, sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }
}

比较有意思的是,最终对于事务提交或者回滚的操作还是由Broker完成,而endTransaction只是负责设置消息类型并通知给BrokerEND_TRANSACTION请求:

代码语言:java
复制
/**
    包名:org.apache.rocketmq.client.impl.producer
    行数:1237
*/
public class DefaultMQProducerImpl implements MQProducerInner {
    public void endTransaction(
        final Message msg,
        final SendResult sendResult,
        final LocalTransactionState localTransactionState,
        final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
        final MessageId id;
        if (sendResult.getOffsetMsgId() != null) {
            id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
        } else {
            id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
        }
        String transactionId = sendResult.getTransactionId();
        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        requestHeader.setCommitLogOffset(id.getOffset());
        switch (localTransactionState) {
            case COMMIT_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
            case ROLLBACK_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                break;
            case UNKNOW:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                break;
            default:
                break;
        }

        //发送END_TRANSACTION请求给Broker处理事务消息
        doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());
        String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
            this.defaultMQProducer.getSendMsgTimeout());
    }
}

Broker执行END_TRANSACTION处理

对于投递后的半事务消息会由EndTransactionProcessor处理:

代码语言:java
复制
/**
    包名:org.apache.rocketmq.broker.processor;
    行数:54
*/
public class EndTransactionProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
        RemotingCommandException {
            if (requestHeader.getFromTransactionCheck()) {
                ......
            } else {
                ......
            }
    }
}

我们可以看到整个方法被getFromTransactionCheck的判断分为两个部分,我们知道,如果Broker长时间没有收到Producer对于半事务消息的处理,会进行事务消息的回查操作,而这个方法一部分用于执行回查消息逻辑,另一部分执行投递半事务消息正常逻辑

因此我们也能发现,其实Producer有两次调用end_transaction通知Broker的操作:

Producer两次调用END_TRANSACTION操作
Producer两次调用END_TRANSACTION操作

我们先来看一下正常半事务消息投递成功后Broker的处理:

半事务消息投递成功后Broker处理

代码语言:java
复制
/**
    包名:org.apache.rocketmq.broker.processor;
    行数:99
*/
public class EndTransactionProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
switch (requestHeader.getCommitOrRollback()) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                    ......
                    return null;
                }

                case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                    break;
                }

                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                    ......
                    break;
                }
                default:
                    return null;
            }
}

这段代码主要是对UNKNOWN的情况直接返回,而对于COMMITFAIL的处理在后面

对于COMMIT情况处理如下:

代码语言:java
复制
/**
包名:org.apache.rocketmq.broker.processor;
行数:127
*/
        if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
            result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                if (res.getCode() == ResponseCode.SUCCESS) {
                    //通过endMessageTransaction构建要发送的真实消息
                    MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                    msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                    msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                    msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                    msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                    //清除半消息标志
                    MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    //将消息真正发送到最终的Topic中
                    RemotingCommand sendResult = sendFinalMessage(msgInner);
                    //删除已经处理的半事务消息
                    if (sendResult.getCode() == ResponseCode.SUCCESS) {
                        this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                    }
                    return sendResult;
                }
                return res;
            }
        }

    private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) {
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
        msgInner.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));

我们可以看到,这里会取出我们原本存储的实际队列id实际Topic,之后构建消息,最后发送到真正的Topic中,让下游的Consumer能够进行消费

对于FAIL的处理如下:

代码语言:java
复制
/**
包名:org.apache.rocketmq.broker.processor;
行数:145
*/
            else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
            result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                if (res.getCode() == ResponseCode.SUCCESS) {
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                }
                return res;
            }
        }

我们看到,如果本地事务执行失败,那么就会将这条半事务消息从投递队列中删除,也就是事务回滚,以避免对业务数据造成影响

Broker对于事务回查的处理

在Broker启动时,会开启服务TransactionalMessageCheckService定期对半事务消息进行回查,判断半事务消息是否被投递处理:

代码语言:java
复制
/**
    包名:org.apache.rocketmq.broker.transaction
    行数:49
*/
    @Override
    protected void onWaitEnd() {
        long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
        int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
        long begin = System.currentTimeMillis();
        this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
    }

我们看到,这里会设置参数timeout(表示超过这个时间就对事务消息进行回查)、checkMax(对事务消息进行回查的最大次数),而最终会将参数传递给check方法:

代码语言:java
复制
/**
    包名:org.apache.rocketmq.broker.transaction
    行数:150
*/
String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);

而check方法会从保存着半事务消息的RMQ_SYS_TRANS_HALF_TOPICTopic中取出对应的半事务消息,之后对这个消息进行一系列判断,包括上面的是否达到最大回查次数checkMax、以及半事务消息存储时间上限等等,检查逻辑若无误,那么会继续对这条消息进行处理:

代码语言:java
复制
/**
    包名:org.apache.rocketmq.broker.transaction
    行数:227
*/
if (isNeedCheck) {
    if (!putBackHalfMsgQueue(msgExt, i)) {
        continue;
    }
    listener.resolveHalfMsg(msgExt);
}

这里如果需要进行检查,会再次将消息投递会半事务消息队列中,这么做是为了如果Producer端又出现了抖动,能让Broker继续进行半事务消息的回查操作

而接下来,编写RocketMQ的人十分考虑性能,在检查状态这里甚至使用了异步操作,而这里也会向Producer发起检查事务状态CHECK_TRANSACTION_STATE的请求:

代码语言:java
复制
/**
包名:org.apache.rocketmq.broker.transaction
行数:150
*/
        public void sendCheckMessage(MessageExt msgExt) throws Exception {
        ......
        if (channel != null) {
            brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
        } else {
            LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
        }
    }

    public void checkProducerTransactionState(
        final String group,
        final Channel channel,
        final CheckTransactionStateRequestHeader requestHeader,
        final MessageExt messageExt) throws Exception {
    ......
        RemotingCommand request =
            RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
        request.setBody(MessageDecoder.encode(messageExt, false));
        try {
            this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
    ......
    }

接下来让我们看Producer对于事务回查的处理:

代码语言:java
复制
/**
    包名: org.apache.rocketmq.client.impl.producer;
    行数: 344
*/
private void processTransactionState(
  final LocalTransactionState localTransactionState,
  final String producerGroup,
  final Throwable exception) {
  ......
 DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,3000);
  ......
}

public void endTransactionOneway(
        final String addr,
        final EndTransactionRequestHeader requestHeader,
        final String remark,
        final long timeoutMillis
    ) throws RemotingException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);

        request.setRemark(remark);
        this.remotingClient.invokeOneway(addr, request, timeoutMillis);
    }

这个方法其实就是一开始对于半事务消息处理的代码,只不过是由于网络原因执行发生错误了,因此会再次执行一遍,而这里在执行完后,又会给Broker发送END_TRANSACTION告诉Broker进行半事务消息逻辑处理,整个逻辑也就串起来了

两张图表示事务消息执行流程

事务消息正常执行的情况:

事务消息正常执行
事务消息正常执行

事务消息回滚处理的情况:

事务消息回滚处理
事务消息回滚处理

好了,这就是关于RocketMQ事务消息的全部讲解了,其核心还是通过一个中间TopicRMQ_SYS_TRANS_HALF_TOPIC来承载半事务消息,在本地事务执行成功后才真正投递给Broker落盘,而它对于代码END_TRANSACTION部分、处理半事务状态等部分的代码复用也是很好的,我们能学到代码设计书写的很多技巧

最后,本文到这里就结束了,创作不易,希望对你有所帮助!!!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 事务消息介绍
  • 事务消息大致流程
  • 事务消息的源码实现
    • 初始化事务消息投递环境
    • 投递事务消息
    • Broker对半事务消息的处理
    • Producer在半事务消息投递后继续处理
    • Broker执行END_TRANSACTION处理
      • 半事务消息投递成功后Broker处理
      • Broker对于事务回查的处理
  • 两张图表示事务消息执行流程
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档