前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >数据库事务提交后才发送MQ消息解决方案

数据库事务提交后才发送MQ消息解决方案

作者头像
SmileNicky
发布2023-11-03 08:33:09
4740
发布2023-11-03 08:33:09
举报
文章被收录于专栏:Nicky's blogNicky's blog

项目场景:

在项目开发中常常会遇到在一个有数据库操作的方法中,发送MQ消息,如果这种情况消息队列效率比较快,就会出现数据库事务还没提交,消息队列已经执行业务,导致不一致问题。举个应用场景,我们提交一个订单,将流水号放在MQ里,MQ监听到后就会查询订单去做其它业务,如果这时候数据库事务还没提交,也就是没生成订单流水,MQ监听到消息就去执行业务,查询订单,肯定会出现业务不一致问题

问题描述

最近遇到一个业务场景,类似于下单过程,场景是用户注册消息,注册成功后,会发送MQ消息,MQ监听到消息后,会查询用户的信息,如何再做其它业务,但是遇到一个问题,就是mq消费消息的速度是快于数据库事务提交的,就是我们用户注册的信息还没写入数据库,mq已经提前消费了,所以会导致查询不到用户注册的信息

大致的代码:

代码语言:javascript
复制
@Transactional(rollbackFor = Exception.class)
public void register(){
     User user = User.builder()
                .name("管理员")
                .email("123456@qq.com")
                .build();
        userMapper.insert(user);
    
    // 发送消息给MQ
    sendMQMessage();
}

原因分析

MQ消息消费快于事务提交

在这里插入图片描述
在这里插入图片描述

解决方案

对于这种情况,下面给出两种处理方法,一种是借助于Spring框架提供的TransactionSynchronizationManager来控制,另外一种方法是借助于Spring框架提供的@TransactionalEventListener来控制事务

  • TransactionSynchronizationManager控制事务
代码语言:javascript
复制
@Transactional(rollbackFor = Exception.class)
public void register() {
    
    User user = User.builder()
                .name("管理员")
                .email("123456@qq.com")
                .build();
    userMapper.insert(user);
    
    
    // after transaction commit
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
        @Override
        public void afterCommit() {
            // 发送消息给MQ
    		sendMQMessage();
        }
    });

}

测试一下,通过日志可以看出事务已经提交了,如何发送mq,mq监听到消息,就会去读取用户信息,是可以获取到的

在这里插入图片描述
在这里插入图片描述
  • @TransactionalEventListener控制事务

如果借助Spring框架提供的事件监听机制来实现,就需要用到@TransactionalEventListener监听器,下面给出例子

创建一个Event,主要来做参数传送

代码语言:javascript
复制
package com.example.eventlistener.event;

import org.springframework.context.ApplicationEvent;


public class SendMsgEvent extends ApplicationEvent {

    private Long userId;

    private String userName;

    public SendMsgEvent(Object source){
        super(source);
    }

    public Long getUserId() {
        return userId;
    }

    public void setUserId(Long userId) {
        this.userId = userId;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }
}

创建一个监听器,注意要加上@Component,组件类才能被Spring容器管理

代码语言:javascript
复制
package com.example.eventlistener.listener;


import cn.hutool.http.HttpRequest;
import cn.hutool.json.JSONUtil;
import com.example.eventlistener.event.SendMsgEvent;
import com.example.eventlistener.mapper.UserMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;

import javax.annotation.Resource;

@Component
@Slf4j
public class SendMsgListener {

    @Resource
    private UserMapper userMapper;

    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT , classes = SendMsgEvent.class)
    public void sendMsg(SendMsgEvent sendMsgEvent) {
        log.info("sendMsg: {}" , JSONUtil.toJsonStr(sendMsgEvent));

         // 发送消息给MQ
    	sendMQMessage();
    }
}

业务类实现业务:

代码语言:javascript
复制
package com.example.eventlistener.service.impl;

import cn.hutool.http.HttpRequest;
import com.example.eventlistener.event.SendMsgEvent;
import com.example.eventlistener.event.UserRegisterEvent;
import com.example.eventlistener.mapper.UserMapper;
import com.example.eventlistener.model.User;
import com.example.eventlistener.service.IUserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import javax.annotation.Resource;

@Service
@Slf4j
public class UserServiceImpl implements ApplicationEventPublisherAware , IUserService {

    private ApplicationEventPublisher applicationEventPublisher;

    @Resource
    private UserMapper userMapper;


    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void sendMsgAfterRegisterByEvent() {
        User user = doRegister();

        // after transaction commit
        SendMsgEvent sendMsgEvent = new SendMsgEvent(this);
        sendMsgEvent.setUserId(user.getId());
        sendMsgEvent.setUserName(user.getName());
        applicationEventPublisher.publishEvent(sendMsgEvent);

    }
    
    private User doRegister() {
        User user = User.builder()
                .name("管理员")
                .email("123456@qq.com")
                .build();
        userMapper.insert(user);
        log.info("save user info");
        return user;
    }


}

经过测试,也可以实现同样的效果,控制数据库的事务提交后,才执行发送MQ消息

在这里插入图片描述
在这里插入图片描述

补充: 如果执行出现java.lang.IllegalStateException: Transaction synchronization is not active,说明没加事务控制,加上@Transactional(rollbackFor = Exception.class)即可

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023-11-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 项目场景:
  • 问题描述
  • 原因分析
  • 解决方案
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档