首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

SpringBoot+MyBatis 多数据源事物问题

这是小小本周的第二篇,本篇将会着重讲解关于SpringBoot + MyBatis 多数据源的事物的问题。

多数据源

此处模拟创建订单和扣减库存。先创建订单表和库存表

CREATE TABLE `t_storage` (

`id` int(11) NOT NULL AUTO_INCREMENT,

`commodity_code` varchar(255) DEFAULT NULL,

`count` int(11) DEFAULT '0',

PRIMARY KEY (`id`),

UNIQUE KEY `commodity_code` (`commodity_code`)

) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;

CREATE TABLE `t_order` (

`id` bigint(16) NOT NULL,

`commodity_code` varchar(255) DEFAULT NULL,

`count` int(11) DEFAULT '0',

`amount` double(14,2) DEFAULT '0.00',

PRIMARY KEY (`id`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8;

数据库连接

这里使用yal文件把数据库进行配置

spring:

datasource:

ds1:

jdbc_url: jdbc:mysql://127.0.0.1:3306/db1

username: root

password: root

ds2:

jdbc_url: jdbc:mysql://127.0.0.1:3306/db2

username: root

password: root

基本的目录结构

切换数据源的抽象类

这里切换数据源的抽象类为 AbstractRoutingDataSource这里看一下相关的源码

public abstract class AbstractRoutingDataSource{

//数据源的集合

@Nullable

private Map targetDataSources;

//默认的数据源

@Nullable

private Object defaultTargetDataSource;

//返回当前的路由键,根据该值返回不同的数据源

@Nullable

protected abstract Object determineCurrentLookupKey();

//确定一个数据源

protected DataSource determineTargetDataSource() {

//抽象方法 返回一个路由键

Object lookupKey = determineCurrentLookupKey();

DataSource dataSource = this.targetDataSources.get(lookupKey);

return dataSource;

}

}

对于该源码来说,核心为用Map保存多个数据源信息,根据key获取不同的数据源。

修改数据源的核心

那么这里修改数据源的核心就在于重写determineCurrentLookupKey方法,让其返回一个数据源名称

public class DynamicDataSource extends AbstractRoutingDataSource {

@Override

protected Object determineCurrentLookupKey() {

DataSourceType.DataBaseType dataBaseType = DataSourceType.getDataBaseType();

return dataBaseType;

}

}

此时的目录结构为

再添加一个工具类

这里再添加一个工具类,用来保存当前线程的数据源类型

public class DataSourceType {

public enum DataBaseType {

ds1, ds2

}

// 使用ThreadLocal保证线程安全

private static final ThreadLocal TYPE = new ThreadLocal();

// 往当前线程里设置数据源类型

public static void setDataBaseType(DataBaseType dataBaseType) {

if (dataBaseType == null) {

throw new NullPointerException();

}

TYPE.set(dataBaseType);

}

// 获取数据源类型

public static DataBaseType getDataBaseType() {

DataBaseType dataBaseType = TYPE.get() == null ? DataBaseType.ds1 : TYPE.get();

return dataBaseType;

}

}

此时目录结构为

实现数据源的注入

@Configuration

public class DataSourceConfig {

/**

* 创建多个数据源 ds1 和 ds2

* 此处的Primary,是设置一个Bean的优先级

* @return

*/

@Primary

@Bean(name = "ds1")

@ConfigurationProperties(prefix = "spring.datasource.ds1")

public DataSource getDateSource1() {

return DataSourceBuilder.create().build();

}

@Bean(name = "ds2")

@ConfigurationProperties(prefix = "spring.datasource.ds2")

public DataSource getDateSource2() {

return DataSourceBuilder.create().build();

}

/**

* 将多个数据源注入到DynamicDataSource

* @param dataSource1

* @param dataSource2

* @return

*/

@Bean(name = "dynamicDataSource")

public DynamicDataSource DataSource(@Qualifier("ds1") DataSource dataSource1,

@Qualifier("ds2") DataSource dataSource2) {

Map targetDataSource = new HashMap();

targetDataSource.put(DataSourceType.DataBaseType.ds1, dataSource1);

targetDataSource.put(DataSourceType.DataBaseType.ds2, dataSource2);

DynamicDataSource dataSource = new DynamicDataSource();

dataSource.setTargetDataSources(targetDataSource);

dataSource.setDefaultTargetDataSource(dataSource1);

return dataSource;

}

/**

* 将动态数据源注入到SqlSessionFactory

* @param dynamicDataSource

* @return

* @throws Exception

*/

@Bean(name = "SqlSessionFactory")

public SqlSessionFactory getSqlSessionFactory(@Qualifier("dynamicDataSource") DataSource dynamicDataSource)

throws Exception {

SqlSessionFactoryBean bean = new SqlSessionFactoryBean();

bean.setDataSource(dynamicDataSource);

bean.setMapperLocations(

new PathMatchingResourcePatternResolver().getResources("classpath*:mapping/*.xml"));

bean.setTypeAliasesPackage("cn.youyouxunyin.multipledb2.entity");

return bean.getObject();

}

}

创建多个数据源DataSource,ds1 和 ds2;

将ds1 和 ds2 数据源放入动态数据源DynamicDataSource;

将DynamicDataSource注入到SqlSessionFactory。

设置路由键

创建相关的mapper接口

public interface OrderMapper {

void createOrder(Order order);

}

public interface StorageMapper {

void decreaseStorage(Order order);

}

设置切面

这里设置相关的切面

@Component

@Aspect

public class DataSourceAop {

@Before("execution(* cn.youyouxunyin.multipledb2.mapper.OrderMapper.*(..))")

public void setDataSource1() {

DataSourceType.setDataBaseType(DataSourceType.DataBaseType.ds1);

}

@Before("execution(* cn.youyouxunyin.multipledb2.mapper.StorageMapper.*(..))")

public void setDataSource2() {

DataSourceType.setDataBaseType(DataSourceType.DataBaseType.ds2);

}

}

在执行订单的操作时,切到数据源ds1,执行库存操作时,切到数据源ds2。

测试

public class OrderServiceImpl implements OrderService {

@Override

public void createOrder(Order order) {

storageMapper.decreaseStorage(order);

logger.info("库存已扣减,商品代码:{},购买数量:{}。创建订单中...",order.getCommodityCode(),order.getCount());

orderMapper.createOrder(order);

}

}

总目录结构

经过测试两个数据表都已经发生了变化

事物模式

这里事物模式,不能切换数据源,可以在,service上添加Transactional上实现。

为什么

这里为什么不能切换数据源首先查看相关的注解

public class TransactionInterceptor{

public Object invoke(MethodInvocation invocation) throws Throwable {

//获取目标类

Class targetClass = AopUtils.getTargetClass(invocation.getThis());

//事务调用

return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);

}

}

创建事物

这里首先执行的是创建事物

protected Object doGetTransaction() {

//DataSource的事务对象

DataSourceTransactionObject txObject = new DataSourceTransactionObject();

//设置事务自动保存

txObject.setSavepointAllowed(isNestedTransactionAllowed());

//给事务对象设置ConnectionHolder

ConnectionHolder conHolder = TransactionSynchronizationManager.getResource(obtainDataSource());

txObject.setConnectionHolder(conHolder, false);

return txObject;

}

重点是给事务对象设置了ConnectionHolder属性,不过此时还是为空。

开启事物

这里主要是通过ThreadLocal将资源和当前的事务对象绑定,然后设置一些事务状态。

protected void doBegin(Object txObject, TransactionDefinition definition) {

Connection con = null;

//从数据源中获取一个连接

Connection newCon = obtainDataSource().getConnection();

//重新设置事务对象中的connectionHolder,此时已经引用了一个连接

txObject.setConnectionHolder(new ConnectionHolder(newCon), true);

//将这个connectionHolder标记为与事务同步

txObject.getConnectionHolder().setSynchronizedWithTransaction(true);

con = txObject.getConnectionHolder().getConnection();

con.setAutoCommit(false);

//激活事务活动状态

txObject.getConnectionHolder().setTransactionActive(true);

//将connection holder绑定到当前线程,通过threadlocal

if (txObject.isNewConnectionHolder()) {

TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());

}

//事务管理器,激活事务同步状态

TransactionSynchronizationManager.initSynchronization();

}

执行Mapper接口

这里执行mapper接口

public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType,

PersistenceExceptionTranslator exceptionTranslator) {

//从ThreadLocal中获取SqlSessionHolder,第一次获取不到为空

SqlSessionHolder holder = TransactionSynchronizationManager.getResource(sessionFactory);

//如果SqlSessionHolder为空,那也肯定获取不到SqlSession;

//如果SqlSessionHolder不为空,直接通过它来拿到SqlSession

SqlSession session = sessionHolder(executorType, holder);

if (session != null) {

return session;

}

//创建一个新的SqlSession

session = sessionFactory.openSession(executorType);

//如果当前线程的事务处于激活状态,就将SqlSessionHolder绑定到ThreadLocal

registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session);

return session;

}

执行SQL之前,会先获取到SqlSession对象。

拿到SqlSession之后,就开始调用Mybatis的执行器,准备执行SQL语句。在执行SQL之前呢,当然需要先拿到Connection连接。

public Connection getConnection() throws SQLException {

//通过数据源获取连接

//比如我们配置了多数据源,此时还会正常切换

if (this.connection == null) {

openConnection();

}

return this.connection;

}

至此可以看openConnection方法,作用是获取一个连接,如果我们配置了多数据源,此时可以切换,如果添加事物,此时不能切换,因为多了if判断,此时返回的还是上一次保存的链接。

两道小题1:  Spring 如何保证事物:把业务放在同一个数据库连接中,一起提交,一起回滚。2: 这枚做到在一个连接中,在线程中,通过数据库资源和当前事物相绑定实现。

事物模式,切换数据源

创建SqlSessionFactory

在文件中,添加,如下的两个方法

@Bean(name = "sqlSessionFactory1")

public SqlSessionFactory sqlSessionFactory1(@Qualifier("ds1") DataSource dataSource){

return createSqlSessionFactory(dataSource);

}

@Bean(name = "sqlSessionFactory2")

public SqlSessionFactory sqlSessionFactory2(@Qualifier("ds2") DataSource dataSource){

return createSqlSessionFactory(dataSource);

}

这里再添加 CustomSqlSessionTemplate 用来代替原来的SqlSessionTemplate,把SqlSessionFactory注入在。同一个文件中添加该方法

@Bean(name = "sqlSessionTemplate")

public CustomSqlSessionTemplate sqlSessionTemplate(){

Map sqlSessionFactoryMap = new HashMap();

sqlSessionFactoryMap.put("ds1",factory1);

sqlSessionFactoryMap.put("ds2",factory2);

CustomSqlSessionTemplate customSqlSessionTemplate = new CustomSqlSessionTemplate(factory1);

customSqlSessionTemplate.setTargetSqlSessionFactorys(sqlSessionFactoryMap);

customSqlSessionTemplate.setDefaultTargetSqlSessionFactory(factory1);

return customSqlSessionTemplate;

}

补充上自定义的CustomSqlSessionTemplate

public class CustomSqlSessionTemplate extends SqlSessionTemplate {

@Override

public SqlSessionFactory getSqlSessionFactory() {

//当前数据源的名称

String currentDsName = DataSourceType.getDataBaseType().name();

SqlSessionFactory targetSqlSessionFactory = targetSqlSessionFactorys.get(currentDsName);

if (targetSqlSessionFactory != null) {

return targetSqlSessionFactory;

} else if (defaultTargetSqlSessionFactory != null) {

return defaultTargetSqlSessionFactory;

}

return this.sqlSessionFactory;

}

}

核心在于根据是否为空,获取到sqlsessionfactory

if (targetSqlSessionFactory != null) {

return targetSqlSessionFactory;

} else if (defaultTargetSqlSessionFactory != null) {

return defaultTargetSqlSessionFactory;

}

测试

修改完配置之后,我们把Service方法加上事务的注解,此时数据也是可以正常更新的。

@Transactional

@Override

public void createOrder(Order order) {

storageMapper.decreaseStorage(order);

orderMapper.createOrder(order);

}

XA协议分布式事务

这里借助 Atomikos 框架

引入maven

org.springframework.boot

spring-boot-starter-jta-atomikos

2.2.7.RELEASE

更改getDataSource

把DataSource对象改成AtomikosDataSourceBean。

public DataSource getDataSource(Environment env, String prefix, String dataSourceName){

Properties prop = build(env,prefix);

AtomikosDataSourceBean ds = new AtomikosDataSourceBean();

ds.setXaDataSourceClassName(MysqlXADataSource.class.getName());

ds.setUniqueResourceName(dataSourceName);

ds.setXaProperties(prop);

return ds;

}

测试

这样配完之后,获取Connection连接的时候,拿到的其实是MysqlXAConnection对象。在提交或者回滚的时候,走的就是MySQL的XA协议了。

public void commit(Xid xid, boolean onePhase) throws XAException {

//封装 XA COMMIT 请求

StringBuilder commandBuf = new StringBuilder(300);

commandBuf.append("XA COMMIT ");

appendXid(commandBuf, xid);

try {

//交给MySQL执行XA事务操作

dispatchCommand(commandBuf.toString());

} finally {

this.underlyingConnection.setInGlobalTx(false);

}

}

推荐阅读

●实践 + 理论 | API 接口安全性设计

●文末送书 | 数据分析简单入门

●优雅 | 今天很水的文章-Excel导入导出

●理论 | 当 Spring Boot 遇上了消息队列......

●实践 | kafka 基本使用

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20200915A00TI800?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券