事务消息

最近更新时间:2019-06-27 10:30:01

当消息生产者本地事务处理成功与消息发送成功不一致时,传统的处理方式无法解决该问题,事务消息实现了消息生产者本地事务与消息发送的原子性,保证了消息生产者本地事务处理成功与消息发送成功的最终一致。用户实现类似 X/Open XA 的分布事务功能,通过 CMQ 事务消息能达到分布式事务的最终一致。

说明:

该功能目前处于灰度测试阶段,暂时支持广州、上海、印度地域,其他地域在逐步支持中,如需试用请通过 提交工单 的方式开通白名单。

模块交互图


其中,事务消息发送对应步骤1、2、3、4,事务消息回查对应步骤5、6、7。

  1. 生产者向 MQ 服务端发送消息(为了方便说明,整个 MQ 服务端用 MQ server 来表示)。
  2. MQ Server 将消息持久化成功之后,向生产者 ACK 确认消息已经发送成功,此时消息为半消息(暂不能投递的消息)。
  3. 生产者发送消息成功后,开始执行本地事务逻辑。
  4. 生产者根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到 Commit 状态则将半消息标记为可投递,消费者最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息(直接置消息已消费状态),消费者将不会接受该消息。
  5. 在断网或者是生产者应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后 MQ Server 将对该消息发起消息回查。
  6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
    生产者根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。

发送事务消息

发送事务消息包含以下两个步骤:

  1. 发送半消息并执行本地事务
  2. 提交本地事务执行状态

其中,提交本地事务执行状态有两种方式:

  • 执行本地事务完成后,SDK 主动提交。
  • 执行本地事务后一直没有提交状态,MQ Server 会主动发送回查,此时 SDK 提交本地事务执行状态。

事务执行状态有以下三种情况:

  • TransactionStatus.COMMIT 提交事务,消费者可以消费到该消息。
  • TransactionStatus.ROLLBACK 回滚事务,消息被丢弃,消费者不会消费到该消息。
  • TransactionStatus.UN_KNOW 无法判断状态,等待 MQ Server 再次发送回查。

发送事务消息示例

package demo;

import com.qcloud.cmq.client.common.ClientConfig;
import com.qcloud.cmq.client.common.LogHelper;
import com.qcloud.cmq.client.common.ResponseCode;
import com.qcloud.cmq.client.common.TransactionStatus;
import com.qcloud.cmq.client.consumer.Message;
import com.qcloud.cmq.client.exception.MQClientException;
import com.qcloud.cmq.client.producer.*;
import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.List;

public class ProducerTransactionDemo {

    private final static Logger logger = LogHelper.getLog();

    public static void main(String[] args) {
        TransactionProducer producer = new TransactionProducer();
        // 设置 Name Server地址,在控制台上获取, 必须设置
        producer.setNameServerAddress("http://cmq-nameserver-dev.api.tencentyun.com");
        // 设置 SecretId,在控制台上获取,必须设置
        producer.setSecretId("AKIDrcs5AKw5mKiutg9gxGOK5eEdppfLw7D7");
        // 设置 SecretKey,在控制台上获取,必须设置
        producer.setSecretKey("qV2NVSEPRGtMfqpcCaoqhMH14wc6TgiY");
        // 设置签名方式,可以不设置,默认为 SHA1
        producer.setSignMethod(ClientConfig.SIGN_METHOD_SHA256);
        // 设置发送消息失败时,重试的次数,设置为0表示不重试,默认为2
        producer.setRetryTimesWhenSendFailed(3);
        // 设置请求超时时间, 默认3000ms
        producer.setRequestTimeoutMS(5000);
        // 设置首次回查的时间间隔,默认5000ms
        producer.setFirstCheckInterval(5);
        // 消息发往的队列,在控制台创建
        String queue = "test_transaction";
        // 设置回查的回调函数,检查本地事务的校验结果
        producer.setChecker(queue, new TransactionStatusCheckerImpl());


        try{
            // 启动对象前必须设置好相关参数
            producer.start();
            String msg = "test_message";

            TransactionExecutor executor = new TransactionExecutor() {
                @Override
                public TransactionStatus execute(String msg, Object arg) {
                    //执行用户的本地事务 ... ...
                    System.out.println("do local transaction service!");
                    return TransactionStatus.COMMIT;
                }
            };

            // 同步发送单条事务消息
            SendResult result = producer.sendTransactionMessage(queue, msg, executor, "test");

            if (result.getReturnCode() == ResponseCode.SUCCESS) {
                System.out.println("==> send success! msg_id:" + result.getMsgId() + " request_id:" + result.getRequestId());
            } else {
                System.out.println("==> code:" + result.getReturnCode() + " error:" + result.getErrorMsg());
            }

        }catch (MQClientException e){
            e.printStackTrace();
        }

    }
}

class TransactionStatusCheckerImpl implements TransactionStatusChecker{

    @Override
    public TransactionStatus checkStatus(Message msg) {
        //用户实现,检查本地事务的执行状态 ... ...
        return TransactionStatus.COMMIT;
    }
}

消费事务消息

消费事务消息与从普通队列或者订阅中消费一致,详情请参考 队列模型消费消息