源码分析 spring事务处理机制

Spring在TransactionDefinition接口中定义这些属性,以供PlatfromTransactionManager使用, PlatfromTransactionManager是Spring事务管理的核心接口。

接口代码如下:

[java] view plain copy

  1. public interface TransactionDefinition {  
  2. int getPropagationBehavior(); //返回事务的传播行为。 
  3. int getIsolationLevel(); //返回事务的隔离级别,事务管理器根据它来控制另外一个事务可以看到本事务内的哪些数据。
  4. int getTimeout(); //返回事务必须在多少秒内完成。
  5. boolean isReadOnly(); //事务是否只读,事务管理器能够根据这个返回值进行优化,确保事务是只读的。 
  6. }  

事务的传播属性有:

PROPAGATION_REQUIRED 如果存在一个事务,则支持当前事务。如果没有事务则开启一个新的事务。(默认的传播属性)

PROPAGATION_REQUIRES_NEW 总是开启一个新的事务。如果一个事务已经存在,则将这个存在的事务挂起。

PROPAGATION_SUPPORTS 如果存在一个事务,支持当前事务。如果没有事务,则非事务的执行。

PROPAGATION_MANDATORY 如果已经存在一个事务,支持当前事务。如果没有一个活动的事务,则抛出异常。

PROPAGATION_NOT_SUPPORTED 总是非事务地执行,并挂起任何存在的事务。

PROPAGATION_NEVER 总是非事务地执行,如果存在一个活动事务,则抛出异常;

PROPAGATION_NESTED如果一个活动的事务存在,则运行在一个嵌套的事务中. 如果没有活动事务, 则按TransactionDefinition.PROPAGATION_REQUIRED 属性执行。

当我们的程序调用到 把被 @Transaction 注解修饰的方法时,会被spring的AOP切面拦截,该方法会被进行增强,其中就包含了spring对该方法进行事务管理。spring会对不同的传播属性进行不同的事务处理。spring 通过 AbstractPlatformTransactionManager这个类来管理事务。

方法如下:

[java] view plain copy

  1. public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {    
  2. //doGetTransaction()方法是抽象方法,具体的实现由具体的事务处理器提供  
  3.         Object transaction = doGetTransaction();    
  4. boolean debugEnabled = logger.isDebugEnabled();    
  5. //如果没有配置事务属性,则使用默认的事务属性  
  6. if (definition == null) {    
  7.             definition = new DefaultTransactionDefinition();    
  8.         }    
  9. //检查当前线程是否存在事务  
  10. if (isExistingTransaction(transaction)) {    
  11. //处理已存在的事务  
  12. return handleExistingTransaction(definition, transaction, debugEnabled);    
  13.         }    
  14. //检查事务属性中timeout超时属性设置是否合理  
  15. if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {    
  16. throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());    
  17.         }    
  18. //对事务属性中配置的事务传播特性处理  
  19. //如果事务传播特性配置的是mandatory,当前没有事务存在,抛出异常  
  20. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {    
  21. throw new IllegalTransactionStateException(    
  22. "No existing transaction found for transaction marked with propagation 'mandatory'");    
  23.         }    
  24. //如果事务传播特性为required、required_new或nested  
  25. else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||    
  26.                 definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||    
  27.             definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {    
  28.             SuspendedResourcesHolder suspendedResources = suspend(null);    
  29. if (debugEnabled) {    
  30.                 logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);    
  31.             }    
  32. //创建事务  
  33. try {    
  34. //不激活和当前线程绑定的事务,因为事务传播特性配置要求创建新的事务  
  35. boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);    
  36. //创建一个新的事务状态  
  37.                 DefaultTransactionStatus status = newTransactionStatus(    
  38.                         definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);    
  39. //创建事务的调用,具体实现由具体的事务处理器提供  
  40.                 doBegin(transaction, definition);    
  41. //初始化和同步事务状态  
  42.                 prepareSynchronization(status, definition);    
  43. return status;    
  44.             }    
  45. catch (RuntimeException ex) {    
  46.                 resume(null, suspendedResources);    
  47. throw ex;    
  48.             }    
  49. catch (Error err) {    
  50.                 resume(null, suspendedResources);    
  51. throw err;    
  52.             }    
  53.         }    
  54. else {    
  55. //创建空事务,否则就创建一个空事务,没有实际事务,但可能同步。(实际上就是不使用事务)
  56. boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);    
  57. //准备事务状态  
  58. return prepareTransactionStatus(definition, nulltrue, newSynchronization, debugEnabled, null);    
  59.         }    
  60.     }    

AbstractPlatformTransactionManager处理已经存在的事务:

  1. private TransactionStatus handleExistingTransaction(    
  2.            TransactionDefinition definition, Object transaction, boolean debugEnabled)    
  3. throws TransactionException {    
  4. //如果事务传播特性为:never,则抛出异常  
  5. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {    
  6. throw new IllegalTransactionStateException(    
  7. "Existing transaction found for transaction marked with propagation 'never'");    
  8.        }    
  9. //如果事务传播特性是not_supported,同时当前线程存在事务,则将事务挂起  
  10. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {    
  11. if (debugEnabled) {    
  12.                logger.debug("Suspending current transaction");    
  13.            }    
  14. //挂起事务  
  15.            Object suspendedResources = suspend(transaction);    
  16. boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);    
  17. //创建非事务的事务状态,让方法非事务地执行  
  18. return prepareTransactionStatus(    
  19.                    definition, nullfalse, newSynchronization, debugEnabled, suspendedResources);    
  20.        }    
  21. //如果事务传播特性是required_new,则创建新事务,同时把当前线程中存在的  
  22. /事务挂起    
  23. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {    
  24. if (debugEnabled) {    
  25.                logger.debug("Suspending current transaction, creating new transaction with name [" +    
  26.                        definition.getName() + "]");    
  27.            }    
  28. //挂起已存在的事务  
  29.            SuspendedResourcesHolder suspendedResources = suspend(transaction);    
  30. try {    
  31. boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);    
  32. //将挂起的事务状态保存起来  
  33.                DefaultTransactionStatus status = newTransactionStatus(    
  34.                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);    
  35. //创建新事务  
  36.                doBegin(transaction, definition);    
  37.                prepareSynchronization(status, definition);    
  38. return status;    
  39.            }    
  40. catch (RuntimeException beginEx) {    
  41.                resumeAfterBeginException(transaction, suspendedResources, beginEx);    
  42. throw beginEx;    
  43.            }    
  44. catch (Error beginErr) {    
  45.                resumeAfterBeginException(transaction, suspendedResources, beginErr);    
  46. throw beginErr;    
  47.            }    
  48.        }    
  49. //如果事务传播特性是nested嵌套事务  
  50. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {    
  51. //如果不允许事务嵌套,则抛出异常  
  52. if (!isNestedTransactionAllowed()) {    
  53. throw new NestedTransactionNotSupportedException(    
  54. "Transaction manager does not allow nested transactions by default - " +    
  55. "specify 'nestedTransactionAllowed' property with value 'true'");    
  56.            }    
  57. if (debugEnabled) {    
  58.                logger.debug("Creating nested transaction with name [" + definition.getName() + "]");    
  59.            }    
  60. //如果允许使用savepoint保存点保存嵌套事务  
  61. if (useSavepointForNestedTransaction()) {    
  62. //为当前事务创建一个保存点  
  63.                DefaultTransactionStatus status =    
  64.                        prepareTransactionStatus(definition, transaction, falsefalse, debugEnabled, null);    
  65.                status.createAndHoldSavepoint();    
  66. return status;    
  67.            }    
  68. //如果不允许使用savepoint保存点保存嵌套事务  
  69. else {    
  70. //使用JTA的嵌套commit/rollback调用  
  71. boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);    
  72.                DefaultTransactionStatus status = newTransactionStatus(    
  73.                        definition, transaction, true, newSynchronization, debugEnabled, null);    
  74.                doBegin(transaction, definition);    
  75.                prepareSynchronization(status, definition);    
  76. return status;    
  77.            }    
  78.        }    
  79. //对于事务传播特性为supported和required的处理  
  80. if (debugEnabled) {    
  81.            logger.debug("Participating in existing transaction");    
  82.        }    
  83. //校验已存在的事务,如果已有事务与事务属性配置不一致,则抛出异常  
  84. if (isValidateExistingTransaction()) {    
  85. //如果事务隔离级别不是默认隔离级别  
  86. if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {    
  87. //获取当前事务的隔离级别  
  88.                Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();    
  89. //如果获取到的当前事务隔离级别为null获取不等于事务属性配置的隔离级别  
  90. if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {    
  91.                    Constants isoConstants = DefaultTransactionDefinition.constants;    
  92. throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) : "(unknown)"));    
  93.                }    
  94.            }    
  95. //如果事务不是只读  
  96. if (!definition.isReadOnly()) {    
  97. //如果当前已有事务是只读  
  98. if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {    
  99. throw new IllegalTransactionStateException("Participating transaction with definition [" +    
  100.                            definition + "] is not marked as read-only but existing transaction is");    
  101.                }    
  102.            }    
  103.        }    
  104. boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);    
  105. //返回当前事务的执行状态  
  106. return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);    
  107.    }    

加个抽离无关代码的总结,对流程更清晰一点

下篇将会介绍spring的事务管理如何在日常的开发中更灵活的使用

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏魏琼东

分布式消息总线,基于.NET Socket Tcp的发布-订阅框架之离线支持,附代码下载

     在前面的分享一个分布式消息总线,基于.NET Socket Tcp的发布-订阅框架,附代码下载一文之中给大家分享和介绍了一个极其简单也非常容易上的基于...

1190
来自专栏IT笔记

JAVA实现的微信扫描二维码支付

吐槽一下 支付项目采用springMvc+Dubbo架构实现,只对外提供接口。 话说,为什么微信支付比支付宝来的晚了那么一点,一句话,那一阵挺忙的,然后就没有时...

1.5K8
来自专栏Brian

基于CentOS7安装Jupyter Notebook

---- 概述 由于我的开发环境是CentOS,现在主要是以Python为主,偶尔夹杂C/C++和GoLang语言。在本地开发、测试、提交git和在生产环境再去...

4784
来自专栏简书专栏

基于bs4+requests的蓝房网爬虫(进阶版)

1.代码可以直接运行,请下载anaconda并安装,用spyder方便查看变量 或者可以查看生成的excel文件 2.依赖库,命令行运行(WIN10打开命令...

1083
来自专栏james大数据架构

资源等待类型sys.dm_os_wait_stats

动态管理视图  sys.dm_os_wait_stats  返回执行的线程所遇到的所有等待的相关信息。可以使用该聚合视图来诊断 SQL Server 以及特定查...

2297
来自专栏用户2442861的专栏

Springmvc+uploadify实现文件上传

   网上看了很多关于文件上传的帖子,众口不一,感觉有点乱,最近正好公司的项目里用到JQuery的uploadify控件做文件上传,所以整理下头绪,搞篇文档出来...

1842

访问数据 - 反应方式(Vert.x入门的第4部分)

原文地址:https://dzone.com/articles/accessing-data-the-reactive-way

1.2K4
来自专栏Java Web

SpringBoot技术栈搭建个人博客【后台开发】

在查了一些资料和吸收了一些评论给出良好的建议之后,我觉得有必要对一些设计进行一些调整:

1.1K6
来自专栏FreeBuf

CTF挑战130分 | Rickdiculously 1.0

Vulnhub是一个提供各种漏洞环境的靶场平台,供安全爱好者学习渗透使用,大部分环境是做好的虚拟机镜像文件,镜像预先设计了多种漏洞,需要使用VMware或者Vi...

1953
来自专栏乐沙弥的世界

Linux HugePage 特性

    HugePage,就是指的大页内存管理方式。与传统的4kb的普通页管理方式相比,HugePage为管理大内存(8GB以上)更为高效。本文描述了什么是Hu...

1654

扫码关注云+社区

领取腾讯云代金券