转载

spring事务源码分析结合mybatis源码(三)

下面将结合mybatis源码来分析下,这种持久化框架是如何对connection使用,来达到spring事务的控制。

想要在把mybatis跟spring整合都需要这样一个jar包: mybatis-spring-x.x.x.jar,这里面定义了一些主要的整合信息。

在spring配置文件中需要配置如下两个bean:

<!-- mybatis配置 --> <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">  <property name="dataSource" ref="dynamicDataSource" />  <property name="configLocation" value="classpath:mybatis.xml"></property>  <!-- mybatis配置文件 -->  <property name="mapperLocations" value="classpath:com/blackbread/dao/mapper/*.xml" /> </bean> <!--mapper scanning --> <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">  <property name="basePackage" value="com.blackbread.dao" />  <property name="sqlSessionFactoryBeanName" value="sqlSessionFactory" /> </bean> 

首先让我们来看 org.mybatis.spring.SqlSessionFactoryBean,在这个类需要注入跟之前tx配置中一样的 dataSource。

SqlSessionFactoryBean类实现了InitializingBean接口,所以会执行 afterPropertiesSet方法,在afterPropertiesSet方法中会执行 buildSqlSessionFactory方法生成一个sqlSessionFactory对象,让我们看下buildSqlSessionFactory方法:由于主要看的是跟spring tx结合的方式,所以代码看不上很细,如有疏漏,望不吝赐教。

protected SqlSessionFactory buildSqlSessionFactory() throws IOException {   Configuration configuration;   XMLConfigBuilder xmlConfigBuilder = null;   //初始化一个configuration   if (this.configLocation != null) {     xmlConfigBuilder = new XMLConfigBuilder(this.configLocation.getInputStream(), null, this.configurationProperties);     configuration = xmlConfigBuilder.getConfiguration();   } else {     configuration = new Configuration();     configuration.setVariables(this.configurationProperties);   }   if (this.objectFactory != null) {     configuration.setObjectFactory(this.objectFactory);   }   if (this.objectWrapperFactory != null) {     configuration.setObjectWrapperFactory(this.objectWrapperFactory);   }   if (hasLength(this.typeAliasesPackage)) {     String[] typeAliasPackageArray = tokenizeToStringArray(this.typeAliasesPackage,         ConfigurableApplicationContext.CONFIG_LOCATION_DELIMITERS);     for (String packageToScan : typeAliasPackageArray) {       configuration.getTypeAliasRegistry().registerAliases(packageToScan,               typeAliasesSuperType == null ? Object.class : typeAliasesSuperType);     }   }   //设置别名   if (!isEmpty(this.typeAliases)) {     for (Class<?> typeAlias : this.typeAliases) {       configuration.getTypeAliasRegistry().registerAlias(typeAlias);     }   }   //装入插件,mybatis的插件都是以拦截器的形式进行的好像,比如分页插件,这里是载入spring中注入的   if (!isEmpty(this.plugins)) {     for (Interceptor plugin : this.plugins) {       configuration.addInterceptor(plugin);     }   }   if (hasLength(this.typeHandlersPackage)) {     String[] typeHandlersPackageArray = tokenizeToStringArray(this.typeHandlersPackage,         ConfigurableApplicationContext.CONFIG_LOCATION_DELIMITERS);     for (String packageToScan : typeHandlersPackageArray) {       configuration.getTypeHandlerRegistry().register(packageToScan);     }   }    if (!isEmpty(this.typeHandlers)) {     for (TypeHandler<?> typeHandler : this.typeHandlers) {       configuration.getTypeHandlerRegistry().register(typeHandler);     }   }   //这里将解析mybatis.xml文件,载入所有配置,插件、setting等   if (xmlConfigBuilder != null) {     try {       xmlConfigBuilder.parse();     } catch (Exception ex) {       throw new NestedIOException("Failed to parse config resource: " + this.configLocation, ex);     } finally {       ErrorContext.instance().reset();     }   }   //这个很重要,这里定义了用的transactionFactory为SpringManagedTransactionFactory,这个在获取   //connection等地方都有用到,是mybatis跟spring的主要链接   if (this.transactionFactory == null) {     this.transactionFactory = new SpringManagedTransactionFactory();   }   //新建一个Environment对象,并将新建的transactionFactory放入其中   Environment environment = new Environment(this.environment, this.transactionFactory, this.dataSource);   configuration.setEnvironment(environment);   if (this.databaseIdProvider != null) {     try {       configuration.setDatabaseId(this.databaseIdProvider.getDatabaseId(this.dataSource));     } catch (SQLException e) {       throw new NestedIOException("Failed getting a databaseId", e);     }   }   if (!isEmpty(this.mapperLocations)) {     for (Resource mapperLocation : this.mapperLocations) {       if (mapperLocation == null) {         continue;       }       try {       //这里主要是解析配置的sql mapper配置文件         XMLMapperBuilder xmlMapperBuilder = new XMLMapperBuilder(mapperLocation.getInputStream(),             configuration, mapperLocation.toString(), configuration.getSqlFragments());         xmlMapperBuilder.parse();       } catch (Exception e) {         throw new NestedIOException("Failed to parse mapping resource: '" + mapperLocation + "'", e);       } finally {         ErrorContext.instance().reset();       }     }   } else {     }   }   return this.sqlSessionFactoryBuilder.build(configuration); } 

看着好长的一段啊,其实这里做的工作就是解析配置文件生成configuration对象而已。在

 xmlMapperBuilder.parse();

这里将解析sql mapper文件中的映射关系生成MappedStatement对象,并执行 configuration.addMappedStatement(statement);放入到configuration对象中,有兴趣的同学可以仔细看下。

这里主要需要注意的一块就是this.transactionFactory = new SpringManagedTransactionFactory();这里就是mybatis跟spring连接到一起的地方。

接着我们看一下org.mybatis.spring.mapper.MapperScannerConfigurer对象的初始化过程,这个对象实现了BeanDefinitionRegistryPostProcessor接口,在postProcessBeanDefinitionRegistry方法中初始化一个对象ClassPathMapperScanner,并讲执行scan--->doScan方法,

public Set<BeanDefinitionHolder> doScan(String... basePackages) {   Set<BeanDefinitionHolder> beanDefinitions = super.doScan(basePackages);     for (BeanDefinitionHolder holder : beanDefinitions) {       GenericBeanDefinition definition = (GenericBeanDefinition) holder.getBeanDefinition();       definition.getPropertyValues().add("mapperInterface", definition.getBeanClassName());       //实际就是将扫描到的接口包装成MapperFactoryBean的实现类       definition.setBeanClass(MapperFactoryBean.class);       definition.getPropertyValues().add("addToConfig", this.addToConfig);       boolean explicitFactoryUsed = false;       //注入sqlSessionFactory对象,这个也很重要       if (StringUtils.hasText(this.sqlSessionFactoryBeanName)) {         definition.getPropertyValues().add("sqlSessionFactory", new RuntimeBeanReference(this.sqlSessionFactoryBeanName));         explicitFactoryUsed = true;       } else if (this.sqlSessionFactory != null) {         definition.getPropertyValues().add("sqlSessionFactory", this.sqlSessionFactory);         explicitFactoryUsed = true;       }       if (StringUtils.hasText(this.sqlSessionTemplateBeanName)) {         if (explicitFactoryUsed) {           logger.warn("Cannot use both: sqlSessionTemplate and sqlSessionFactory together. sqlSessionFactory is ignored.");         }         definition.getPropertyValues().add("sqlSessionTemplate", new RuntimeBeanReference(this.sqlSessionTemplateBeanName));         explicitFactoryUsed = true;       } else if (this.sqlSessionTemplate != null) {         if (explicitFactoryUsed) {           logger.warn("Cannot use both: sqlSessionTemplate and sqlSessionFactory together. sqlSessionFactory is ignored.");         }         definition.getPropertyValues().add("sqlSessionTemplate", this.sqlSessionTemplate);         explicitFactoryUsed = true;       }        if (!explicitFactoryUsed) {         definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);       }     }   return beanDefinitions; } 

这段代码其实主要就是从basePackage中扫描到相应的接口类,并且注册到spring中,并且定义此对象的实现类为:MapperFactoryBean,同时添加属性:sqlSessionFactory。

这个操作很重要,在后面有关联操作。

这里让我们看下MapperFactoryBean类,这个类继承自SqlSessionDaoSupport而在SqlSessionDaoSupport中有如下方法:

public void setSqlSessionFactory(SqlSessionFactory sqlSessionFactory) {     if (!this.externalSqlSession) {       this.sqlSession = new SqlSessionTemplate(sqlSessionFactory);     }   }

也就是上面调用的添加sqlSessionFactory属性的set操作,在这个方法中初始话sqlSession,利用的是SqlSessionTemplate对象。

接下来让我们看下SqlSessionTemplate的初始化过程:

public SqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType,     PersistenceExceptionTranslator exceptionTranslator) {    notNull(sqlSessionFactory, "Property 'sqlSessionFactory' is required");   notNull(executorType, "Property 'executorType' is required");    this.sqlSessionFactory = sqlSessionFactory;   this.executorType = executorType;   this.exceptionTranslator = exceptionTranslator;   this.sqlSessionProxy = (SqlSession) newProxyInstance(       SqlSessionFactory.class.getClassLoader(),       new Class[] { SqlSession.class },       new SqlSessionInterceptor()); } 

SqlSessionTemplate其实是实现了SqlSession接口的,在初始化的时候将生成一个sqlSessionProxy 代理类,可以查看下SqlSessionTemplate里面的所有与数据库相关的操作都是通过sqlSessionProxy 这个代理类实现的。

接着看下sqlSessionProxy 的实际handler:

private class SqlSessionInterceptor implements InvocationHandler {   public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {     SqlSession sqlSession = getSqlSession(         SqlSessionTemplate.this.sqlSessionFactory,         SqlSessionTemplate.this.executorType,         SqlSessionTemplate.this.exceptionTranslator);     try {       Object result = method.invoke(sqlSession, args);       if (!isSqlSessionTransactional(sqlSession, SqlSessionTemplate.this.sqlSessionFactory)) {         sqlSession.commit(true);       }       return result;     } catch (Throwable t) {       Throwable unwrapped = unwrapThrowable(t);       if (SqlSessionTemplate.this.exceptionTranslator != null && unwrapped instanceof PersistenceException) {         closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);         sqlSession = null;         Throwable translated = SqlSessionTemplate.this.exceptionTranslator.translateExceptionIfPossible((PersistenceException) unwrapped);         if (translated != null) {           unwrapped = translated;         }       }       throw unwrapped;     } finally {       if (sqlSession != null) {         closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);       }     }   } } 

这里首先需要获取一个SqlSession对象:

public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType, PersistenceExceptionTranslator exceptionTranslator) {   SqlSessionHolder holder = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sessionFactory);   if (holder != null && holder.isSynchronizedWithTransaction()) {     if (holder.getExecutorType() != executorType) {       throw new TransientDataAccessResourceException("Cannot change the ExecutorType when there is an existing transaction");     }     holder.requested();     return holder.getSqlSession();   }   SqlSession session = sessionFactory.openSession(executorType);   if (TransactionSynchronizationManager.isSynchronizationActive()) {     Environment environment = sessionFactory.getConfiguration().getEnvironment();     if (environment.getTransactionFactory() instanceof SpringManagedTransactionFactory) {       holder = new SqlSessionHolder(session, executorType, exceptionTranslator);       TransactionSynchronizationManager.bindResource(sessionFactory, holder);       TransactionSynchronizationManager.registerSynchronization(new SqlSessionSynchronization(holder, sessionFactory));       holder.setSynchronizedWithTransaction(true);       holder.requested();     } else {       if (TransactionSynchronizationManager.getResource(environment.getDataSource()) == null) {       } else {         throw new TransientDataAccessResourceException(             "SqlSessionFactory must be using a SpringManagedTransactionFactory in order to use Spring transaction synchronization");       }     }   } else {   }   return session; } 

这里将会获取一个SqlSessionHolder并判断是否已经存在,如果不存在将会初始化一个新的,我们这里只分析第一次调用过程,也就是将会执行

 SqlSession session = sessionFactory.openSession(executorType);
--->openSessionFromDataSource
private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {   Transaction tx = null;   try {     final Environment environment = configuration.getEnvironment();     final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);     tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);     final Executor executor = configuration.newExecutor(tx, execType, autoCommit);     return new DefaultSqlSession(configuration, executor);   } catch (Exception e) {     closeTransaction(tx); // may have fetched a connection so lets call close()     throw ExceptionFactory.wrapException("Error opening session.  Cause: " + e, e);   } finally {     ErrorContext.instance().reset();   } } 

这里是主要的跟spring结合部分,让我们仔细分析下,首先这里将获取TransactionFactory: final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);这里将得到我们之前初始化时候加入的 SpringManagedTransactionFactory 。然后将初始化当前的tx:

tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);

然后将生成一个Executor ,将final Executor executor = configuration.newExecutor(tx, execType, autoCommit);这里在之前指定了execType为Simple,所以在这里将生成一个SimpleExecutor: executor = new SimpleExecutor(this, transaction);并将transaction加入属性。

到这里SqlSession的初始化也就完成了,接下来就是通过反射进行实际方法的执行了:

Object result = method.invoke(sqlSession, args);

以一个update操作来说明:

public int update(String statement, Object parameter) {   try {     dirty = true;     MappedStatement ms = configuration.getMappedStatement(statement);     return executor.update(ms, wrapCollection(parameter));   } catch (Exception e) {     throw ExceptionFactory.wrapException("Error updating database.  Cause: " + e, e);   } finally {     ErrorContext.instance().reset();   } } 

这里首先将从configuration中根据操作的statement获取映射内容MappedStatement ,

getMappedStatement(String id)---->getMappedStatement(String id, boolean validateIncompleteStatements)
接着将执行executor.update(ms, wrapCollection(parameter)),也就是实际的数据库操作了,记得之前初始化的
executor么,这里就是对应的SimpleExecutor
public int update(MappedStatement ms, Object parameter) throws SQLException {     ErrorContext.instance().resource(ms.getResource()).activity("executing an update").object(ms.getId());     if (closed) throw new ExecutorException("Executor was closed.");     clearLocalCache();     return doUpdate(ms, parameter);   }
public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {   Statement stmt = null;   try {     Configuration configuration = ms.getConfiguration();     StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);     stmt = prepareStatement(handler, ms.getStatementLog());     return handler.update(stmt);   } finally {     closeStatement(stmt);   } } 

这里主要是看prepareStatement方法:

private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {   Statement stmt;   Connection connection = getConnection(statementLog);   stmt = handler.prepare(connection);   handler.parameterize(stmt);   return stmt; } 

然后看Connection 方法:

protected Connection getConnection(Log statementLog) throws SQLException {     Connection connection = transaction.getConnection();     if (statementLog.isDebugEnabled()) {       return ConnectionLogger.newInstance(connection, statementLog);     } else {       return connection;     }   }

到这里看到了真正的Connection获取方法:  transaction.getConnection();也就是通过之前注入的 transaction 中获取connection,而这个transaction也就是对应的SpringManagedTransaction,他的调用过程getConnection()---->openConnection()

private void openConnection() throws SQLException {     this.connection = DataSourceUtils.getConnection(this.dataSource);     this.autoCommit = this.connection.getAutoCommit();     this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource);   }

这里其实就是调用了DataSourceUtils.getConnection(this.dataSource);来获取一个Connection。

看看DataSourceUtils的getConnection(DataSource dataSource)--->doGetConnection(DataSource dataSource)

public static Connection doGetConnection(DataSource dataSource) throws SQLException {  //从TransactionSynchronizationManager中获取ConnectionHolder,这个对象也就是之前我们第一次分析spring tx的时候  //持有ConnectionHolder的对象了  ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);  //由于在前面的切面中已经开启事务,并且初始化了ConnectionHolder所以这里将直接返回ConnectionHolder中的connection  if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {   conHolder.requested();   if (!conHolder.hasConnection()) {    conHolder.setConnection(dataSource.getConnection());   }   return conHolder.getConnection();  }  Connection con = dataSource.getConnection();  if (TransactionSynchronizationManager.isSynchronizationActive()) {   ConnectionHolder holderToUse = conHolder;   if (holderToUse == null) {    holderToUse = new ConnectionHolder(con);   }   else {    holderToUse.setConnection(con);   }   holderToUse.requested();   TransactionSynchronizationManager.registerSynchronization(     new ConnectionSynchronization(holderToUse, dataSource));   holderToUse.setSynchronizedWithTransaction(true);   if (holderToUse != conHolder) {    TransactionSynchronizationManager.bindResource(dataSource, holderToUse);   }  }  return con; } 

是不是感觉这段代码很眼熟,对了,因为这里有我们第一篇里面非常熟悉的TransactionSynchronizationManager,在spring tx中也是通过这个类中的resources (ThreadLocal对象)对ConnectionHolder进行持有的。

在这里将获取到之前持有的ConnectionHolder对象,并从中获取到Connection 对象然后返回,这样就保证了spring tx中控制的Connection 跟实际调用的Connection为同一个Connection,也就可以通过spring tx对事务进行管理了。

后续的对数据的操作有兴趣的可以自己读一下,感觉mybatis的源码没有spring的那么清晰,还是需要仔细分析下才能整合到一起。

看的比较粗略,难免有疏漏地方,望不吝赐教。

正文到此结束
Loading...