前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >解决 springboot 多数据源或动态数据源 的事务问题

解决 springboot 多数据源或动态数据源 的事务问题

作者头像
botkenni
发布2022-09-07 14:42:21
3.6K2
发布2022-09-07 14:42:21
举报
文章被收录于专栏:IT码农IT码农

需求背景:

  动态数据源 或者 多数据源 在项目当中是经常遇到的,但由于spring 开启事务后,为保证整个事务的 connection 不会变化,spring 在通过 DataSourceUtils 获取 connection 的时候会用 DataSource 作为 key 将 connection 保存到 ThreadLocal 中(这段代码是没办法进行重写的,它是静态方法,并在其他地方直接调用),如下所示:

在这里插入图片描述
在这里插入图片描述

spring 事务获取 connection 源代码:

代码语言:javascript
复制
public static Connection doGetConnection(DataSource dataSource) throws SQLException {
   
    Assert.notNull(dataSource, "No DataSource specified");
    // 从事务管理器中获取connection,如果有直接使用
    ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
    if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
   
        conHolder.requested();
        if (!conHolder.hasConnection()) {
   
            logger.debug("Fetching resumed JDBC Connection from DataSource");
            conHolder.setConnection(fetchConnection(dataSource));
        }
        return conHolder.getConnection();
    }
    // 事务期间第一次获取connection,如果开启事务,则将连接进行缓存
    Connection con = fetchConnection(dataSource);
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
   
        logger.debug("Registering transaction synchronization for JDBC Connection");
        // Use same Connection for further JDBC actions within the transaction.
        // Thread-bound object will get removed by synchronization at transaction completion.
        ConnectionHolder holderToUse = conHolder;
        if (holderToUse == null) {
   
            holderToUse = new ConnectionHolder(con);
        }
        else {
   
            holderToUse.setConnection(con);
        }
        holderToUse.requested();
        TransactionSynchronizationManager.registerSynchronization(
            new ConnectionSynchronization(holderToUse, dataSource));
        holderToUse.setSynchronizedWithTransaction(true);
        if (holderToUse != conHolder) {
   
            TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
        }
    }
    return con;
}

  也就是说,不管我们使用 AbstractRoutingDataSource 方式实现 一个 SqlSessionFactory 对应 多个 DataSource 还是 多个 SqlSessionFactory 对应多个 DataSource 的方式实现多个数据源操作,都spring 事务都无法支持,因为我们的 connection 有多个;

事务实现方案

  利用 ThreadLocal 将事务方法 内用到的 connection 缓存起来,当业务执行完毕,再统一 commit 或者 rollback;

在这里插入图片描述
在这里插入图片描述

具体实现

  首先,自定义新的事务注解

代码语言:javascript
复制
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({
   ElementType.METHOD})
public @interface MultiDSTransaction {
   
}

  实现事务上线文,替换 TransactionSynchronizationManager 中的业务逻辑,用来指明线程执行过程那一段 需要使用到事务;

代码语言:javascript
复制
public class TransactionContext {
   
    private static final ThreadLocal<Boolean> TRAN_SWITCH_CONTEXT = new ThreadLocal<>();
    static {
   
    	// 默认事务处于关闭状态
        TRAN_SWITCH_CONTEXT.set(false);
    }
	// 开启事务
    public static void openTran() {
   
        TRAN_SWITCH_CONTEXT.set(true);
    }
    // 关闭事务
    public static void closeTran() {
   
        TRAN_SWITCH_CONTEXT.set(false);
    }
    // 判断是否开启事务
    public static Boolean isOpenTran() {
   
        return TRAN_SWITCH_CONTEXT.get();
    }
}

  重写 connection,这是因为如果我们在没开启事务的情况下使用 mybatis,它会自动提交事务,并 close 连接(如下图所示),这会导致业务执行完成后,connection 其实处于不可用状态,因此我们需要重写 connection 的 commit 和 close 方法,避免当我们开启事务的时候,连接被自动 commit 和 close;

在这里插入图片描述
在这里插入图片描述

  自定义连接,重写 close 和 commit 方法

代码语言:javascript
复制
public class CustomConnection implements Connection {
   
    // 真实的连接
    private Connection connection;

    public CustomConnection(Connection connection) {
   
        this.connection = connection;
    }
    @Override
    public void commit() throws SQLException {
   
        // 如果没开启多数据源事务,则走 commit
        if (!TransactionContext.isOpenTran()) {
   
            connection.commit();
        }
    }
    public void commitMultiDbTran() throws SQLException {
   
        // 如果开启多数据源,则走 这里的 commit
        connection.commit();
    }
    @Override
    public void close() throws SQLException {
   
        // mybatis 执行完业务后,会触发 close() 操作,如果 connection 被提前 close 了,业务就会出错
        if (!TransactionContext.isOpenTran()) {
   
            connection.close();
        }
    }
    public void closeMultiDbTran() throws SQLException {
   
        // 如果开启多数据源事务,则走 这里的 close
        connection.close();
    }

    @Override
    public Statement createStatement() throws SQLException {
   
        return connection.createStatement();
    }
    .................其他方法如上所示,直接使用 connection 调用相同方法
}

  在自定义数据源中添加事务相关业务,既在获取 连接的地方将 Connection 缓存到 ThreadLocal 中 注:此处自定义数据源代码用的是上一篇动态数据源,其核心就是如果开启了事务,则在获取connection 的时候,将 connection 缓存到 ThreadLocal<List> 中,以便业务结束提交事务的时候用

代码语言:javascript
复制
public class MultiDataSource extends AbstractDataSource implements InitializingBean {
   
    /** * 其他的动态数据源,同意起来方便管理 */
    private static final Map<String, DataSource> DATA_SOURCE_MAP = new ConcurrentHashMap<>();
    /** * 多数据源 执行 事务期间用到的连接 */
    public static final ThreadLocal<List<CustomConnection>> MULTI_TRAN_CONNECTION = new ThreadLocal<>();
    static {
   
        MULTI_TRAN_CONNECTION.set(new ArrayList<>());
    }
    private DataSource defaultTargetDataSource;
    @Autowired
    private DataSourceConfig dataSourceConfig;
    
    /** * 如果开启事务,就将连接缓存到 MULTI_TRAN_CONNECTION 中 */
    @Override
    public Connection getConnection() throws SQLException {
   
        CustomConnection customConnection = new CustomConnection(getDataSource().getConnection());
        if (TransactionContext.isOpenTran()) {
   
            customConnection.setAutoCommit(false);
            MULTI_TRAN_CONNECTION.get().add(customConnection);
        }
        return customConnection;
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
   
        CustomConnection customConnection = new CustomConnection(getDataSource().getConnection(username, password));
        if (TransactionContext.isOpenTran()) {
   
            customConnection.setAutoCommit(false);
            MULTI_TRAN_CONNECTION.get().add(customConnection);
        }
        return customConnection;
    }
    /** * 获取 dataSource 的时候,可以使用 LRU 算法,对 DataSource 进行热点排序,便于无效数据源清理 * @return */
    protected DataSource getDataSource() {
   
        DataSource dataSource;
        String key = DbContext.getCurDb();
        if ((dataSource = DATA_SOURCE_MAP.get(key)) == null) {
   
            synchronized (this) {
   
                if (DATA_SOURCE_MAP.get(key) == null) {
   
                    // 创建新的数据源
                    dataSource = dataSourceConfig.buildDataSource(key);
                    DATA_SOURCE_MAP.put(key, dataSource);
                }
            }
        }
        return dataSource;
    }
    @Override
    @SuppressWarnings("unchecked")
    public <T> T unwrap(Class<T> iface) throws SQLException {
   
        if (iface.isInstance(this)) {
   
            return (T) this;
        }
        return getDataSource().unwrap(iface);
    }
    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
   
        return (iface.isInstance(this) || getDataSource().isWrapperFor(iface));
    }
    public void setDefaultTargetDataSource(DataSource defaultTargetDataSource) {
   
        this.defaultTargetDataSource = defaultTargetDataSource;
    }
    @Override
    public void afterPropertiesSet() throws Exception {
   
        DATA_SOURCE_MAP.put("main", defaultTargetDataSource);
    }
}

  最后一步,利用 AOP 进行方法拦截,对使用了 多数据源 事务注解的方法,执行事务业务

代码语言:javascript
复制
@Aspect
@Configuration
public class MultiDSTransactionConfig {
   
    @Pointcut("@annotation(com.bin.conf.MultiDSTransaction)")
    public void transactPoint() {
   }

    @Around("transactPoint()")
    public Object multiTranAop(ProceedingJoinPoint joinPoint) throws Throwable {
   
        // 开启事务
        TransactionContext.openTran();
        try {
   
            // 执行业务
            Object proceed = joinPoint.proceed();
            // 提交事务
            for (CustomConnection connection : MultiDataSource.MULTI_TRAN_CONNECTION.get()) {
   
                connection.commitMultiDbTran();
                connection.closeMultiDbTran();
            }
            return proceed;
        } catch (Throwable t) {
   
            for (CustomConnection connection : MultiDataSource.MULTI_TRAN_CONNECTION.get()) {
   
                // 事务回滚
                connection.rollback();
                connection.closeMultiDbTran();
            }
            throw t;
        } finally {
   
            // 清空 事务 连接,关闭当前事务
            MultiDataSource.MULTI_TRAN_CONNECTION.get().clear();
            TransactionContext.closeTran();
        }
    }
}

测试代码:

  测试代码如下,本地测试是 OK 的,当报错的之后,事务同时回滚,数据没插入成功,当未出现报错,数据则都插入成功;

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-09-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 需求背景:
  • 事务实现方案
  • 具体实现
  • 测试代码:
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档