在我们熟知的Kafka、RabbitMQ等消费队列中,都没有实现事务消息这个功能,RocketMQ是唯一实现了这一功能的消息队列中间件,今天我们来从底层看一下RocketMQ如何实现的事务消息
我们先来介绍一下事务消息,在我们实际进行业务开发时,一般会在发送消息后操作数据库,因此数据库操作与消息执行就是两个关联的行为,因此它们的执行顺序有以下两种:
先执行数据库操作,后投递消息:在这种情况下,如果一开始数据库操作成功,但是投递消息失败,那么会导致操作无法回滚,也许你想到可以使用数据库事务解决,但是数据库事务只针对数据库操作失败的场景,一旦数据库事务提交后,就无法再进行回滚了,这时可能导致数据库数据的错误
先投递消息,再执行数据库操作:与上面的情况一样,如果投递消息成功,但是数据库操作失败,这时已经投递到Broker的消息无法进行回滚,如果消息对业务有影响,也会导致业务数据错误
因此可以看出,无论是上面两种情况的哪一种,都会导致业务数据的错误,因此我们核心要解决:将数据库事务与消息投递当成一个整体进行操作,如果有一个失败,则将所有已经进行的操作回滚,因此事务消息也就应运而生了,事务消息就将数据库事务与投递消息当做一个整体处理,要么执行成功,要么数据库事务失败,将投递到Broker的消息也进行回滚,那么Consumer就不会消费到这条消息
在追究源码之前,我们先来了解一下事务消息的流程,大致分为两个阶段:
第一阶段:和一般投递Message一样,发送消息,但是这里的Message与正常的Producer投递的消息不一样,这里的Message为Half Message,即半事务消息,半事务消息是不会被Consumer消费到的,它只是投递消息过程中的一个中间消息状态
一般来说,如果发送消息成功后,maxOffset
(投递消息后最新消息在MessageQueue的偏移量+1)会有一个不为0的值,但是在投递半事务消息后,maxOffset
始终为0,因此也能证明它并不是最终Consumer要进行消费的消息
第二阶段:半事务消息投递成功后,会执行本地事务,这里主要分为以下三种情况:
1)本地事务执行成功,会向Broker发送commit
消息,commit
后的Message才能被Consumer消费
2)本地事务执行失败,会想Broker发送rollback
消息,Broker会将投递的半事务消息删除,保证数据一开始的一致性
3)第三种情况比较特殊,如果Producer实例或者网络发生了波动,即此时Producer在将本地事务执行结果通知给Broker的时候发生了抖动,Broker会进行扫描,如果发现某条Message长时间处于半事务消息状态,则Broker会从被动变为主动,主动地向Producer询问这条Message对应的事务状态
这就是事务消息的大致流程,接下来我们来看一下源码:
负责投递事务消息的Producer由TransactionMQProducer
负责,它继承了DefaultMQProducer
,因此其中大部分方法与一般的发送Message的Producer一样,一般投递消息都会在启动方法start后执行,而TransactionMQProducer
的run方法与正常Producer不同的是,会在投递消息前会调用initTransactionEnv
准备事务消息的所需环境:
/**
包名:org.apache.rocketmq.client.producer
行数:59
*/
public class TransactionMQProducer extends DefaultMQProducer {
@Override
public void start() throws MQClientException {
this.defaultMQProducerImpl.initTransactionEnv();
super.start();
}
}
正常投递消息的流程与普通的Producer一致,都会初始化一些必要信息,之后将信息注册到NameServer
中,与Broker保持心跳连接更新实时信息,这里我们着重看一下initTransactionEnv
方法:
/**
包名:org.apache.rocketmq.client.impl.producer
行数:153
*/
public class DefaultMQProducerImpl implements MQProducerInner {
public void initTransactionEnv() {
TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
if (producer.getExecutorService() != null) {
this.checkExecutor = producer.getExecutorService();
} else {
this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
this.checkExecutor = new ThreadPoolExecutor(
producer.getCheckThreadPoolMinSize(),
producer.getCheckThreadPoolMaxSize(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.checkRequestQueue);
}
}
}
这个方法的主要作用是判断用户有没有自己定义一个线程池服务,如果有定义那么会使用用户定义的线程池服务,否则会自己进行定义
投递事务消息由方法sendMessageInTransaction
完成:
/**
包名:org.apache.rocketmq.client.producer
行数:87
*/
public class TransactionMQProducer extends DefaultMQProducer {
@Override
public TransactionSendResult sendMessageInTransaction(final Message msg,
final Object arg) throws MQClientException {
if (null == this.transactionListener) {
throw new MQClientException("TransactionListener is null", null);
}
msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}
}
首先会判断transactionListener
是否为空,如果为空那么会抛出异常,这是因为transactionListener
主要负责执行本地事务执行以及回查事务消息,没有它事务消息无法执行
继续向下进入sendMessageInTransaction
方法,我们首先能看到清空延迟等级操作:
/**
包名:org.apache.rocketmq.client.impl.producer
行数:1220
*/
public class DefaultMQProducerImpl implements MQProducerInner {
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
// ignore DelayTimeLevel parameter
if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}
......
}
}
}
由于延迟消息需要反复查看投递消息,而事务消息有可能需要进行回滚,因此事务消息并不支持延迟消息
接下来会将本条消息打上事务消息的标记:
public static final String PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";
/**
包名:org.apache.rocketmq.client.impl.producer
行数:1236
*/
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
由于消息还没有投递到Broker中进行确认,因此这里的消息还处于Half Message状态,即半事务消息状态
而接下来就会投递该消息存储到Broker上,这里与一般的Message进行的是同样操作,最后都会通过sendKernelImpl
投递:
/**
包名:org.apache.rocketmq.client.impl.producer
行数:537
*/
public class DefaultMQProducerImpl implements MQProducerInner {
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
}
}
在投递到Broker后,我们知道消息并不会被Consumer消费,但是消息确实存储到了Broker上,那么实际处理方法是怎样的呢,我们来一探究竟:
Broker对于消息的处理在SendMessageProcessor
类中,而对于半事务消息的处理如下:
/**
包名:package org.apache.rocketmq.broker.processor;
行数:316
*/
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
SendMessageContext mqtraceContext,
SendMessageRequestHeader requestHeader) {
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (Boolean.parseBoolean(transFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return CompletableFuture.completedFuture(response);
}
putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
}
} else {
putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
}
在这里获取之前设置的PROPERTY_TRANSACTION_PREPARED
属性进行判断,如果为true
那么又会调用asyncPrepareMessage
方法进行处理,而如果是一般消息则会调用asyncPutMessage
进行投递,关于这个方法的具体细节,我有在之前的Broker落盘代码文章https://cloud.tencent.com/developer/article/2485686中讲解,感兴趣的小伙伴可以去了解一下
虽然表面上事务消息与普通消息走的是不同的处理流程,但是实际上却还是用相同的处理逻辑:
@Override
public CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner) {
return transactionalMessageBridge.asyncPutHalfMessage(messageInner);
}
public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
return store.asyncPutMessage(parseHalfMessageInner(messageInner));
}
我们可以看到,最终还是会调用asyncPutMessage
方法进行处理,只不过会在传递参数时,通过parseHalfMessageInner
对消息进行特殊处理
那么我们来看一下parseHalfMessageInner
具体实现:
/**
包名:org.apache.rocketmq.broker.transaction.queue
行数:216
*/
public class TransactionalMessageBridge {
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
}
public static String buildHalfTopic() {
return TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
}
我们可以看到,这个方法会保存原本消息的真实topicPROPERTY_REAL_TOPIC
与真实入队idPROPERTY_REAL_QUEUE_ID
,而这个半事务消息会被暂时投递到RMQ_SYS_TRANS_HALF_TOPIC
对应的0号MessageQueue
中,由于每个队列中的消息必须按需消费,这也就保证了投递到Broker中的事务消息不会发生消费顺序错乱的问题
在大致流程中我们介绍,只有在半事务消息投递到Broker后才能进入第二阶段,而在投递得到结果后,Producer便会调用完成本地事务:
/**
包名:org.apache.rocketmq.client.impl.producer
行数:1237
*/
public class DefaultMQProducerImpl implements MQProducerInner {
try {
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
}
这里会根据半事务消息的投递情况执行不同操作,如果投递结果为SEND_OK
那么我们就可以通过
LocalTransactionState
获取本地事务id,并通过execute
执行事务,反之如果投递结果不为SEND_OK
那么就会设置事务执行结果为ROLLBACK
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
如果事务对应的状态还没有设置,说明投递发生了抖动,因此设置为UNKNOWN
,而在完成对于事物状态的设置后,Producer端会调用end_transaction
,完成对于事务消息的最终处理
/**
包名:org.apache.rocketmq.client.impl.producer
行数:1237
*/
public class DefaultMQProducerImpl implements MQProducerInner {
try {
this.endTransaction(msg, sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
}
比较有意思的是,最终对于事务提交或者回滚的操作还是由Broker完成,而endTransaction
只是负责设置消息类型并通知给BrokerEND_TRANSACTION
请求:
/**
包名:org.apache.rocketmq.client.impl.producer
行数:1237
*/
public class DefaultMQProducerImpl implements MQProducerInner {
public void endTransaction(
final Message msg,
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
final MessageId id;
if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
} else {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}
String transactionId = sendResult.getTransactionId();
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
//发送END_TRANSACTION请求给Broker处理事务消息
doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
}
}
对于投递后的半事务消息会由EndTransactionProcessor
处理:
/**
包名:org.apache.rocketmq.broker.processor;
行数:54
*/
public class EndTransactionProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
RemotingCommandException {
if (requestHeader.getFromTransactionCheck()) {
......
} else {
......
}
}
}
我们可以看到整个方法被getFromTransactionCheck
的判断分为两个部分,我们知道,如果Broker长时间没有收到Producer对于半事务消息的处理,会进行事务消息的回查操作,而这个方法一部分用于执行回查消息逻辑,另一部分执行投递半事务消息正常逻辑
因此我们也能发现,其实Producer有两次调用end_transaction
通知Broker的操作:
我们先来看一下正常半事务消息投递成功后Broker的处理:
/**
包名:org.apache.rocketmq.broker.processor;
行数:99
*/
public class EndTransactionProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
switch (requestHeader.getCommitOrRollback()) {
case MessageSysFlag.TRANSACTION_NOT_TYPE: {
......
return null;
}
case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
break;
}
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
......
break;
}
default:
return null;
}
}
这段代码主要是对UNKNOWN
的情况直接返回,而对于COMMIT
与FAIL
的处理在后面
对于COMMIT
情况处理如下:
/**
包名:org.apache.rocketmq.broker.processor;
行数:127
*/
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
//通过endMessageTransaction构建要发送的真实消息
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
//清除半消息标志
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
//将消息真正发送到最终的Topic中
RemotingCommand sendResult = sendFinalMessage(msgInner);
//删除已经处理的半事务消息
if (sendResult.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
}
private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) {
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
msgInner.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
我们可以看到,这里会取出我们原本存储的实际队列id与实际Topic,之后构建消息,最后发送到真正的Topic中,让下游的Consumer能够进行消费
对于FAIL
的处理如下:
/**
包名:org.apache.rocketmq.broker.processor;
行数:145
*/
else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
我们看到,如果本地事务执行失败,那么就会将这条半事务消息从投递队列中删除,也就是事务回滚,以避免对业务数据造成影响
在Broker启动时,会开启服务TransactionalMessageCheckService
定期对半事务消息进行回查,判断半事务消息是否被投递处理:
/**
包名:org.apache.rocketmq.broker.transaction
行数:49
*/
@Override
protected void onWaitEnd() {
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
long begin = System.currentTimeMillis();
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
}
我们看到,这里会设置参数timeout(表示超过这个时间就对事务消息进行回查)、checkMax(对事务消息进行回查的最大次数),而最终会将参数传递给check
方法:
/**
包名:org.apache.rocketmq.broker.transaction
行数:150
*/
String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
而check方法会从保存着半事务消息的RMQ_SYS_TRANS_HALF_TOPIC
Topic中取出对应的半事务消息,之后对这个消息进行一系列判断,包括上面的是否达到最大回查次数checkMax、以及半事务消息存储时间上限等等,检查逻辑若无误,那么会继续对这条消息进行处理:
/**
包名:org.apache.rocketmq.broker.transaction
行数:227
*/
if (isNeedCheck) {
if (!putBackHalfMsgQueue(msgExt, i)) {
continue;
}
listener.resolveHalfMsg(msgExt);
}
这里如果需要进行检查,会再次将消息投递会半事务消息队列中,这么做是为了如果Producer端又出现了抖动,能让Broker继续进行半事务消息的回查操作
而接下来,编写RocketMQ的人十分考虑性能,在检查状态这里甚至使用了异步操作,而这里也会向Producer发起检查事务状态CHECK_TRANSACTION_STATE
的请求:
/**
包名:org.apache.rocketmq.broker.transaction
行数:150
*/
public void sendCheckMessage(MessageExt msgExt) throws Exception {
......
if (channel != null) {
brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
} else {
LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
}
}
public void checkProducerTransactionState(
final String group,
final Channel channel,
final CheckTransactionStateRequestHeader requestHeader,
final MessageExt messageExt) throws Exception {
......
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
request.setBody(MessageDecoder.encode(messageExt, false));
try {
this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
......
}
接下来让我们看Producer对于事务回查的处理:
/**
包名: org.apache.rocketmq.client.impl.producer;
行数: 344
*/
private void processTransactionState(
final LocalTransactionState localTransactionState,
final String producerGroup,
final Throwable exception) {
......
DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,3000);
......
}
public void endTransactionOneway(
final String addr,
final EndTransactionRequestHeader requestHeader,
final String remark,
final long timeoutMillis
) throws RemotingException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);
request.setRemark(remark);
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
}
这个方法其实就是一开始对于半事务消息处理的代码,只不过是由于网络原因执行发生错误了,因此会再次执行一遍,而这里在执行完后,又会给Broker发送END_TRANSACTION
告诉Broker进行半事务消息逻辑处理,整个逻辑也就串起来了
事务消息正常执行的情况:
事务消息回滚处理的情况:
好了,这就是关于RocketMQ事务消息的全部讲解了,其核心还是通过一个中间TopicRMQ_SYS_TRANS_HALF_TOPIC
来承载半事务消息,在本地事务执行成功后才真正投递给Broker落盘,而它对于代码END_TRANSACTION
部分、处理半事务状态等部分的代码复用也是很好的,我们能学到代码设计书写的很多技巧
最后,本文到这里就结束了,创作不易,希望对你有所帮助!!!
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。