spring源码系列11:事务代理对象的执行

spring源码系列9:事务代理的创建
一节,
事务通过定义

  • 切点: TransactionAttributeSourcePointcut 、
  • 通知(拦截器) TransactionInterceptor
  • Advisor: BeanFactoryTransactionAttributeSourceAdvisor

    AOP基础上实现事务代理的功能

在 spring源码系列10:AOP代理对象的执行
一节。
总结出,不管是AOP-JDK代理还是CGLB动态代理,都会执行Advice完成增强功能。

也就是说:事务的核心功能就在这个 TransactionInterceptor

事务执行

TransactionInterceptor

@Override
	public Object invoke(final MethodInvocation invocation) throws Throwable {
		// Work out the target class: may be {@code null}.
		// The TransactionAttributeSource should be passed the target class
		// as well as the method, which may be from an interface.
		Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

		// Adapt to TransactionAspectSupport's invokeWithinTransaction...
		return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
			@Override
			public Object proceedWithInvocation() throws Throwable {
				return invocation.proceed();
			}
		});
	}
复制代码

交给父类 TransactionAspectSupport.invokeWithinTransaction()
去执行

TransactionAspectSupport

invokeWithinTransaction方法比较长,我们看前半部分。

protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
			throws Throwable {

		// If the transaction attribute is null, the method is non-transactional.
		1.
		final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
		2.
		final PlatformTransactionManager tm = determineTransactionManager(txAttr);
		3.
		final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

		if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
			// Standard transaction demarcation with getTransaction and commit/rollback calls.
			4.
			TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
			Object retVal = null;
			try {
				// This is an around advice: Invoke the next interceptor in the chain.
				// This will normally result in a target object being invoked.
				5.
				retVal = invocation.proceedWithInvocation();
			}
			catch (Throwable ex) {
				// target invocation exception
				6.
				completeTransactionAfterThrowing(txInfo, ex);
				throw ex;
			}
			finally {
			7.
				cleanupTransactionInfo(txInfo);
			}
			8.
			commitTransactionAfterReturning(txInfo);
			return retVal;
		}
		... 
	}
复制代码

1.拿到事务属性TransactionAttribute :

我们使用注解@Transactional时,注解元信息会被包装成TransactionAttribute ,此处拿到的就是@Transactional的元数据

2.determineTransactionManager(txAttr)

找到一个合适的事务管理

protected PlatformTransactionManager determineTransactionManager(TransactionAttribute txAttr) {
		// Do not attempt to lookup tx manager if no tx attributes are set
		//未设置事务属性,也没有设置beanFactory ,直接返回
		if (txAttr == null || this.beanFactory == null) {
			return getTransactionManager();
		}
		//如果@Transactional 指定了具体事务管理器,则根据beanname去容器中找到他
		String qualifier = txAttr.getQualifier();
		if (StringUtils.hasText(qualifier)) {
			return determineQualifiedTransactionManager(qualifier);
		}
		//如果是TransactionAspectSupport.transactionManagerBeanName指定了具体事务管理器,
		//beanname去容器中找
		else if (StringUtils.hasText(this.transactionManagerBeanName)) {
			return determineQualifiedTransactionManager(this.transactionManagerBeanName);
		}
		//没有beanName指定具体的事务管理器Bean。
		/**
		1.查看transactionManager属性是否设置事务管理器对象
		2.查看事务管理器缓存中有没有
		3.去容器中寻找,找PlatformTransactionManager接口的实现类。getBean(PlatformTransactionManager),
		找到放到事务管理器缓存中。
		**/
		else {
			PlatformTransactionManager defaultTransactionManager = getTransactionManager();
			if (defaultTransactionManager == null) {
				defaultTransactionManager = this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY);
				if (defaultTransactionManager == null) {
					defaultTransactionManager = this.beanFactory.getBean(PlatformTransactionManager.class);
					this.transactionManagerCache.putIfAbsent(
							DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager);
				}
			}
			return defaultTransactionManager;
		}
	}
复制代码

以集成了DataSourceTransactionManager为例。determineTransactionManager返回的就是DataSourceTransactionManager对象。

3.methodIdentification方法

获取目标方法全名

4.createTransactionIfNecessary开启事务(重点)

protected TransactionInfo createTransactionIfNecessary(
			PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {
		// If no name specified, apply method identification as transaction name.
		如果事务名称没有指定,则使用方法名作为事务名
		if (txAttr != null && txAttr.getName() == null) {
			txAttr = new DelegatingTransactionAttribute(txAttr) {
				@Override
				public String getName() {return joinpointIdentification;}
			};
		}
		...
		TransactionStatus status = tm.getTransaction(txAttr);
		...
		return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
	}
复制代码

重点在

  • getTransaction
    方法获取TransactionStatus 事务
  • prepareTransactionInfo
    方法上,封装一个TransactionInfo

4.1:getTransaction ()

getTransaction ()是一个模板方法,PlatformTransactionManager定义了getTransaction 方法。抽象类AbstractPlatformTransactionManager实现了getTransaction 方法。因为是模板方法。方法内很多方法都是在具体的PlatformTransactionManager实现的。(本文以DataSourceTransactionManager为例)

public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
		1.尝试获取事务对象,并封装当前线程中以当前datasource为keyConnection。
		Object transaction = doGetTransaction();
		2.没有配置事务属性,则创建一个默认的事务属性
		if (definition == null) {
			// Use defaults if no transaction definition given.
			definition = new DefaultTransactionDefinition();
		}
		3.判断是否有已经存在是否:判断条件是当前线程有Connection.并且Connection是活跃的
		if (isExistingTransaction(transaction)) {
			// Existing transaction found -> check propagation behavior to find out how to behave.
			已存在事务,则处理传播属性,然后返回。(处理两个事务之间的传播属性关系)
			return handleExistingTransaction(definition, transaction, debugEnabled);
		}
		===========当前没有事务
		// Check definition settings for new transaction.
		4.超时时间校验
		if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
			throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
		}
		5.开始处理当前事务
		// No existing transaction found -> check propagation behavior to find out how to proceed.
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
			throw new IllegalTransactionStateException(
					"No existing transaction found for transaction marked with propagation 'mandatory'");
		}
		else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
				definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
				definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
			SuspendedResourcesHolder suspendedResources = suspend(null);
			if (debugEnabled) {
				logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
			}
			try {
				boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
				DefaultTransactionStatus status = newTransactionStatus(
						definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
				doBegin(transaction, definition);
				prepareSynchronization(status, definition);
				return status;
			}
			catch (RuntimeException ex) {
				resume(null, suspendedResources);
				throw ex;
			}
			catch (Error err) {
				resume(null, suspendedResources);
				throw err;
			}
		}
		else {
			// Create "empty" transaction: no actual transaction, but potentially synchronization.
			if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
				logger.warn("Custom isolation level specified but no actual transaction initiated; " +
						"isolation level will effectively be ignored: " + definition);
			}
			boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
			return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
		}
	}
复制代码

下面拆解此方法

4.1.1:doGetTransaction获取事务对象
doGetTransaction的实现在DataSourceTransactionManager中,doGetTransactiond创建一个DataSourceTransactionObject用于表示事务。并尝试获取一个与当前线程关联的Connection,这一部分工作交给事务同步管理器TransactionSynchronizationManager来完成。核心在doGetResource方法上。

private static final ThreadLocal<Map<Object, Object>> resources =
			new NamedThreadLocal<Map<Object, Object>>("Transactional resources");
private static Object doGetResource(Object actualKey) {
		Map<Object, Object> map = resources.get();
		if (map == null) {
			return null;
		}
		Object value = map.get(actualKey);
		// Transparently remove ResourceHolder that was marked as void...
		if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
			map.remove(actualKey);
			// Remove entire ThreadLocal if empty...
			if (map.isEmpty()) {
				resources.remove();
			}
			value = null;
		}
		return value;
	}
复制代码

以当前Datasoure对象为key ,从ThreadLocal对象resources中获取Connection

4.1.2:没有事务属性创建一个事务属性

4.1.3:判断当前是否有事务存在( 重点

默认是false,子类可以重写isExistingTransaction方法。DataSourceTransactionManager中重写了此方法

@Override
	protected boolean isExistingTransaction(Object transaction) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
		return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
	}
复制代码

查看事务对象获取的Connectin是否为空,Connection是否活跃。 是否存在事务指:当前线程中是否存在以当前数据源为key的获取连接Connection.
4.1.3.1: handleExistingTransaction( 处理当前有事务的情况

事务的传播属性处理

  • NEVER: 不支持当前事务,因为当前有事务,抛出异常
  • NOT_SUPPORTED:不支持当前事务,当前有事务存在,就挂起当前事务。所谓的 suspend(transaction)
    挂起在代码里的表现是:(1)将当前是我的ConnectionHolder设置为null(2)从ThreadLocal对象resources中移除当前线程的Connection(3)将事务的挂起状态封装到SuspendedResourcesHolder中设置到TransactionStatus中,返回。
  • REQUIRES_NEW:挂起当前事务,创建新事务。 suspend(transaction)
    挂起, doBegin(transaction, definition)
    创建新事务;
  • NESTED:嵌套事务。分为非JTA事务与JTA事务。JTA事务创建 doBegin(transaction, definition)
    创建新事务
  • SUPPORTS/REQUIRED/MANDATORY: 不处理

4.1.4: 校验超时

4.1.5: 处理当前没有事务的情况

doBegin(transaction, definition)

事务创建doBegin

  1. 事务里没有数据库连接Connection,则从dataSource里获取一个
  2. 设置隔离级别
  3. 设置数据库自动提交为false
  4. 把Connection放到ThreadLocal对象resources中

至此:getTransaction 返回一个封装了事务的TransactionStatus对象。
总结下:getTransaction 方法 重要点

  • 获取数据连接,
  • 处理spring事务传播方式
  • 设置自动提交为false

4.2prepareTransactionInfo方法

prepareTransactionInfo方法主要是封装:事务管理器 PlatformTransactionManager
对象, TransactionAttribute
事务属性对象,方法名 joinpointIdentification
TransactionStatus
对象 成一 TransactionInfo
对象。并把 TransactionInfo
对象通过 bindToThread()
方法绑定到 ThreadLocal<TransactionInfo> transactionInfoHolder

至此: createTransactionIfNecessary完成,得到一个TransactionInfo对象。

5.invocation.proceedWithInvocation();

执行下一个增强。在没其他增强的情况下,这通常会导致调用目标对象。

6.completeTransactionAfterThrowing

在目标方法执行错误的情况下,catch异常,执行回滚。
核心在DataSourceTransactionManager.doRollback方法

Connection con = txObject.getConnectionHolder().getConnection();
		if (status.isDebug()) {
			logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
		}
		try {
			con.rollback();
		}
复制代码

7.cleanupTransactionInfo(txInfo)

finally 一定执行,清除事务信息。 其实就是把ThreadLocal transactionInfoHolder里的新事务信息清除掉。设置为原事务信息

8.commitTransactionAfterReturning(txInfo);

提交事务
核心在DataSourceTransactionManager.doCommit方法

Connection con = txObject.getConnectionHolder().getConnection();
		if (status.isDebug()) {
			logger.debug("Committing JDBC transaction on Connection [" + con + "]");
		}
		try {
			con.commit();
		}
复制代码

至此整个事务执行原理完成。里面涉及到很多细节,由于篇幅的原因不能一一列出。建议多看代码。

原文 

https://juejin.im/post/5dc21b0ae51d456e64036ce0

本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。

PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » spring源码系列11:事务代理对象的执行

赞 (0)
分享到:更多 ()

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址