Spring事务处理的实现

参考《Spring技术内幕(第2版)》

概述

Spring事务处理模块是通过AOP功能来实现声明式事务处理的,如事务属性的配置和读取、事务对象的抽象等功能。因此,在Spring中设计了TransactionProxyFactoryBean来实现AOP功能,通过它可以生成proxy代理对象,在这个代理对象中,通过TransactionInterceptor来完成对代理方法的拦截。对于具体的事务处理,由于不同的底层数据库有不同的方式,在spring事务处理中,对主要的事务实现做了一个抽象和适配。

声明式事务处理的基本过程

(1)读取和处理在IoC容器中配置的事务处理属性,转化为Spring事务处理需要的内部数据结构。
涉及类:TransactionAttributeSourceAdvisor
作用:这是一个AOP通知器,Spring用它来完成对事务处理属性值的处理。IoC中配置的的事务处理信息->TransactionAttribute表示的数据对象(是spring对事务处理属性值的数据抽象)

(2)Spring事务处理模块实现的统一的事务处理过程。[包含处理事务配置属性、线程绑定完成事务处理的过程]
涉及类:TransactionInfo和TransactionStatus
作用:在事务处理中记录和传递相关执行场景

(3)底层的事务处理实现。[Spring委托给具体的事务处理器完成]

实现分析
1.事务处理拦截器配置

在TransactionProxyFactoryBean中将TranscationManager和事务属性注册到TranscationInterceptor中
这个拦截器相当于AOP中的advice,通过这个拦截器的实现,Spring封装了事务处理的实现。

1
2
3
4
5
6
7
private final TransactionInterceptor transactionInterceptor = new TransactionInterceptor();

public void setTransactionManager(PlatformTransactionManager transactionManager){this.transactionInterceptor.setTransactionManager(transactionManager);
}

public void setTransactionAttributes(Properties transactionAttributes) {this.transactionInterceptor.setTransactionAttributes(transactionAttributes);
}

创建AOP对事务处理的advisor
如果pointcut不为空,使用默认通知器并为通知器配置事务处理拦截器
如果没有配置pointcut,则使用TransactionAtributeSourceAdvisor作为通知器并为其配置拦截器。

1
2
3
4
5
6
7
8
9
10
11
/**
* Creates an advisor for this FactoryBean's TransactionInterceptor.
*/
@Override
protected Object createMainInterceptor() {
this.transactionInterceptor.afterPropertiesSet();
if (this.pointcut != null) {return new DefaultPointcutAdvisor(this.pointcut, this.transactionInterceptor);
}else {// Rely on default pointcut.
return new TransactionAttributeSourceAdvisor(this.transactionInterceptor);
}
}

1
afterPropertiesSet()

这是Spring完成AOP配置的地方。创建ProxyFactory对象,实现AOP的使用。

2.事务处理配置的读入

主要分析TranscationAttributorSourceAdvisor,了解事务配置属性的读入

首先设置advice和pointcut(事务配置属性会在对proxy的方法进行匹配时使用)

1
2
3
4
5
6
7
8
private TransactionInterceptor transactionInterceptor;

private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {
@Override
protected TransactionAttributeSource getTransactionAttributeSource() {
return (transactionInterceptor != null ? transactionInterceptor.getTransactionAttributeSource() : null);
}
};

在声明式事务处理中,通过对目标对象的方法调用进行拦截来实现,这个拦截通过AOP发挥作用。对于拦截器的启动,首先要判断是否需要对方法进行拦截,判断的标准就是事务属性。

TransactionAttributeSourcePointcut:matches方法判断目标方法调用是不是一个配置好的并需要事务处理的方法调用

1
2
3
4
5
6
7
public boolean matches(Method method, Class<?> targetClass) {
if (TransactionalProxy.class.isAssignableFrom(targetClass)) {
return false;
}
TransactionAttributeSource tas = getTransactionAttributeSource();
return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
}

在matches方法中,会用到TransactionAttributeSource对象,这个对象是刚刚在第一步中对TransactionInterceptor进行依赖注入时配置好的。在TransactionInterceptor中,该方法实现如下:

1
2
3
4
5
public void setTransactionAttributes(Properties transactionAttributes) {    
NameMatchTransactionAttributeSource tas = new NameMatchTransactionAttributeSource();
tas.setProperties(transactionAttributes);
this.transactionAttributeSource = tas;
}

NameMatchTransactionAttributeSource作为TansactionAttributeSource的具体实现,是实际上事务处理属性读入和匹配的地方,以下是其如何进行读入和匹配的代码。此处判断调用方法是否是事务方法,如果是,则取出相应的事务配置属性。具体过程如下,首先以调用方法名为索引在nameMap中查找事务配置属性,如果找到了就直接返回。如果没有找到,就遍历nameMap,对其中的方法名进行模式匹配(进行模式匹配的原因是在设置事务方法时,可以不用为其设置完整的方法名,可以通过设置方法名的命名模式来完成,比如使用*)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public TransactionAttribute getTransactionAttribute(Method method, Class<?> targetClass) {
if (!ClassUtils.isUserLevelMethod(method)) {return null;
}
// Look for direct name match.
String methodName = method.getName();
TransactionAttribute attr = this.nameMap.get(methodName);

if (attr == null) {
// Look for most specific name match.
String bestNameMatch = null;
for (String mappedName : this.nameMap.keySet()) {
if (isMatch(methodName, mappedName) &&
(bestNameMatch == null || bestNameMatch.length() <= mappedName.length())) {
attr = this.nameMap.get(mappedName);
bestNameMatch = mappedName;
}
}
}
return attr;
}

通过以上过程可以得到与目标对象调用方法相关的TransactionAttribute对象,为TransactionInterceptor做好了对调用的目标方法添加事务处理的准备。

3.事务拦截器的设计与实现

在以上的准备工作后,经过AOP包装,如果此时对目标对象进行调用,实际上起作用的会是一个Proxy代理对象,对目标对象方法的调用,会被设置的事务处理拦截器拦截。(对Spring而言,事务管理是通过一个TransactionInfo对象实现的,在该对象中,封装了事务对象和事务处理的状态信息,即事务的抽象)
TransactionInterceptor中invoke()方法是proxy代理对象的回调方法,在调用代理对象方法时会触发这个回调。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public Object invoke(final MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

// Adapt to TransactionAspectSupport's invokeWithinTransaction...
return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
@Override
public Object proceedWithInvocation() throws Throwable {
return invocation.proceed();
}
});
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)throws Throwable {
//读取事务配置属性
final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
//得到具体的事务处理器
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

//要区分是否是CallbackPreferringPlatformTransactionManager
//如果不是,就不需要用回调函数实现事务的创建和提交
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {//创建事务,同时将事务创建过程中得到的信息放到TransactionInfo中去
// Standard transaction demarcation with getTransaction and commit/rollback calls.
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
//此处使处理沿着拦截器链进行,使最后目标对象的方法得到调用(因为有可能在该事务处理对象中还配置了除了事务处理AOP之外的其他拦截器)
retVal = invocation.proceedWithInvocation();
}catch (Throwable ex) {// target invocation exception
//出现异常:根据具体情况回滚或提交
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}finally {//对TransactionInfo中的信息进行更新
cleanupTransactionInfo(txInfo);
}//通过事务处理器对事务进行提交
commitTransactionAfterReturning(txInfo);
return retVal;
}
else {// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
//采用回调方法来使用事务处理
try {
Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr,
new TransactionCallback<Object>() {
@Override
public Object doInTransaction(TransactionStatus status) {
TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
try {
return invocation.proceedWithInvocation();
}catch (Throwable ex) {
if (txAttr.rollbackOn(ex)) {
// A RuntimeException: will lead to a rollback.
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
}else {
throw new ThrowableHolderException(ex);
}
}else {
// A normal return value: will lead to a commit.
return new ThrowableHolder(ex);
}
}finally {
cleanupTransactionInfo(txInfo);
}
}
});

// Check result: It might indicate a Throwable to rethrow.
if (result instanceof ThrowableHolder) {
throw ((ThrowableHolder) result).getThrowable();
}else {
return result;
}
}catch (ThrowableHolderException ex) {
throw ex.getCause();
}
}
}
1
2
3
4
5
6
7
8
protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
if (txInfo != null && txInfo.hasTransaction()) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}

这个提交的处理过程已经封装在PlatformTransactionManager的事务处理器中,与具体数据源相关的处理,都委托给相关具体事务处理器来完成,比如DataSourceTransactionManager、HibernateTransactionManager

2.Spring事务处理的设计与实现

- 声明式事务的创建、挂起、提交和回滚

1.创建

事务创建的结果是创建一个TransactionStatus对象,通过这个对象保存事务处理需要的基本信息。

以TransactionAspectSupport中的 createTransactionIFNecessary()方法作为入口。
在这个方法的调用中,会向AbstractTransactionManager执行getTransaction()。这个过程,在AbstractTransactionManager实现中需要对事务的情况做出不同的处理,把创建工作交给具体的事务处理器来完成,把创建的事务对象在TransactionStatus中保存下来。如果线程中已有事务存在:进行事务创建处理(handleExistingTransaction()方法)。然后将创建的TransactionStatus设置到对应的TransactionInfo中去,同时将TransactionInfo和当前线程绑定,完成事务创建。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
protected TransactionInfo createTransactionIfNecessary(PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {
// If no name specified, apply method identification as transaction name.
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
//封装事务的状态信息
TransactionStatus status = null;
if (txAttr != null) {if (tm != null) {
//这里使用了定义好的事务方法的配置信息
//事务的创建由事务处理器来完成,同时返回TransactionStatus来记录当前的事务状态
status = tm.getTransaction(txAttr);
}else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +"] because no transaction manager has been configured");
}
}
}
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

- prepareTransactionInfo中为TransactionInfo设置TransactionStatus,并把当前的TransactionInfo与线程绑定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
Object transaction = doGetTransaction();

// Cache debug flag to avoid repeated checks.
boolean debugEnabled = logger.isDebugEnabled();

if (definition == null) {// Use defaults if no transaction definition given.
definition = new DefaultTransactionDefinition();
}
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
//如果已经存在事务,要根据在事务属性中定义的事务传播属性配置来处理事务的产生
return handleExistingTransaction(definition, transaction, debugEnabled);
}
// Check definition settings for new transaction.
//检查timeout是否合理
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}
// No existing transaction found -> check propagation behavior to find out how to proceed.
//这里会设置事务的传播属性,如REQUIRES_NEW等
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'");
}else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//此处创建事务的调用,由具体的事务处理器完成
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +"isolation level will effectively be ignored: " + definition);
}boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}

在AbstractPlatformTransactionManager提供的事务创建模版的基础上,具体的事务处理器需要定义自己的实现来完成底层的事务创建操作,如要实现doBegin()等方法

2.挂起

事务挂起主要牵涉线程与事务处理信息的保存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//返回的SuspendedResourcesHolder会作为参数传给TransactionStatus
protected final SuspendedResourcesHolder suspend(Object transaction) throws TransactionException {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
if (transaction != null) {
//把挂起事务的处理交给具体的事务处理器,如果处理器不支持事务挂起,就抛出异常
suspendedResources = doSuspend(transaction);
}//在线程中保存与事务处理相关的信息,并重置线程中相关的ThreadLocal变量
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
return new SuspendedResourcesHolder( suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
}catch (RuntimeException | Error ex) {// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
}else if (transaction != null) {// Transaction active but no synchronization active.
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
}else {
// Neither transaction nor synchronization active.
return null;
}
}

3.提交

事务的提交的入口在TransactionInteceptor的invoke方法中

1
2
//通过事务处理器对事务进行提交
commitTransactionAfterReturning(txInfo);

提交的对象是事务,是在事务创建时产生的

1
2
3
//创建事务,同时将事务创建过程中得到的信息放到TransactionInfo中去
// Standard transaction demarcation with getTransaction and commit/rollback calls.
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);

事务提交方法在此处直接调用事务处理器来完成事务提交

1
2
3
4
5
6
7
8
protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
if (txInfo != null && txInfo.hasTransaction()) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}

和事务创建一样,在AbstractPlatformTransactionManager中也有一个模版方法支持具体的事务处理器对事务提交的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public final void commit(TransactionStatus status) throws TransactionException {
//如果在status中事务已经表识结束,要抛出异常
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}

//如果事务处理过程发生了异常,就要调用回滚
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
//处理回滚
processRollback(defStatus, false);
return;
}

if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
processRollback(defStatus, true);
return;
}

//处理提交的入口
processCommit(defStatus);
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;

try {
//事务处理的准备工作由具体的事务处理器完成
boolean unexpectedRollback = false;
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
//嵌套事务处理
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
status.releaseHeldSavepoint();
}
//接下来根据当前线程中保存的事务状态进行处理,如果当前提交的是一个新事务,调用具体处理器进行提交
//如果不是新事务,则不提交(由原来存在的事务进行提交)
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
unexpectedRollback = status.isGlobalRollbackOnly();
//具体的提交由具体处理器完成
doCommit(status);
}
else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}

// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
}
else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException | Error ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}

// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
//触发回滚
try {
triggerAfterCommit(status);
}
finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}

}
finally {
cleanupAfterCompletion(status);
}
}
4.回滚

回滚和事务提交非常相似

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;

try {
triggerBeforeCompletion(status);
//嵌套事务回滚
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
status.rollbackToHeldSavepoint();
}
//当前事务调用方法中新建事务的回滚处理
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
doRollback(status);
}
else {
// Participating in larger transaction
//当前事务调用方法中没有新建方法的回滚处理
if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
doSetRollbackOnly(status);
}
//由线程中前一个事务来处理回滚,所以不做任何操作
else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
// Unexpected rollback only matters here if we're asked to fail early
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
}
catch (RuntimeException | Error ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}

triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
}
finally {
cleanupAfterCompletion(status);
}
}