上一篇写了分布式事务产生背景以及解决方案,今天来实践一下,用Spring+Mybatis+Atomikos实现分布式事务。
【注】Atomikos只适合在数据库分库分表,在同一个应用程序中操作多数据源的情况。其他情况可以用其他解决方案,例如TCC,最终一致性等解决方案。
项目情况:Spring+Mybatis+Atomikos+Dubbo+Zookeeper+MySQL
多数据源:在MySQL下创建两个库,分别是order和user
order数据库下有order表,id是主键
user数据库下有user表,id是主键
我们主要测试的情况就是在一张表插入数据失败后,另一张表能否插入成功,如果两张表都插入失败,则说明体现了分布式事务的情况。
整体架构:
pom文件依赖:
<!--spring相关依赖--><dependency> <groupId>org.springframework</groupId> <artifactId>spring-beans</artifactId></dependency><dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId></dependency><dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId></dependency><dependency> <groupId>org.springframework</groupId> <artifactId>spring-expression</artifactId></dependency><dependency> <groupId>org.springframework</groupId> <artifactId>spring-tx</artifactId></dependency><dependency> <groupId>org.springframework</groupId> <artifactId>spring-jdbc</artifactId></dependency><dependency> <groupId>org.springframework</groupId> <artifactId>spring-aop</artifactId></dependency><dependency> <groupId>org.springframework</groupId> <artifactId>spring-aspects</artifactId></dependency>
<!--dubbo相关依赖--><dependency> <groupId>com.alibaba</groupId> <artifactId>dubbo</artifactId> <!--去除dubbo中的spring依赖,因为dubbo自带的spirng是低版本的,会出现依赖冲突的情况--> <exclusions> <exclusion> <artifactId>spring</artifactId> <groupId>org.springframework</groupId> </exclusion> </exclusions></dependency><dependency> <groupId>com.github.sgroschupf</groupId> <artifactId>zkclient</artifactId></dependency><dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId></dependency><!--mybatis相关依赖--><dependency> <groupId>org.mybatis</groupId> <artifactId>mybatis</artifactId> <version>3.5.1</version></dependency><dependency> <groupId>org.mybatis</groupId> <artifactId>mybatis-spring</artifactId> <version>2.0.4</version></dependency><!--数据库连接相关依赖--><dependency> <groupId>c3p0</groupId> <artifactId>c3p0</artifactId></dependency><dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId></dependency><dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.1.0</version></dependency><!--分布式事务相关依赖--><dependency> <groupId>com.atomikos</groupId> <artifactId>transactions-jdbc</artifactId></dependency><dependency> <groupId>javax.transaction</groupId> <artifactId>jta</artifactId></dependency>
数据源配置以及XA事务配置:项目中的service-common.xml文件
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.3.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd http://www.springframework.org/schema/aop https://www.springframework.org/schema/aop/spring-aop.xsd" default-autowire="byName">
<!--开启spring注解,配置spring注解扫描--> <context:component-scan base-package="com.liusy.order_service"/> <context:annotation-config/>
<!--读取属性配置文件--> <!--<context:property-placeholder location="classpath:application.properties" />-->
<!--配置抽象XA数据源属性--> <bean id="abstractXADataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init" destroy-method="close" abstract="true"> <property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource"/> <property name="poolSize" value="10" /> <property name="minPoolSize" value="10"/> <property name="maxPoolSize" value="30"/> <property name="borrowConnectionTimeout" value="60"/> <!--获取连接失败重新获等待最大时间,在这个时间内如果有可用连接,将返回--> <property name="reapTimeout" value="20"/> <!--最大获取数据时间,如果不设置这个值,Atomikos使用默认的5分钟,那么在处理大批量数据读取的时候,一旦超过5分钟,就会抛出类似 Resultset is close 的错误.--> <property name="maxIdleTime" value="60"/> <!--最大闲置时间,超过最小连接池连接的连接将将关闭--> <property name="maintenanceInterval" value="60" /> <!--连接回收时间--> <property name="loginTimeout" value="60" /> <!--java数据库连接池,最大可等待获取datasouce的时间--> <property name="logWriter" value="60"/> <property name="testQuery"> <value>select 1</value> </property> </bean>
<!--配置order数据源--> <bean id="orderDataSource" parent="abstractXADataSource"> <property name="uniqueResourceName" value="orderDataSource"/> <property name="xaProperties"> <props> <prop key="user">root</prop> <prop key="password">123456</prop> <prop key="URL">jdbc:mysql://192.168.197.100:3306/order</prop> </props> </property> </bean>
<!--配置user数据源--> <bean id="userDataSource" parent="abstractXADataSource"> <property name="uniqueResourceName" value="userDataSource"/> <property name="xaProperties"> <props> <prop key="user">root</prop> <prop key="password">123456</prop> <prop key="URL">jdbc:mysql://192.168.197.100:3306/user</prop> </props> </property> </bean>
<!--配置order数据库的SqlSessionFactory,以及对应的Mapper.xml文件路径--> <bean id="orderSqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean"> <property name="dataSource" ref="orderDataSource"/> <property name="mapperLocations" value="classpath:dao-xml/OrderMapper.xml"/> </bean>
<!--配置user数据库的SqlSessionFactory,以及对应的Mapper.xml文件路径--> <bean id="userSqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean"> <property name="dataSource" ref="userDataSource"/> <property name="mapperLocations" value="classpath:dao-xml/UserMapper.xml"/> </bean>
<!--配置order的mapper包路径--> <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer"> <property name="basePackage" value="com.liusy.order_service.orderdao"/> <property name="sqlSessionFactoryBeanName" value="orderSqlSessionFactory"/> </bean>
<!--配置user的mapper包路径--> <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer"> <property name="basePackage" value="com.liusy.order_service.userdao"/> <property name="sqlSessionFactoryBeanName" value="userSqlSessionFactory"/> </bean>
<!--atomiko transaction manager--> <bean id="atomiokosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close"> <property name="forceShutdown" value="true"/> </bean> <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp"> <property name="transactionTimeout" value="300"/> </bean>
<bean id="springTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"> <property name="transactionManager" ref="atomiokosTransactionManager"/> <property name="userTransaction" ref="atomikosUserTransaction"/> </bean>
<bean id="transactionTemplate" class="org.springframework.transaction.support.TransactionTemplate"> <property name="transactionManager" ref="springTransactionManager"/> </bean>
<!--配置AOP--> <aop:config> <aop:pointcut id="jtaServiceOperation" expression="execution(* com.liusy.order_service.impl.OrderImpl.saveOrder(..))"></aop:pointcut> <aop:advisor pointcut-ref="jtaServiceOperation" advice-ref="txAdvice"></aop:advisor> </aop:config>
<!--开始spring事务--> <tx:annotation-driven transaction-manager="springTransactionManager"/> <tx:advice id="txAdvice" transaction-manager="springTransactionManager"> <tx:attributes> <tx:method name="save*" rollback-for="Exception" /> </tx:attributes> </tx:advice>
</beans>
需要注意的是,在多数据源的情况下一定要配置每个SqlSessionFactory的MapperScannerConfigurer,否则会报错:SqlSessionFactory不唯一
simple-order.xml暴露服务:
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://code.alibabatech.com/schema/dubbo" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<!--扫描接口实现--> <context:component-scan base-package="com.liusy.order_service"/> <context:annotation-config></context:annotation-config> <!-- 提供方应用信息,用于计算依赖关系,name是唯一的--> <dubbo:application name="order"/>
<!-- 使用zookeeper注册中心暴露服务地址 id:注册中心唯一id,可以在下面服务注册时使用某个注册中心 register:是否向此注册中心注册服务,如果设为false,将只订阅其他服务,不把自身进行注册 subscribe:是否向此注册中心订阅服务,如果设为false,将只将自身进行注册,不订阅其他服务 --> <dubbo:registry protocol="zookeeper" address="192.168.197.100:2181,192.168.197.11:2181,192.168.197.120:2181"/>
<!-- 用dubbo协议在20880端口暴露服务,绑定ip --> <dubbo:protocol name="dubbo" port="20881"/> <!-- 声明需要暴露的服务接口 写操作可以设置retries=0 避免重复调用SOA服务 interface:接口 ref:实现类 group:当接口多实现时,可区分具体是哪个实现类 loadbalance:负载均衡算法 async:异步调用 register:需注册到哪一个注册中心,如果缺省,则默认向所有注册中心注册 --> <dubbo:service retries="0" id="orderInterface" interface="com.liusy.order_api.inter.OrderInterface" ref="orderImpl"/>
</beans>
主要代码:
(1)OrderImpl.java
@Servicepublic class OrderImpl implements OrderInterface {
private static final Log log = LogFactory.getLog(OrderImpl.class); 1、注入jta事务管理器 @Autowired JtaTransactionManager jtaTransactionManager; @Autowired private OrderMapper orderMapper; @Autowired private UserMapper userMapper; @Override public void saveOrder() { 2、获取事务 UserTransaction userTransaction=jtaTransactionManager.getUserTransaction(); try { 3、开启事务 userTransaction.begin(); orderMapper.saveOrder(); userMapper.saveUser(); 4、事务提交 userTransaction.commit(); }catch(Exception e){ log.error("事务错误,进行回滚,",e); try { 5、事务回滚 userTransaction.rollback(); } catch (SystemException e1) { e1.printStackTrace(); } }
}}
(2)OrderMapper.java和OrderMapper.xml
@Mapperpublic interface OrderMapper { void saveOrder();}
<?xml version="1.0" encoding="UTF-8" ?><!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" ><mapper namespace="com.liusy.order_service.orderdao.OrderMapper" > <insert id="saveOrder"> insert into `order`(id,name) VALUES (1,'123123') </insert></mapper>
(3)UserMapper.java和UserMapper.xml
@Mapperpublic interface UserMapper { void saveUser();}
<?xml version="1.0" encoding="UTF-8" ?><!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" ><mapper namespace="com.liusy.order_service.userdao.UserMapper" > <insert id="saveUser"> insert into `user`(id,name,age) VALUES (1,'李四',12) </insert></mapper>
(4)服务发布主类
public class App { public static void main( String[] args ) throws IOException { //配置文件放在META-INF/spring目录下时 Main.main(args); }}
服务发布:
我需要在另一个dubbo服务中调用此服务:
public class App { public static void main( String[] args ) throws IOException { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/simple-client.xml"}); context.start();
OrderInterface orderInterface = context.getBean(OrderInterface.class); orderInterface.saveOrder(); System.out.println("调用orderInterface完成"); }}
测试:
(1)此时user和order数据库里的user和order表都没有数据
客户端:
数据库:此时order和user表中都插入了一条数据
(2)因为我最先开始将数据插入order表,所以我将order表中数据删除,同时调用一次服务向user表中插入同一条数据,看order表中是否能插入成功。
再次调用服务:
此时服务端报错,user表插入数据失败,看order表中是否有数据
此时order表是没有插入数据的。
我把事务关掉,再次调用:
public void saveOrder() { try { orderMapper.saveOrder(); userMapper.saveUser(); }catch(Exception e){ log.error("发生错误",e); }
}
此时依然会报错:
但是order数据库是能保存数据的:
所以在Atomikos在多数据源并且是在同一个业务程序中调用是可以解决分布式事务的情况。
如果是在不同的服务中操作不同的数据源,请使用其他方案。