动态数据源 或者 多数据源 在项目当中是经常遇到的,但由于spring 开启事务后,为保证整个事务的 connection 不会变化,spring 在通过 DataSourceUtils 获取 connection 的时候会用 DataSource 作为 key 将 connection 保存到 ThreadLocal 中(这段代码是没办法进行重写的,它是静态方法,并在其他地方直接调用),如下所示:
spring 事务获取 connection 源代码:
public static Connection doGetConnection(DataSource dataSource) throws SQLException {
Assert.notNull(dataSource, "No DataSource specified");
// 从事务管理器中获取connection,如果有直接使用
ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
conHolder.requested();
if (!conHolder.hasConnection()) {
logger.debug("Fetching resumed JDBC Connection from DataSource");
conHolder.setConnection(fetchConnection(dataSource));
}
return conHolder.getConnection();
}
// 事务期间第一次获取connection,如果开启事务,则将连接进行缓存
Connection con = fetchConnection(dataSource);
if (TransactionSynchronizationManager.isSynchronizationActive()) {
logger.debug("Registering transaction synchronization for JDBC Connection");
// Use same Connection for further JDBC actions within the transaction.
// Thread-bound object will get removed by synchronization at transaction completion.
ConnectionHolder holderToUse = conHolder;
if (holderToUse == null) {
holderToUse = new ConnectionHolder(con);
}
else {
holderToUse.setConnection(con);
}
holderToUse.requested();
TransactionSynchronizationManager.registerSynchronization(
new ConnectionSynchronization(holderToUse, dataSource));
holderToUse.setSynchronizedWithTransaction(true);
if (holderToUse != conHolder) {
TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
}
}
return con;
}
也就是说,不管我们使用 AbstractRoutingDataSource 方式实现 一个 SqlSessionFactory 对应 多个 DataSource 还是 多个 SqlSessionFactory 对应多个 DataSource 的方式实现多个数据源操作,都spring 事务都无法支持,因为我们的 connection 有多个;
利用 ThreadLocal 将事务方法 内用到的 connection 缓存起来,当业务执行完毕,再统一 commit 或者 rollback;
首先,自定义新的事务注解
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({
ElementType.METHOD})
public @interface MultiDSTransaction {
}
实现事务上线文,替换 TransactionSynchronizationManager 中的业务逻辑,用来指明线程执行过程那一段 需要使用到事务;
public class TransactionContext {
private static final ThreadLocal<Boolean> TRAN_SWITCH_CONTEXT = new ThreadLocal<>();
static {
// 默认事务处于关闭状态
TRAN_SWITCH_CONTEXT.set(false);
}
// 开启事务
public static void openTran() {
TRAN_SWITCH_CONTEXT.set(true);
}
// 关闭事务
public static void closeTran() {
TRAN_SWITCH_CONTEXT.set(false);
}
// 判断是否开启事务
public static Boolean isOpenTran() {
return TRAN_SWITCH_CONTEXT.get();
}
}
重写 connection,这是因为如果我们在没开启事务的情况下使用 mybatis,它会自动提交事务,并 close 连接(如下图所示),这会导致业务执行完成后,connection 其实处于不可用状态,因此我们需要重写 connection 的 commit 和 close 方法,避免当我们开启事务的时候,连接被自动 commit 和 close;
自定义连接,重写 close 和 commit 方法
public class CustomConnection implements Connection {
// 真实的连接
private Connection connection;
public CustomConnection(Connection connection) {
this.connection = connection;
}
@Override
public void commit() throws SQLException {
// 如果没开启多数据源事务,则走 commit
if (!TransactionContext.isOpenTran()) {
connection.commit();
}
}
public void commitMultiDbTran() throws SQLException {
// 如果开启多数据源,则走 这里的 commit
connection.commit();
}
@Override
public void close() throws SQLException {
// mybatis 执行完业务后,会触发 close() 操作,如果 connection 被提前 close 了,业务就会出错
if (!TransactionContext.isOpenTran()) {
connection.close();
}
}
public void closeMultiDbTran() throws SQLException {
// 如果开启多数据源事务,则走 这里的 close
connection.close();
}
@Override
public Statement createStatement() throws SQLException {
return connection.createStatement();
}
.................其他方法如上所示,直接使用 connection 调用相同方法
}
在自定义数据源中添加事务相关业务,既在获取 连接的地方将 Connection 缓存到 ThreadLocal 中 注:此处自定义数据源代码用的是上一篇动态数据源,其核心就是如果开启了事务,则在获取connection 的时候,将 connection 缓存到 ThreadLocal<List> 中,以便业务结束提交事务的时候用
public class MultiDataSource extends AbstractDataSource implements InitializingBean {
/** * 其他的动态数据源,同意起来方便管理 */
private static final Map<String, DataSource> DATA_SOURCE_MAP = new ConcurrentHashMap<>();
/** * 多数据源 执行 事务期间用到的连接 */
public static final ThreadLocal<List<CustomConnection>> MULTI_TRAN_CONNECTION = new ThreadLocal<>();
static {
MULTI_TRAN_CONNECTION.set(new ArrayList<>());
}
private DataSource defaultTargetDataSource;
@Autowired
private DataSourceConfig dataSourceConfig;
/** * 如果开启事务,就将连接缓存到 MULTI_TRAN_CONNECTION 中 */
@Override
public Connection getConnection() throws SQLException {
CustomConnection customConnection = new CustomConnection(getDataSource().getConnection());
if (TransactionContext.isOpenTran()) {
customConnection.setAutoCommit(false);
MULTI_TRAN_CONNECTION.get().add(customConnection);
}
return customConnection;
}
@Override
public Connection getConnection(String username, String password) throws SQLException {
CustomConnection customConnection = new CustomConnection(getDataSource().getConnection(username, password));
if (TransactionContext.isOpenTran()) {
customConnection.setAutoCommit(false);
MULTI_TRAN_CONNECTION.get().add(customConnection);
}
return customConnection;
}
/** * 获取 dataSource 的时候,可以使用 LRU 算法,对 DataSource 进行热点排序,便于无效数据源清理 * @return */
protected DataSource getDataSource() {
DataSource dataSource;
String key = DbContext.getCurDb();
if ((dataSource = DATA_SOURCE_MAP.get(key)) == null) {
synchronized (this) {
if (DATA_SOURCE_MAP.get(key) == null) {
// 创建新的数据源
dataSource = dataSourceConfig.buildDataSource(key);
DATA_SOURCE_MAP.put(key, dataSource);
}
}
}
return dataSource;
}
@Override
@SuppressWarnings("unchecked")
public <T> T unwrap(Class<T> iface) throws SQLException {
if (iface.isInstance(this)) {
return (T) this;
}
return getDataSource().unwrap(iface);
}
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
return (iface.isInstance(this) || getDataSource().isWrapperFor(iface));
}
public void setDefaultTargetDataSource(DataSource defaultTargetDataSource) {
this.defaultTargetDataSource = defaultTargetDataSource;
}
@Override
public void afterPropertiesSet() throws Exception {
DATA_SOURCE_MAP.put("main", defaultTargetDataSource);
}
}
最后一步,利用 AOP 进行方法拦截,对使用了 多数据源 事务注解的方法,执行事务业务
@Aspect
@Configuration
public class MultiDSTransactionConfig {
@Pointcut("@annotation(com.bin.conf.MultiDSTransaction)")
public void transactPoint() {
}
@Around("transactPoint()")
public Object multiTranAop(ProceedingJoinPoint joinPoint) throws Throwable {
// 开启事务
TransactionContext.openTran();
try {
// 执行业务
Object proceed = joinPoint.proceed();
// 提交事务
for (CustomConnection connection : MultiDataSource.MULTI_TRAN_CONNECTION.get()) {
connection.commitMultiDbTran();
connection.closeMultiDbTran();
}
return proceed;
} catch (Throwable t) {
for (CustomConnection connection : MultiDataSource.MULTI_TRAN_CONNECTION.get()) {
// 事务回滚
connection.rollback();
connection.closeMultiDbTran();
}
throw t;
} finally {
// 清空 事务 连接,关闭当前事务
MultiDataSource.MULTI_TRAN_CONNECTION.get().clear();
TransactionContext.closeTran();
}
}
}
测试代码如下,本地测试是 OK 的,当报错的之后,事务同时回滚,数据没插入成功,当未出现报错,数据则都插入成功;