RocketMQ提供了事务消息的功能,采用2PC(两段式协议)+补偿机制(事务回查)的分布式事务功能,通过消息队列 RocketMQ 版事务消息能达到分布式事务的最终一致。
事务消息发送步骤如下:
事务消息回查步骤如下:
在本地应用发送事务消息的核心类是TransactionMQProducer
,该类通过继承DefaultMQProducer来复用大部分发送消息相关的逻辑,这个类的代码量非常少只有100来行,下面是这个类的sendMessageTransaction
方法
@Override
public TransactionSendResult sendMessageInTransaction(final Message msg,
final Object arg) throws MQClientException {
//判断transactionListener是否存在
if (null == this.transactionListener) {
throw new MQClientException("TransactionListener is null", null);
}
//发送事务消息
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}
复制代码
这里的transactionListener
就是上面所说的消息回查的类,它提供了2个方法:
接着看DefaultMQProducer.sendMessageInTransaction()
方法:
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
//判断检查本地事务的listenner是否存在
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
//。。。省略
SendResult sendResult = null;
//msg设置参数TRAN_MSG,表示为事务消息
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
//发送消息
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
//发送消息成功,执行本地事务
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
try {
//执行endTransaction方法,如果半消息发送失败或本地事务执行失败告诉服务端是删除半消息,半消息发送成功且本地事务执行成功则告诉服务端生效半消息
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
//...省略
return transactionSendResult;
}
复制代码
该方法主要做了以下事情
Broker端通过SendMessageProcessor.processRequest()
方法接收处理 Producer 发送的消息 最后会调用到SendMessageProcessor.sendMessage()
,判断消息类型,进行消息存储。
//SendMessageProcessor.java
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
//...省略
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
//存储事务消息
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
//存储普通消息
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
复制代码
接着看存储半消息的代码 prepareMessage(msgInner) :
//TransactionalMessageBridge.java
//存储事务半消息
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
return store.putMessage(parseHalfMessageInner(messageInner));
}
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
//备份消息的原主题名称与原队列ID
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
//事务消息的topic和queueID是写死固定的
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
复制代码
在这一步,备份消息的原主题名称与原队列ID,然后取消事务消息的消息标签,重新设置消息的主题为:RMQ_SYS_TRANS_HALF_TOPIC,队列ID固定为0。与其他普通消息区分开,然后完成消息持久化。 到这里,Broker 就初步处理完了 Producer 发送的事务半消息。
接着我们回到 上面 Producer 发送半消息的地方,往下继续看。
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
//发送消息成功,执行本地事务
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
//半消息发送失败,回滚
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
事务半消息发送成功后,会调用transactionListener.executeLocalTransaction
方法执行本地事务。只有半消息发送成功后,才会执行本地事务,如果半消息发送失败,则设置回滚。
本地事务执行后,则调用this.endTransaction()
方法,根据本地事务执行状态,去提交事务或者回滚事务。
如果半消息发送失败或本地事务执行失败告诉服务端是删除半消息,半消息发送成功且本地事务执行成功则告诉服务端生效半消息
public void endTransaction(
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
final MessageId id;
if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
} else {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}
String transactionId = sendResult.getTransactionId();
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
switch (localTransactionState) {
//提交事务
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
//回滚
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
}
复制代码
两段式协议发送与提交回滚消息,执行完本地事务消息的状态为UNKNOW
时,结束事务不做任何操作。通过事务状态定时回查得到发送端的事务状态是rollback或commit。
通过TransactionalMessageCheckService
线程定时去检测RMQ_SYS_TRANS_HALF_TOPIC
主题中的消息,回查消息的事务状态。
代码入口:
public void run() {
log.info("Start transaction check service thread!");
//执行间隔
long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
while (!this.isStopped()) {
this.waitForRunning(checkInterval);
}
log.info("End transaction check service thread!");
}
@Override
protected void onWaitEnd() {
//事务过期时间
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
long begin = System.currentTimeMillis();
log.info("Begin to check prepare message, begin time:{}", begin);
//检查本地事务
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
复制代码
大致流程如下:
这里重点说下判断消息是否要回查的逻辑:
removeMap是个Map集合的键值对key是half队列的消息offset,value是op队列的消息offset,图中看有两对(100005,80002)、(100004,80003)
doneOpOffset是一个List集合,其中存储的是op队列的消息offset,图中只有8004
check()循环查找half队列中的消息时,100004已经在removeMap中了,跳过下面业务继续循环下一个100005进行下一个逻辑,判断其是否具有回查消息的条件isNeedCheck
接下来我们来一起看看,当Producer或者回查定时任务提交/回滚事务的时候,Broker如何处理事务消息提交、回滚命令的。
//EndTransactionProcessor.java
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final EndTransactionRequestHeader requestHeader =
(EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
LOGGER.debug("Transaction request:{}", requestHeader);
//从节点不处理
if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
return response;
}
//省略代码,打印日志
OperationResult result = new OperationResult();
//如果请求为提交事务,进入事务消息提交处理流程
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
//根据commitLogOffset从commitlog文件中查找消息
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
//字段检查
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
//恢复事务消息的真实的主题、队列,并设置事务ID
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
//设置消息的相关属性,取消事务相关的系统标记
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
//发送最终消息,存储,被consumer消费
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
//删除预处理消息(prepare)
//其实是将消息存储在主题为:RMQ_SYS_TRANS_OP_HALF_TOPIC的主题中,代表这些消息已经被处理(提交或回滚)。
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
} //回滚处理
else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
//根据commitlogOffset查找消息
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
//字段检查
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
//删除预处理消息(prepare)
//将消息存储在RMQ_SYS_TRANS_OP_HALF_TOPIC中,代表该消息已被处理
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
response.setCode(result.getResponseCode());
response.setRemark(result.getResponseRemark());
return response;
}
复制代码
这里的逻辑很清晰,其核心实现如下:
假如我们有个订单系统,下单后要调用优惠券系统,我们采用RocketMq的方式,在下单支付成功后发送消息给优惠券系统派发优惠券,这里通过事务消息的方式,保证一定可以派发优惠券成功。 我们来思考下几种异常场景,看看RocketMq能不能解决。
可能由于网络或者mq故障,导致 Producer 订单系统 发送半消息(prepare)失败。 这时订单系统可以执行回滚操作,比如“订单关闭”等,走逆向流程退款给用户。
如果订单系统发送的半消息成功了,但是执行本地事务失败了,如更新订单状态为“已完成”。 这种情况下,执行本地事务失败后,会返回rollback给 MQ,MQ会删除之前发送的半消息。 也就不会调用优惠券系统了。
假如订单系统发送半消息成功后,没有收到MQ返回的响应。
这个时候可能是因为网络问题,或者其他异常报错,订单系统误以为发送MQ半消息失败,执行了逆向回滚流程。
但这个时候其实mq已经保存半消息成功了,那这个消息怎么处理?
这个时候MQ的后台消息回查定时任务TransactionalMessageCheckService
会每隔1分钟扫描一次半消息队列,判断是否需要消息回查,然后回查订单系统的本地事务,这时MQ就会发现订单已经变成“已关闭”,此时就要发送rollback请求给mq,删除之前的半消息。
这个其实也是通过定时任务TransactionalMessageCheckService
,它会发现这个消息超过一定时间还没有进行二阶段处理,就会回查本地事务。
思考
资料引用 blog.csdn.net/gotobat/art… blog.csdn.net/prestigedin… www.jianshu.com/p/4a9fceb69…