前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >深入了解 Spring 中的事务(从核心注解和类入手)

深入了解 Spring 中的事务(从核心注解和类入手)

作者头像
IT技术小咖
发布2020-11-16 14:18:37
1.1K0
发布2020-11-16 14:18:37
举报
文章被收录于专栏:码上修行码上修行

「七剑下天山」

一:@EnableTransactionManagement

二:@Transactional

三:@TransactionEventListener

四:TransactionTemplate

五:DataSourceUtils

六:TransactionSynchronizationManager

七:TransactionAwareDataSourceProxy

01

@EnableTransactionManagement

1.1 作用

此注解是 Spring 支持注解事务配置的标志。表明 Spring 开启注解事务配置的支持。是注解驱动开发事务配置的必备注解。

1.2 源码
代码语言:javascript
复制
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement {
    /**
     * 指定基于目标类代理还是基于接口代理。
     * 默认采用JDK官方的基于接口代理。
     * @return
     */
    boolean proxyTargetClass() default false;

    /**
     * 指定事务通知是如何执行的。默认是通过代理方式执行的。
     * 如果是同一个类中调用的话,请采用AdviceMode.ASPECTJ
     * @return
     */
    AdviceMode mode() default AdviceMode.PROXY;

    /**
     * 指示在特定连接点应用多个通知时事务处理的执行顺序。
     * 默认值是:最低优先级(Integer.MAX_VALUE)
     * @return
     */
    int order() default Ordered.LOWEST_PRECEDENCE;
}

1.3 源码分析

  • @EnableTransactionManagement 通过在 @Import 注解中传入 TransactionManagementConfigurationSelector 类,会给容器中导入两个组件:
    • AutoProxyRegistrar
    • ProxyTransactionManagementConfiguration
  • AutoProxyRegistrar:给容器中注册一个 InfrastructureAdvisorAutoProxyCreator 组件;利用后置处理器机制在对象创建以后,包装对象,返回一个代理对象(增强器),代理对象执行方法利用拦截器链进行调用。
  • ProxyTransactionManagementConfiguration 使用 @Configuration 注解修饰,表明是一个配置类。
    • TransactionInterceptor;保存了事务属性信息,事务管理器;
    • TransactionInterceptor 本身是一个 MethodInterceptor,MethodInterceptor 在 spring-aop 的课程中已经分析过了。在目标方法执行的时候,会 getAdvisors() 获取拦截器链,并执行拦截器链,当只有事务拦截器时: 1)先获取事务相关的属性 2)再获取 PlatformTransactionManager,如果事先没有添加指定任何 transactionmanger,最终会从容器中按照类型获取一个 PlatformTransactionManager; 3)执行目标方法 如果正常,利用事务管理器,提交事务 如果异常,获取到事务管理器,利用事务管理回滚操作;
    • transactionAdvisor 方法:给容器中注册事务增强器 transactionAdvisor;
    • transactionAttributeSource 方法:创建了一个注解事务的属性解析对象
    • transactionInterceptor 方法:返回值是:事务拦截器 transactionInterceptor:

02

@Transactional

2.1 作用

此注解是 Spring 注解配置事务的核心注解,无论是注解驱动开发还是注解和 XML 混合开发,只有涉及配置事务采用注解的方式,都需要使用此注解。

通过源码我们看到,该注解可以出现在接口上,类上和方法上。分别表明:

  • 接口上:当前接口的所有实现类中重写接口的方法有事务支持。
  • 类上:当前类中所有方法有事务支持。
  • 方法上:当前方法有事务的支持。

优先级:方法上 > 类上 > 接口上。

2.2 源码
代码语言:javascript
复制
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface Transactional {
    /**
     * 指定事务管理器的唯一标识
     */
    @AliasFor("transactionManager") String value() default "";

    /**
     * 指定事务管理器的唯一标识
     */
    @AliasFor("value") String transactionManager() default "";

    /**
     * 指定事务的传播行为
     */
    Propagation propagation() default Propagation.REQUIRED;

    /**
     * 指定事务的隔离级别
     */
    Isolation isolation() default Isolation.DEFAULT;

    /**
     * 指定事务的超时时间
     */
    int timeout() default TransactionDefinition.TIMEOUT_DEFAULT;

    /**
     * 指定事务是否只读
     */
    boolean readOnly() default false;

    /**
     * 通过指定异常类的字节码,限定事务在特定情况下回滚
     */
    Class<? extends Throwable>[] rollbackFor() default {};

    /**
     * 通过指定异常类的全限定类名,限定事务在特定情况下回滚
     */
    String[] rollbackForClassName() default {};

    /**
     * 通过指定异常类的字节码,限定事务在特定情况下不回滚
     */
    Class<? extends Throwable>[] noRollbackFor() default {};

    /**
     * 通过指定异常类的全限定类名,限定事务在特定情况下不回滚
     */
    String[] noRollbackForClassName() default {};
}

2.3 源码分析

2.3.1 在 @EnableTransactionManagement 注解中,有一个导入器:

TransactionManagementConfigurationSelector,导入器中在 AdiviceMode 为默认值 PROXY 时,往

容器中注入了两个bean对象,AutoProxyRegistrar 和 ProxyTransactionManagementConfiguration。

代码语言:javascript
复制
// 通过 @Import注解,Spring 定义了一个事务管理配置类的导入器。
@Import(TransactionManagementConfigurationSelector.class) 
public @interface EnableTransactionManagement { 
  //其余代码略 
}

2.3.2 ProxyTransactionManagementConfiguration 类中的一个方法:

代码语言:javascript
复制
@Bean 
@Role(BeanDefinition.ROLE_INFRASTRUCTURE) 
public TransactionAttributeSource transactionAttributeSource() { 
  //创建了一个注解事务的属性解析对象 
  return new AnnotationTransactionAttributeSource(); 
}

2.3.3 AnnotationTransactionAttributeSource 类的实例化

代码语言:javascript
复制
public class AnnotationTransactionAttributeSource extends AbstractFallbackTransactionAttributeSource implements Serializable {
    /**
     * 默认构造函数
     */
    public AnnotationTransactionAttributeSource() {
        this(true);
    }

    /**
     * 带参构造(表明是否为public方法)
     */
    public AnnotationTransactionAttributeSource(boolean publicMethodsOnly) {
        this.publicMethodsOnly = publicMethodsOnly;
        //判断是不是jta或者是ejp
        if (jta12Present || ejb3Present) {
            this.annotationParsers = new LinkedHashSet<>(4);
            this.annotationParsers.add(new SpringTransactionAnnotationParser());
            //jta
            if (jta12Present) {
                this.annotationParsers.add(new
                        JtaTransactionAnnotationParser());
            }
            //ejp
            if (ejb3Present) {
                this.annotationParsers.add(new
                        Ejb3TransactionAnnotationParser());
            }
        } else {
            //当都不是的时候,构建一个SpringTransactionAnnotationParser(事务注解解析器)
            this.annotationParsers = Collections.singleton(new
                    SpringTransactionAnnotationParser());
        }
    }
}

2.3.4 SpringTransactionAnnotationParser 的注解解析:

代码语言:javascript
复制
public class SpringTransactionAnnotationParser implements TransactionAnnotationParser, Serializable {
    /**
     * 根据传入被注解的元素解析。
     * 可以是Method,Field,Class,Package,Construct等等。
     */
    @Override
    @Nullable
    public TransactionAttribute parseTransactionAnnotation(AnnotatedElement element) {
        AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes(element, Transactional.class, false, false);
        if (attributes != null) { //调用根据传入注解属性解析
            return parseTransactionAnnotation(attributes);
        } else {
            return null;
        }
    }

    /**
     * 根据传入注解解析
     */
    public TransactionAttribute parseTransactionAnnotation(Transactional ann) {
        return parseTransactionAnnotation(AnnotationUtils.getAnnotationAttributes(ann, false, false));
    }

    /**
     * 根据传入的注解属性解析
     */
    protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes
                                                                      attributes) {
        RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();
        Propagation propagation = attributes.getEnum("propagation");
        rbta.setPropagationBehavior(propagation.value());
        Isolation isolation = attributes.getEnum("isolation");
        rbta.setIsolationLevel(isolation.value());
        rbta.setTimeout(attributes.getNumber("timeout").intValue());
        rbta.setReadOnly(attributes.getBoolean("readOnly"));
        rbta.setQualifier(attributes.getString("value"));
        List<RollbackRuleAttribute> rollbackRules = new ArrayList<>();
        for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {
            rollbackRules.add(new RollbackRuleAttribute(rbRule));
        }
        for (String rbRule : attributes.getStringArray("rollbackForClassName")) {
            rollbackRules.add(new RollbackRuleAttribute(rbRule));
        }
        for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {
            rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
        }
        for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {
            rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
        }
        rbta.setRollbackRules(rollbackRules);
        return rbta;
    }
    //其余代码略
}

03

@TransactionEventListener

3.1 作用

它是 spring 在 4.2 版本之后加入的注解。用于配置一个事务的事件监听器。使我们在事务提交和回滚前后可以做一些额外的功能。

例如:对事务执行监控,执行中同步做一些操作等等。

3.2 使用示例

自定义事件类:

代码语言:javascript
复制
public class MyApplicationEvent extends ApplicationEvent {
    public MyApplicationEvent(Object source) {
        super(source);
    }
}

自定义监听类:

代码语言:javascript
复制
@Component
@Slf4j
public class MyTransactionEventListener {
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void doSomething(MyApplicationEvent event) {
        Map map = (Map) event.getSource();
        log.info("事务提交后执行===>转出账户:" + map.get("sourceName") + ",转入账户:" + map.get("targetName") + ",转账金额:" + map.get("money"));
    }

    @TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
    public void otherSomething(MyApplicationEvent event) {
        Map map = (Map) event.getSource();
        log.info("事务回滚后执行===>转出账户:" + map.get("sourceName") + ",转入账户:" + map.get("targetName") + ",转账金额:" + map.get("money"));
    }
}

业务层:

代码语言:javascript
复制
@Service
public class AccountServiceImpl implements AccountService {
    @Autowired
    private AccountDao accountDao;
    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void transfer(String sourceName, String targetName, Double money) {
        try {
            //1.根据名称查询转出账户
            Account source = accountDao.findByName(sourceName);
            //2.根据名称查询转入账户
            Account target = accountDao.findByName(targetName);
            //3.转出账户减钱
            source.setMoney(source.getMoney() - money);
            //4.转入账户加钱
            target.setMoney(target.getMoney() + money);
            accountDao.update(source); 
            // 模拟转账异常
            int i = 1 / 0; 
            //6.更新转入账户
            accountDao.update(target);
        } finally {
            Map<String, Object> map = new HashMap<>();
            map.put("sourceName", sourceName);
            map.put("targetName", targetName);
            map.put("money", money);
            // 发布自定义事件(此处可以发布多个事件,如邮件通知、短信通知等,利用自定义监听器实现代码解耦)
            applicationEventPublisher.publishEvent(new MyApplicationEvent(map));
        }
    }
}

3.3 源码

代码语言:javascript
复制
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EventListener
public @interface TransactionalEventListener {
    /**
     * 指定事务监听器的执行是在何时。
     * 取值有:
     *      事务提交之前
     *      事务提交之后(默认值)
     *      事务回滚之后
     *      事务执行完成之后
     */
    TransactionPhase phase() default TransactionPhase.AFTER_COMMIT;

    /**
     * 若没有事务的时候,对应的event是否已经执行
     * 默认值为false表示没事务就不执行了
     */
    boolean fallbackExecution() default false;

    /**
     * 指定事件类的字节码
     */
    @AliasFor(annotation = EventListener.class, attribute = "classes") Class<?>[] value() default {};

    /**
     * 它和value属性的作用是一样的
     */
    @AliasFor(annotation = EventListener.class, attribute = "classes") Class<?>[] classes() default {};

    /**
     * 用于指定执行事件处理器的条件。取值是基于Spring的el表达式编写的。
     */
    String condition() default "";
}
3.4 源码分析

3.4.1 在 IoC 容器加载时,执行 AbstractApplicationContext 的 refresh() 方法,一共十二个步骤,在执行到

代码语言:javascript
复制
// Instantiate all remaining (non-lazy-init) singletons. 
finishBeanFactoryInitialization(beanFactory);//触发其他单例bean的加载

3.4.2 AbstractApplicationContext 的 finishBeanFactoryInitialization 方法执行时,初始化剩余的单例 bean 对象。

代码语言:javascript
复制
/**
 * 初始化剩余单例bean对象的方法
 */
protected void finishBeanFactoryInitialization(ConfigurableListableBeanFactory beanFactory) {
    // 方法中的其他代码略
    // 初始化剩余单例bean对象.调用的是DefaultListableBeanFactory类中的preInstantiateSingletons方法。
    beanFactory.preInstantiateSingletons();
}

3.4.3 DefaultListableBeanFactory 类中的 preInstantiateSingletons 方法中执行了 afterSingletonsInstantiated() 方法。此方法是SmartInitializingSingleton 接口中声明的。具体实现类包含:EventListenerMethodProcessor 事件监听器方法处理器。

3.4.4 EventListenerMethodProcessor 中的 afterSingletonsInstantiated 会被执行,该方法中包含处理 bean 的方法:processBean。

3.4.5 在 processBean 方法中调用了创建事件监听器的方法 createApplicationListener。该方法是 EventListenerFactory 接口中声明的方法。

3.4.6 TransactionalEventListenerFactory 类实现了 EventListenerFactory 接口,并重写了 createApplicationListener 方法。

代码语言:javascript
复制
/**
 * 重写接口中的创建监听器方法
 */
@Override
public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
    return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method);
}

3.4.7 ApplicationListenerMethodTransactionalAdapter 的实例化。至此,解析 TransactionalEventListener 注解的过程完成。

代码语言:javascript
复制
/**
 * 构造函数
 */
public ApplicationListenerMethodTransactionalAdapter(String beanName, Class<?> targetClass, Method method) {
    super(beanName, targetClass, method);
    TransactionalEventListener ann = AnnotatedElementUtils.findMergedAnnotation(method, TransactionalEventListener.class);
    if (ann == null) {
        throw new IllegalStateException("No TransactionalEventListener annotation found on method: "+method);
    }
    this.annotation = ann;
}

04

TransactionTemplate

此类是用于编程式事务的模板对象。源码分析如下:

代码语言:javascript
复制
public class TransactionTemplate extends DefaultTransactionDefinition implements TransactionOperations, InitializingBean {
    /**
     * 定义日志组件.
     */
    protected final Log logger = LogFactory.getLog(getClass());
    /**
     * 定义事务管理器对象
     */
    @Nullable
    private PlatformTransactionManager transactionManager;

    /*** 默认构造函数 */
    public TransactionTemplate() {
    }

    /*** 通过事务管理器构建事务模板对象 */
    public TransactionTemplate(PlatformTransactionManager transactionManager) {
        this.transactionManager = transactionManager;
    }

    /*** 通过事务管理器和事务定义信息构建模板对象 */
    public TransactionTemplate(PlatformTransactionManager transactionManager, TransactionDefinition transactionDefinition) {
        super(transactionDefinition);
        this.transactionManager = transactionManager;
    }

    /*** 当使用默认构造函数构建事务模板对象时,可以通过此方法注入事务管理器 */
    public void setTransactionManager(@Nullable PlatformTransactionManager transactionManager) {
        this.transactionManager = transactionManager;
    }

    /*** 获取使用的事务管理器对象 */
    @Nullable
    public PlatformTransactionManager getTransactionManager() {
        return this.transactionManager;
    }

    /*** 判断事务管理器是否为空 */
    @Override
    public void afterPropertiesSet() {
        if (this.transactionManager == null) {
            throw new IllegalArgumentException("Property 'transactionManager' is required");
        }
    }

    /*** 编程事务控制的核心方法,重写的是TransactionOperations接口中的方法 */
    @Override
    @Nullable
    public <T> T execute(TransactionCallback<T> action) throws TransactionException {
        Assert.state(this.transactionManager != null, "No PlatformTransactionManager set"); 
        // 判断当前事务管理器是否为CallbackPreferringPlatformTransactionManager类型, 如果是的话,直接使用该接口实现类中的execute方法执行。而无需继续让 PlatformTransactionManager的实现类控制事务,当前坐标环境下它只有一个实现类:WebSphereUowTransactionManager。
        if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager) {
            return ((CallbackPreferringPlatformTransactionManager) this.transactionManager).execute(this, action);
        } else {
            // 需要借助PlatformTransactionManager的实现类控制事务。
            TransactionStatus status = this.transactionManager.getTransaction(this);
            T result;
            try {
                // 执行TransactionCallback中的doInTransaction方法,此处又是策略模式。
                // spring只提供了一个接口(还有一个抽象实现类),而具体需要事务支持的业务代 码由使用者提供。
                result = action.doInTransaction(status);
            } catch (RuntimeException | Error ex) { 
                // 当doInTransaction执行有异常时事务回滚
                rollbackOnException(status, ex);
                throw ex;
            } catch (Throwable ex) { 
                // 当doInTransaction执行有无法预知的异常时,事务回滚。
                rollbackOnException(status, ex);
                throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception");
            }
            // 没有异常的话,事务提交
            this.transactionManager.commit(status); 
            // 返回执行结果(有可能是结果集,也有可能是影响数据库记录的行数)
            return result;
        }
    }

    /** 出现异常事务回滚 */
    private void rollbackOnException(TransactionStatus status, Throwable ex) throws TransactionException {
        Assert.state(this.transactionManager != null, "No PlatformTransactionManager set");
        logger.debug("Initiating transaction rollback on application exception", ex);
        try {
            // 执行事务管理器中提供的回滚方法。
            this.transactionManager.rollback(status);
        } catch (TransactionSystemException ex2) {
            logger.error("Application exception overridden by rollback exception", ex);
            ex2.initApplicationException(ex);
            throw ex2;
        } catch (RuntimeException | Error ex2) {
            logger.error("Application exception overridden by rollback exception", ex);
            throw ex2;
        }
    }

    /*** 重写equals方法 */
    @Override
    public boolean equals(Object other) {
        return (this == other || (super.equals(other) && (!(other instanceof TransactionTemplate) || getTransactionManager() == ((TransactionTemplate) other).getTransactionManager())));
    }
}

05

DataSourceUtils

Spring 中数据源的工具类。里面定义着获取连接的方法。

代码语言:javascript
复制
public abstract class DataSourceUtils {
    /*** 获取连接的方法,它本身没有任何操作而是调用了doGetConnection */
    public static Connection getConnection(DataSource dataSource) throws CannotGetJdbcConnectionException {
        try {
            // 真正获取连接的方法
            return doGetConnection(dataSource);
        } catch (SQLException ex) {
            throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection: ", ex);
        } catch (IllegalStateException ex) {
            throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection: " + ex.getMessage());
        }
    }

    /*** 真正从数据源中获取连接的方法 */
    public static Connection doGetConnection(DataSource dataSource) throws SQLException {
        Assert.notNull(dataSource, "No DataSource specified");
        // 通过事务同步管理器对象获取连接持有者对象
        ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource); 
        // 当连接持有者不为null时,并且再满足连接持有者有连接或者是同步的事务其中任何一个条件, 则直接返回连接持有者的连接对象。
        // (synchronizedWithTransaction默认值为false。但是在DataSourceTransactionManager中的doBegin方法中对synchronizedWithTransaction属性赋值为 true了。
        if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
            conHolder.requested();
            // 如果ConnectionHandle为null,则返回false,此处取反。就表示 ConnectionHandle为null时,进入if代码块中,给ConnectionHolder设置一个连接
            if (!conHolder.hasConnection()) {
                logger.debug("Fetching resumed JDBC Connection from DataSource");
                conHolder.setConnection(fetchConnection(dataSource));
            }// 返回ConnectionHolder对象中的连接
            return conHolder.getConnection();
        }
        logger.debug("Fetching JDBC Connection from DataSource");
        //如果不满足上面的条件,则从数据源中获取一个连接
        Connection con = fetchConnection(dataSource);
        //判断是否激活了事务同步器
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
            try {
                // 在激活同步的条件下,如果ConnectionHolder为null就创建连接持有者对象
                ConnectionHolder holderToUse = conHolder;
                if (holderToUse == null) { //创建连接持有者对象
                    holderToUse = new ConnectionHolder(con);
                } else {
                    // 直接使用已经存在的连接持有者,并把获取到的连接填充进去
                    holderToUse.setConnection(con);
                }
                holderToUse.requested(); //注册同步器
                TransactionSynchronizationManager.registerSynchronization(new ConnectionSynchronization(holderToUse, dataSource)); //设置snchronizedWithTransaction属性为true
                holderToUse.setSynchronizedWithTransaction(true);

                // 从名称为Transactional resources的ThreadLocal中获取绑定的 Map,并把数据源和ConnectionHolder存入map中
                if (holderToUse != conHolder) {
                    TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
                }
            }
        } catch(RuntimeException ex) {
            // Unexpected exception from external delegation call -> close Connection and rethrow.
            releaseConnection(con, dataSource);
            throw ex;
        }
        // 此时如果激活了事务同步管理器,则返回当前线程的连接。如果没激活,返回的就是数据源中拿 到的连接。
        return con;
    }

    /*** 从数据源中获取一个连接的方法,此时没有和线程绑定 */
    private static Connection fetchConnection(DataSource dataSource) throws SQLException {
        //从数据源中获取一个连接
        Connection con = dataSource.getConnection();
        //如果没有,则表示数据源中没有连接
        if (con == null) {
            throw new IllegalStateException("DataSource returned null from getConnection(): " + dataSource);
        }
        // 返回拿到的连接对象
        return con;
    }
    // 其余代码略
}

06

TransactionSynchronizationManager

事务的同步管理器类实现连接和线程绑定从而控制事务的核心类

它是个抽象类,但是没有任何子类 因为它所有的方法都是静态的。

代码语言:javascript
复制
public abstract class TransactionSynchronizationManager {
    // 定义了很多ThreadLocal
    // 应用代码随事务的声明周期绑定的对象 比如:DataSourceTransactionManager有这么做:
    // TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
    // TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
    // 简单理解为当前线程的数据存储中心~~~~
    // 使用的同步器,用于应用扩展
    // TransactionSynchronization同步器是最为重要的一个扩展点~~~
    // 这里是个set 所以每个线程都可以注册N多个同步器
    private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal<>("Transaction synchronizations");
    // 事务的名称
    private static final ThreadLocal<String> currentTransactionName = new NamedThreadLocal<>("Current transaction name");
    // 事务是否是只读
    private static final ThreadLocal<Boolean> currentTransactionReadOnly = new NamedThreadLocal<>("Current transaction read-only status");
    // 事务的隔离级别
    private static final ThreadLocal<Integer> currentTransactionIsolationLevel = new NamedThreadLocal<>("Current transaction isolation level");
    // 事务是否开启 actual:真实的
    private static final ThreadLocal<Boolean> actualTransactionActive = new NamedThreadLocal<>("Actual transaction active");

    // 判断是否是开启了和线程绑定机制。当开启了之后,我们通过DataSourceUtils获取的连接就是当 前线程的连接了。
    public static boolean isSynchronizationActive() {
        return (synchronizations.get() != null);
    }

    /*** 初始化同步器方法。此方法就是在synchronizations中设置了一个LinkedHashSet * 当然如果事务已经开启了,就不能再初始化同步器了 而是直接注册 */
    public static void initSynchronization() throws IllegalStateException {
        if (isSynchronizationActive()) {
            throw new IllegalStateException("Cannot activate transaction synchronization - already active");
        }
        logger.trace("Initializing transaction synchronization");
        synchronizations.set(new LinkedHashSet<>());
    }// 清除所有和当前线程相关的(注意:此处只是clear清除,和当前线程的绑定而已~~~) 

    public static void clear() {
        synchronizations.remove();
        currentTransactionName.remove();
        currentTransactionReadOnly.remove();
        currentTransactionIsolationLevel.remove();
        actualTransactionActive.remove();
    }
    //其余方法略 
}

07

TransactionAwareDataSourceProxy

这是 Spring 提供的一个数据源代理类,它继承了 DelegatingDataSource 类。

因为数据连接泄露是个很头疼的问题,Spring 框架也提供了很多种办法来避免这个问题。

比如使用 XXXTemplate,当然其背后是 DataSourceUtils。

同时还有另外一种办法,使用 TransactionAwareDataSourceProxy。

通过 TransactionAwareDataSourceProxy 对数据源代理后,数据源对象就有了事务上下文感知的能力了。

当然看源码会发现,其实它还是使用的 DataSourceUtils。

代码语言:javascript
复制
public class TransactionAwareDataSourceProxy extends DelegatingDataSource {
    /*** 暴露出来的获取连接的方法 */
    @Override
    public Connection getConnection() throws SQLException {
        return getTransactionAwareConnectionProxy(obtainTargetDataSource());
    }

    /*** 使用JDK的动态代理创建连接的代理对象 */
    protected Connection getTransactionAwareConnectionProxy(DataSource targetDataSource) {
        return (Connection) Proxy.newProxyInstance(ConnectionProxy.class.getClassLoader(), new Class<?>[]{ConnectionProxy.class}, new TransactionAwareInvocationHandler(targetDataSource));
    }

    /*** InvocationHandler的具体实现(增强的部分) */
    private class TransactionAwareInvocationHandler implements InvocationHandler {
        private final DataSource targetDataSource;
        @Nullable
        private Connection target;
        private boolean closed = false;

        public TransactionAwareInvocationHandler(DataSource targetDataSource) {
            this.targetDataSource = targetDataSource;
        }

        @Override
        @Nullable
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // Invocation on ConnectionProxy interface coming in... 
            if (method.getName().equals("equals")) {
                // Only considered as equal when proxies are identical.
                return proxy == args[0];
            } else if (method.getName().equals("hashCode")) {
                // Use hashCode of Connection proxy.
                return System.identityHashCode(proxy);
            } else if (method.getName().equals("toString")) {
                // Allow for differentiating between the proxy and the raw Connection.
                StringBuilder sb = new StringBuilder("Transaction-aware proxy for target Connection ");
                if (this.target != null) {
                    sb.append("[").append(this.target.toString()).append("]");
                } else {
                    sb.append(" from DataSource [").append(this.targetDataSource).append("]");
                }
                return sb.toString();
            } else if (method.getName().equals("unwrap")) {
                if (((Class<?>) args[0]).isInstance(proxy)) {
                    return proxy;
                }
            } else if (method.getName().equals("isWrapperFor")) {
                if (((Class<?>) args[0]).isInstance(proxy)) {
                    return true;
                }
            } else if (method.getName().equals("close")) {
                // 释放资源仍然使用的是DataSourceUtils中的方法
                DataSourceUtils.doReleaseConnection(this.target, this.targetDataSource);
                this.closed = true;
                return null;
            } else if (method.getName().equals("isClosed")) {
                return this.closed;
            }
            if (this.target == null) {
                if (this.closed) {
                    throw new SQLException("Connection handle already closed");
                }
                if (shouldObtainFixedConnection(this.targetDataSource)) {
                    this.target = DataSourceUtils.doGetConnection(this.targetDataSource);
                }
            }
            Connection actualTarget = this.target;
            if (actualTarget == null) { 
                //获取连接使用的也是DataSourceUtils中的方法
                actualTarget = DataSourceUtils.doGetConnection(this.targetDataSource);
            }
            if (method.getName().equals("getTargetConnection")) {
                return actualTarget;
            }// Invoke method on target Connection. 
            try {
                Object retVal = method.invoke(actualTarget, args);
                // If return value is a Statement, apply transaction timeout. 
                // Applies to createStatement, prepareStatement, prepareCall. 
                if (retVal instanceof Statement) {
                    DataSourceUtils.applyTransactionTimeout((Statement) retVal, this.targetDataSource);
                }
                return retVal;
            } catch (InvocationTargetException ex) {
                throw ex.getTargetException();
            } finally {
                if (actualTarget != this.target) {
                    DataSourceUtils.doReleaseConnection(actualTarget, this.targetDataSource);
                }
            }
        }
    }
    //其余代码略 
}

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-11-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码上修行 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 「七剑下天山」
  • 01
  • 02
  • 03
  • 04
  • 05
  • 06
  • 07
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档