关于spring事务的传播机制,可以去看一下《spring事务》这篇文章。本文对传播机制实现原理进行分析。

1.Spring 事务管理的核心接口与类

1.1.TransactionDefinition

该接口定义了事务的所有属性(隔离级别,传播类型,超时时间等等),我们日常开发中经常使用的 @Transactional 其实最终会被转化为 TransactionDefinition

1.2.TransactionStatus

事务的状态,以最常用的实现 DefaultTransactionStatus 为例,该类存储了当前的事务对象,savepoint,当前挂起的事务,是否完成,是否仅回滚等等

1.3.TransactionManager

这是一个空接口,直接继承他的 interface 有 PlatformTransactionManager(我们平时用的就是这个,默认的实现类DataSourceTransactionManager)以及
ReactiveTransactionManager(响应式事务管理器,由于不是本文重点,我们不多说)

从上述两个接口来看,TransactionManager 的主要作用

  • 通过TransactionDefinition开启一个事务,返回TransactionStatus
  • 通过TransactionStatus 提交、回滚事务(实际开启事务的Connection通常存储在TransactionStatus中)
public interface PlatformTransactionManager extends TransactionManager {
    TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
            throws TransactionException;
            
    void commit(TransactionStatus status) throws TransactionException;
    
    void rollback(TransactionStatus status) throws TransactionException;
}

1.4.TransactionInterceptor

事务拦截器,事务AOP的核心类(支持响应式事务,编程式事务,以及我们常用的标准事务),由于篇幅原因,本文只讨论标准事务的相关实现

下面我们从事务逻辑的入口 TransactionInterceptor 入手,来看下Spring事务管理的核心逻辑以及事务传播的实现

TransactionInterceptor 实现了MethodInvocation(这是实现AOP的一种方式),其核心逻辑在父类TransactionAspectSupport 中

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
        final InvocationCallback invocation) throws Throwable {
    // If the transaction attribute is null, the method is non-transactional.
    TransactionAttributeSource tas = getTransactionAttributeSource();
    // 当前事务的属性 TransactionAttribute extends TransactionDefinition
    final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
    // 事务属性中可以定义当前使用哪个事务管理器
    // 如果没有定义就去Spring上下文找到一个可用的 TransactionManager
    final TransactionManager tm = determineTransactionManager(txAttr);

    // 省略了响应式事务的处理 ...
    
    PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

    if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
        // Standard transaction demarcation with getTransaction and commit/rollback calls.
        TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

        Object retVal;
        try {
            // This is an around advice: Invoke the next interceptor in the chain.
            // This will normally result in a target object being invoked.
            // 如果有下一个拦截器则执行,最终会执行到目标方法,也就是我们的业务代码
            retVal = invocation.proceedWithInvocation();
        }
        catch (Throwable ex) {
            // target invocation exception
            // 当捕获到异常时完成当前事务 (提交或者回滚)
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        }
        finally {
            cleanupTransactionInfo(txInfo);
        }

        if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
            // Set rollback-only in case of Vavr failure matching our rollback rules...
            TransactionStatus status = txInfo.getTransactionStatus();
            if (status != null && txAttr != null) {
                retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
            }
        }
        // 根据事务的状态提交或者回滚
        commitTransactionAfterReturning(txInfo);
        return retVal;
    }

    // 省略了编程式事务的处理 ...
}
  1. 获取当前事务属性,事务管理器(以注解事务为例,这些都可以通过@Transactional来定义)
  2. createTransactionIfNecessary,判断是否有必要创建事务
  3. invocation.proceedWithInvocation 执行拦截器链,最终会执行到目标方法
  4. completeTransactionAfterThrowing当抛出异常后,完成这个事务,提交或者回滚,并抛出这个异常
  5. commitTransactionAfterReturning 从方法命名来看,这个方法会提交事务。

但是深入源码中会发现,该方法中也包含回滚逻辑,具体行为会根据当前TransactionStatus的一些状态来决定(也就是说,我们也可以通过设置当前TransactionStatus,来控制事务回滚,并不一定只能通过抛出异常),详见AbstractPlatformTransactionManager::commit

业务代码中可以通过TransactionAspectSupport.currentTransactionStatus()获取当前TransactionStatus

我们继续,来看看createTransactionIfNecessary做了什么

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
        @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

    // If no name specified, apply method identification as transaction name.
    if (txAttr != null && txAttr.getName() == null) {
        txAttr = new DelegatingTransactionAttribute(txAttr) {
            @Override
            public String getName() {
                return joinpointIdentification;
            }
        };
    }

    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {
            // 通过事务管理器开启事务
            status = tm.getTransaction(txAttr);
        }
        else {
            if (logger.isDebugEnabled()) {
                logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                        "] because no transaction manager has been configured");
            }
        }
    }
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
  1. 通过PlatformTransactionManager(事务管理器)开启事务
  2. prepareTransactionInfo 准备事务信息,这个具体做了什么我们稍后再讲

继续来看PlatformTransactionManager::getTransaction,该方法只有一个实现 AbstractPlatformTransactionManager::getTransaction

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
        throws TransactionException {

    // Use defaults if no transaction definition given.
    TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());

    // 获取当前事务,该方法有继承 AbstractPlatformTransactionManager 的子类自行实现
    Object transaction = doGetTransaction();
    boolean debugEnabled = logger.isDebugEnabled();

    // 如果目前存在事务
    if (isExistingTransaction(transaction)) {
        // Existing transaction found -> check propagation behavior to find out how to behave.
        return handleExistingTransaction(def, transaction, debugEnabled);
    }

    // Check definition settings for new transaction.
    if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    }

    // 传播类型PROPAGATION_MANDATORY, 要求当前必须有事务
    // No existing transaction found -> check propagation behavior to find out how to proceed.
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException(
                "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    // PROPAGATION_REQUIRED, PROPAGATION_REQUIRES_NEW, PROPAGATION_NESTED 不存在事务时创建事务
    else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        SuspendedResourcesHolder suspendedResources = suspend(null);
        if (debugEnabled) {
            logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
        }
        try {
            // 开启事务
            return startTransaction(def, transaction, debugEnabled, suspendedResources);
        }
        catch (RuntimeException | Error ex) {
            resume(null, suspendedResources);
            throw ex;
        }
    }
    else {
        // Create "empty" transaction: no actual transaction, but potentially synchronization.
        if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
            logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                    "isolation level will effectively be ignored: " + def);
        }
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }
}
  1. doGetTransaction获取当前事务
  2. 如果存在事务,则调用handleExistingTransaction处理,这个我们稍后会讲到。接下来,会根据事务的传播决定是否开启事务
  3. 如果事务传播类型为PROPAGATION_MANDATORY,且不存在事务,则抛出异常
  4. 如果传播类型为 PROPAGATION_REQUIRED, PROPAGATION_REQUIRES_NEW,PROPAGATION_NESTED,且当前不存在事务,则调用startTransaction创建事务
  5. 当不满足 3、4时,例如 PROPAGATION_NOT_SUPPORTED,此时会执行事务同步,但是不会创建真正的事务

事务同步:Spring不仅可以管理关系型数据的事务,还可以同步管理Redis等其他数据库的事务

2.Spring 如何管理当前的事务

接下来讲讲上面提到的doGetTransaction、handleExistingTransaction,这两个方法是由不同的TransactionManager自行实现的

我们以SpringBoot默认的TransactionManager,DataSourceTransactionManager为例

@Override
protected Object doGetTransaction() {
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    txObject.setSavepointAllowed(isNestedTransactionAllowed());
    ConnectionHolder conHolder =
            (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}

@Override
protected boolean isExistingTransaction(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}

结合 AbstractPlatformTransactionManager::getTransaction 一起来看,doGetTransaction 其实获取的是当前的Connection。
判断当前是否存在事务,是判断DataSourceTransactionObject 对象中是否包含connection,以及connection是否开启了事务。

我们继续来看下TransactionSynchronizationManager.getResource(obtainDataSource())获取当前connection的逻辑

package org.springframework.transaction.support;

private static final ThreadLocal<Map<Object, Object>> resources =
			new NamedThreadLocal<>("Transactional resources");

@Nullable
public static Object getResource(Object key) {
	Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
	return doGetResource(actualKey);
}

/**
 * Actually check the value of the resource that is bound for the given key.
 */
@Nullable
private static Object doGetResource(Object actualKey) {
	Map<Object, Object> map = resources.get();
	if (map == null) {
		return null;
	}
	Object value = map.get(actualKey);
	// Transparently remove ResourceHolder that was marked as void...
	if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
		map.remove(actualKey);
		// Remove entire ThreadLocal if empty...
		if (map.isEmpty()) {
			resources.remove();
		}
		value = null;
	}
	return value;
}

看到这里,我们能明白DataSourceTransactionManager是如何管理线程之间的Connection,ThreadLocal 中存储一个Map,key为数据源对象,value为该数据源在当前线程的Connection
image-1651289026711
DataSourceTransactionManager 在开启事务后,会调用TransactionSynchronizationManager::bindResource将指定数据源的Connection绑定到当前线程

如果存在事务的情况,如何处理

private TransactionStatus handleExistingTransaction(
        TransactionDefinition definition, Object transaction, boolean debugEnabled)
        throws TransactionException {

    // 如果事务的传播要求以非事务方式执行 抛出异常
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        throw new IllegalTransactionStateException(
                "Existing transaction found for transaction marked with propagation 'never'");
    }

    // PROPAGATION_NOT_SUPPORTED 如果存在事务,则挂起当前事务,以非事务方式执行
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction");
        }
        // 挂起当前事务
        Object suspendedResources = suspend(transaction);
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        // 构建一个无事务的TransactionStatus
        return prepareTransactionStatus(
                definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }

    // PROPAGATION_REQUIRES_NEW 如果存在事务,则挂起当前事务,新建一个事务
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction, creating new transaction with name [" +
                    definition.getName() + "]");
        }
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
            return startTransaction(definition, transaction, debugEnabled, suspendedResources);
        }
        catch (RuntimeException | Error beginEx) {
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
    }

    // PROPAGATION_NESTED 内嵌事务,就是我们开头举得例子
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        if (!isNestedTransactionAllowed()) {
            throw new NestedTransactionNotSupportedException(
                    "Transaction manager does not allow nested transactions by default - " +
                    "specify 'nestedTransactionAllowed' property with value 'true'");
        }
        if (debugEnabled) {
            logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
        }
        // 非JTA事务管理器都是通过savePoint实现的内嵌事务
        // savePoint:关系型数据库中事务可以创建还原点,并且可以回滚到还原点
        if (useSavepointForNestedTransaction()) {
            // Create savepoint within existing Spring-managed transaction,
            // through the SavepointManager API implemented by TransactionStatus.
            // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
            DefaultTransactionStatus status =
                    prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
            // 创建还原点
            status.createAndHoldSavepoint();
            return status;
        }
        else {
            // Nested transaction through nested begin and commit/rollback calls.
            // Usually only for JTA: Spring synchronization might get activated here
            // in case of a pre-existing JTA transaction.
            return startTransaction(definition, transaction, debugEnabled, null);
        }
    }

    // 如果执行到这一步传播类型一定是,PROPAGATION_SUPPORTS 或者 PROPAGATION_REQUIRED
    // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
    if (debugEnabled) {
        logger.debug("Participating in existing transaction");
    }
    
    // 校验目前方法中的事务定义和已存在的事务定义是否一致
    if (isValidateExistingTransaction()) {
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
            Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                Constants isoConstants = DefaultTransactionDefinition.constants;
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] specifies isolation level which is incompatible with existing transaction: " +
                        (currentIsolationLevel != null ?
                                isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                "(unknown)"));
            }
        }
        if (!definition.isReadOnly()) {
            if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] is not marked as read-only but existing transaction is");
            }
        }
    }
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    // 构建一个TransactionStatus,但不开启事务
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}

这里终于看到了期待已久的挂起事务和内嵌事务了,看一下DataSourceTransactionManager的实现

  • 挂起事务:通过TransactionSynchronizationManager::unbindResource 根据数据源获取当前的Connection,并在resource中移除该Connection。之后会将该Connection存储到TransactionStatus对象中
// DataSourceTransactionManager::doSuspend
@Override
protected Object doSuspend(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    txObject.setConnectionHolder(null);
    return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}

在事务提交或者回滚后,调用 AbstractPlatformTransactionManager::cleanupAfterCompletion会将TransactionStatus 中缓存的Connection重新绑定到resource中

  • 内嵌事务:通过关系型数据库的savePoint实现,提交或回滚的时候会判断如果当前事务为savePoint则释放savePoint或者回滚到savePoint,具体逻辑参考AbstractPlatformTransactionManager::processRollbackAbstractPlatformTransactionManager::processCommit

至此,事务的传播源码分析结束

3.prepareTransactionInfo

上文留下了一个问题,prepareTransactionInfo 方法做了什么,我们先来看下TransactionInfo的结构

protected static final class TransactionInfo {

    @Nullable
    private final PlatformTransactionManager transactionManager;

    @Nullable
    private final TransactionAttribute transactionAttribute;

    private final String joinpointIdentification;

    @Nullable
    private TransactionStatus transactionStatus;

    @Nullable
    private TransactionInfo oldTransactionInfo;
    
    // ...
}

该类在Spring中的作用,是为了内部传递对象。ThreadLocal中存储了最新的TransactionInfo,通过当前TransactionInfo可以找到他的oldTransactionInfo。每次创建事务时会新建一个TransactionInfo(无论有没有真正的事务被创建)存储到ThreadLocal中,在每次事务结束后,会将当前ThreadLocal中的TransactionInfo重置为oldTransactionInfo,这样的结构形成了一个链表,使得Spring事务在逻辑上可以无限嵌套下去

原文

浅入浅出 Spring 事务传播实现原理