前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >史上最细最强大的RocketMQ实现分布式事务解决方案教程|Java 开发实战(下)

史上最细最强大的RocketMQ实现分布式事务解决方案教程|Java 开发实战(下)

原创
作者头像
huc_逆天
发布2024-01-19 23:41:43
3180
发布2024-01-19 23:41:43
举报
文章被收录于专栏:消息队列

该篇内容作为使用消息队列中间件RocketMQ实现分布式事务的下篇,叙述分布式事务使用RocketMQ中间件实现的代码篇

微服务应用集成MQ

• 引入依赖

代码语言:xml
复制
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>

• 配置文件配置

代码语言:yml
复制
rocketmq:
  name-server: xxxx:9876
  producer:
    group: base_group_syncMsg
    send-message-timeout: 5000
    retry-times-when-send-failed: 2
    max-message-size: 4194304 

bank1 应用实现

提供请求api

代码语言:java
复制
@GetMapping(value = "/rocketmq")
    public String transfer(@RequestParam("accountNo")String accountNo, @RequestParam("amount") Double amount){
        //创建事务id,作为消息内容发到mq
        String tx_no = UUID.randomUUID().toString();
        //封装事件实体
        AccountChangeEvent accountChangeEvent = new AccountChangeEvent(accountNo,amount,tx_no);
        //发送消息
        accountInfoService.sendUpdateAccountBalance(accountChangeEvent);
        return "处理成功-账号:{"+accountNo+"}扣减:{"+amount+"}";
    }

扣款请求

发送消息到MQ

代码语言:java
复制
/**
     * 向mq发送转账消息
     * @param accountChangeEvent 事件实体
     */
    @Override
    public void sendUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {
        //将accountChangeEvent转成json
        JSONObject jsonObject =new JSONObject();
        jsonObject.put("accountChange",accountChangeEvent);
        String jsonString = jsonObject.toJSONString();
        //生成message类型
        Message<String> message = MessageBuilder.withPayload(jsonString).build();
        //发送一条事务消息
        /**
         * String txProducerGroup 生产组
         * String destination 主题,
         * Message<?> message, 消息内容
         * Object arg 参数
         */
         rocketMQTemplate.sendMessageInTransaction("producer_group_bank1","bank",message,null);
    }

监听MQ返回

代码语言:java
复制
/**
 * @author 小隐乐乐
 * @description 消费者监听
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.producer.topic}", consumerGroup = "${rocketmq.producer.group}")
public class ConsumerListener implements RocketMQListener<String> {
    /**
     * 注入业务实现
     */
    @Autowired
    AccountInfoService accountInfoService;
    /**
     * 接收消息
     */
    @Override
    public void onMessage(String message) {
        log.info("获取到的消费消息:{}",message);
        //解析
        JSONObject jsonObject = JSONObject.parseObject(message);
        String accountChangeString = jsonObject.getString("accountChange");
        //转成AccountChangeEvent对象
        AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
        //设置账号
        accountChangeEvent.setAccountNo("2");
        //执行业务操作---增加金额
        accountInfoService.addAccountInfoBalance(accountChangeEvent);
    }
}

实现本地业务逻辑

代码语言:java
复制
/**
 * @author 小隐乐乐
 * @description 账户业务实现
 */
@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
    @Autowired
    AccountInfoDao accountInfoDao;
    //更新账户--增加金额
    @Override
    @Transactional
    public void addAccountInfoBalance(AccountChangeEvent accountChangeEvent) {
        log.info("bank2更新本地账号,账号:{},金额:{}",accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());
        //本地读取事务  防止重复消费
        if(accountInfoDao.isExistTx(accountChangeEvent.getTxNo())>0){
            return ;
        }
        //插入数据--增加金额
        accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());
        //添加事务记录,用于幂等
        accountInfoDao.addTx(accountChangeEvent.getTxNo());
        //预留错误演示
        if(accountChangeEvent.getAmount() == 250){
            throw new RuntimeException("消息处理异常");
        }
    }
}

bank1 事务回调监听

代码语言:java
复制
/**
 * @author 小隐乐乐
 * @description 生产者事务回调监听器
 */
@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "producer_group_bank1")
public class ProducerCallbackListener implements RocketMQLocalTransactionListener {
    @Autowired
    AccountInfoService accountInfoService;
    @Autowired
    AccountInfoDao accountInfoDao;
    /**
     * 事务消息发送后的回调方法,当消息发送给mq成功,此方法被回调
     * @param message 消息
     * @return
     */
    @Override
    @Transactional
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        try {
            //解析消息
            String messageString = new String((byte[]) message.getPayload());
            JSONObject jsonObject = JSONObject.parseObject(messageString);
            //转成AccountChangeEvent实体
            String accountChangeString = jsonObject.getString("accountChange");
            //将accountChange(json)转成AccountChangeEvent
            AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
            //执行本地事务,扣减金额
            accountInfoService.doUpdateAccountBalance(accountChangeEvent);
            //当返回RocketMQLocalTransactionState.COMMIT,自动向mq发送commit消息,mq将消息的状态改为可消费
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            e.printStackTrace();
            //向mq发送ROLLBACK,mq将消息的状态依旧无法消费
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    /**
     * 事务状态回查,查询是否扣减金额
     * @param message 消息
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        //解析message,转成AccountChangeEvent
        String messageString = new String((byte[]) message.getPayload());
        JSONObject jsonObject = JSONObject.parseObject(messageString);
        String accountChangeString = jsonObject.getString("accountChange");
        //将accountChange(json)转成AccountChangeEvent
        AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
        //事务id
        String txNo = accountChangeEvent.getTxNo();
        int existTx = accountInfoDao.isExistTx(txNo);
        if(existTx>0){
            return RocketMQLocalTransactionState.COMMIT;
        }else{
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
}

bank2 应用实现

bank2 监听MQ

代码语言:java
复制
/**
 * @author 小隐乐乐
 * @description 消费者监听
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "bank", consumerGroup = "rocketmq.consumer.group")
public class ConsumerListener implements RocketMQListener<String> {
    /**
     * 注入业务实现
     */
    @Autowired
    AccountInfoService accountInfoService;
    /**
     * 接收消息
     */
    @Override
    public void onMessage(String message) {
        log.info("获取到的消费消息:{}",message);
        //解析
        JSONObject jsonObject = JSONObject.parseObject(message);
        String accountChangeString = jsonObject.getString("accountChange");
        //转成AccountChangeEvent对象
        AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);
        //设置账号
        accountChangeEvent.setAccountNo("2");
        //执行业务操作---增加金额
        accountInfoService.addAccountInfoBalance(accountChangeEvent);
    }
}
消息消费
log.info("获取到的消费消息:{}",message);
        //解析
        JSONObject jsonObject = JSONObject.parseObject(message);
        String accountChangeString = jsonObject.getString("accountChange");
        //转成AccountChangeEvent对象
        AccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);

执行本地扣款事务

代码语言:java
复制
accountInfoService.addAccountInfoBalance(accountChangeEvent);

总结

终于搞完了,写demo还是很费事的。

分布式事务解决方案很多,到底需不需要分布式事务,也是需要我们技术人员去考量的。那么如果需要,我相信,本篇文章作为RocketMQ实现消息队列分布式事务的快速上手文章,相信你不容错过。如果觉得写的不错,我准备出专栏,哈哈哈。

躺平,在追求梦想的人身上不是一个好选择,技术的脚步,是一直向前的,努力吧,少年们!!!

我正在参与2024腾讯技术创作特训营第五期有奖征文,快来和我瓜分大奖!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 微服务应用集成MQ
  • bank1 应用实现
  • bank2 应用实现
  • 总结
相关产品与服务
分布式事务 DTF
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档