本系列有写过在spring boot中,普通数据库事务的处理方式,主要是通过@Transactional的注解,但是却不能满足于分布式事务的需求。例如:跨多个多种数据库的一致性事务,跨系统RPC调用的事务,等等。
在分布式领域基于CAP理论以及BASE理论,有人就提出了 柔性事务 的概念。 CAP (一致性、可用性、分区容忍性)理论大家都理解很多次了,这里不再叙述。说一下 BASE 理论,它是在CAP理论的基础之上的延伸。包括 基本可用(Basically Available)、柔性状态(Soft State)、最终一致性(Eventual Consistency)。
针对柔性事务的解决方案,业界内有下面几种:
本文专门讲解 2PC两阶段提交 的这种解决方案,前面会讲解如果在spring boot中配置多数据源,后续会通过引入Atomikos来实践2PC的分布式事务。
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource ##a数据源 spring.datasource.druid.a.url= spring.datasource.druid.a.username= spring.datasource.druid.a.password= spring.datasource.druid.a.driver-class-name=oracle.jdbc.driver.OracleDriver ## b数据源 spring.datasource.druid.b.url= spring.datasource.druid.b.username= spring.datasource.druid.b.password= spring.datasource.druid.b.driver-class-name=oracle.jdbc.driver.OracleDriver
ADataSourceConfig.java
/*
** @MapperScan:A 数据源dao层路径
** @Primary:多数据源时,表示默认数据源的配置
*/
@Configuration
@MapperScan(basePackages = "pers.demo.transaction.transaction2pc.mapper.a",
sqlSessionFactoryRef = "aSqlSessionFactory")
public class ADataSourceConfig {
//注册数据源
@Primary
@Bean(name = "aDataSource")
@ConfigurationProperties(prefix = "spring.datasource.druid.a")
public DataSource aDataSource() {
return DruidDataSourceBuilder.create().build();
}
//注册事务管理器(很重要!!!)
@Bean(name = "aTransactionManager")
@Primary
public DataSourceTransactionManager aTransactionManager() {
return new DataSourceTransactionManager(aDataSource());
}
@Bean(name = "aSqlSessionFactory")
@Primary
public SqlSessionFactory aSqlSessionFactory(@Qualifier("aDataSource") DataSource dataSource) throws Exception {
final SqlSessionFactoryBean sessionFactoryBean = new SqlSessionFactoryBean();
sessionFactoryBean.setDataSource(dataSource);
// sessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mapper/a/*.xml"));
return sessionFactoryBean.getObject();
}
}
BDataSourceConfig.java
/*
** B 数据源的配置,注意都没有 @Primary 了
*/
@Configuration
@MapperScan(basePackages = "pers.demo.transaction.transaction2pc.mapper.b",
sqlSessionFactoryRef = "bSqlSessionFactory")
public class BDataSourceConfig {
@Bean(name = "bDataSource")
@ConfigurationProperties(prefix = "spring.datasource.druid.b")
public DataSource bDataSource() {
return DruidDataSourceBuilder.create().build();
}
@Bean(name = "bTransactionManager")
public DataSourceTransactionManager bTransactionManager() {
return new DataSourceTransactionManager(bDataSource());
}
@Bean(name = "bSqlSessionFactory")
public SqlSessionFactory bSqlSessionFactory(@Qualifier("bDataSource") DataSource dataSource) throws Exception {
final SqlSessionFactoryBean sessionFactoryBean = new SqlSessionFactoryBean();
sessionFactoryBean.setDataSource(dataSource);
// sessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mapper/b/*.xml"));
return sessionFactoryBean.getObject();
}
}
多数据源配置,核心的代码只有上面这些。先是在配置文件中定义A、B两个数据源的连接信息,然后分别构建不同数据源的配置类,并且指向对应的dao层路径。由此:
Dao层数据源: pers.demo.transaction.transaction2pc.mapper.a.xxx.java ,dao层执行的方法,都是基于A数据源的; pers.demo.transaction.transaction2pc.mapper.b.xxx.java ,dao层执行的方法,都是基于B数据源的。
Service层事务:还记得 @Transactional 事务吗?Service层中如果没有指定事务管理器,默认会取值@Primary,即A数据源的事务管理器。如果想要使用B数据源的事务管理器,需要手动声明。
@Transactional(transactionManager = "bTransactionManager")
如果你勤于思考的话,这时就会有疑惑,当前的事务管理器都是基于单个数据源定义的,那么分布式事务该如何定义事务管理器呢?
大家对 XA 有印象吗?我实在是印象深刻。实习时第一天,就是通过ADF在本地电脑上运行WebLogic服务器,然后就是配置数据源。Oracle数据源的驱动有很多,就包括 oracle.jdbc.xa.client.OracleXADataSource ,我当时还是对这个XA疑惑很久。
XA协议 由Tuxedo首先提出的,并交给X/Open组织,作为资源管理器(数据库)与事务管理器的接口标准。XA协议采用两阶段提交方式来管理分布式事务。
XA规范定义了:
简单来说,基于XA协议的数据库,都可以采用两阶段提交方式来管理分布式事务。所幸常见的关系型数据库oracle、mysql、sql server都支持,但是一些不支持事务的nosql数据库是不行的。另外,jms、rocketmq等也是支持XA协议的,同样可以通过2PC来管理分布式事务。
JTA(Java Transaction Manager) : 是Java规范,是XA在Java上的实现.
JTA是如何实现多数据源的事务管理呢?
主要的原理是两阶段提交,以上面的请求业务为例,当整个业务完成了之后只是第一阶段提交,在第二阶段提交之前会检查其他所有事务是否已经提交,如果前面出现了错误或是没有提交,那么第二阶段就不会提交,而是直接rollback操作,这样所有的事务都会做Rollback操作.
JTA的有点就是能够支持多数据库事务同时事务管理,满足分布式系统中的数据的一致性.但是也有对应的弊端:
spring boot支持JTA的框架有很多,我们这次使用Atomikos。我们还是基于之前配置多数据源的代码。
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
ADataSourceConfig.java
@Configuration
@MapperScan(basePackages = "pers.demo.transaction.transaction2pc.mapper.a",
sqlSessionFactoryRef = "aSqlSessionFactory")
@ConfigurationProperties(prefix = "spring.datasource.druid.a")
@Data
public class ADataSourceConfig {
private String url;
private String username;
private String password;
@Primary
@Bean(name = "aDataSource")
public DataSource aDataSource() {
Properties properties = new Properties();
properties.setProperty("URL", url);
properties.setProperty("user", username);
properties.setProperty("password", password);
AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
ds.setXaProperties(properties);
ds.setUniqueResourceName("AOracleXADataSource");
ds.setXaDataSourceClassName("oracle.jdbc.xa.client.OracleXADataSource");
return ds;
}
@Bean(name = "aTransactionManager")
@Primary
public DataSourceTransactionManager aTransactionManager() {
return new DataSourceTransactionManager(aDataSource());
}
@Bean(name = "aSqlSessionFactory")
@Primary
public SqlSessionFactory aSqlSessionFactory(@Qualifier("aDataSource") DataSource dataSource) throws Exception {
final SqlSessionFactoryBean sessionFactoryBean = new SqlSessionFactoryBean();
sessionFactoryBean.setDataSource(dataSource);
return sessionFactoryBean.getObject();
}
}
BDataSourceConfig.java
@Configuration
@MapperScan(basePackages = "pers.demo.transaction.transaction2pc.mapper.b",
sqlSessionFactoryRef = "bSqlSessionFactory")
@ConfigurationProperties(prefix = "spring.datasource.druid.b")
@Data
public class BDataSourceConfig {
private String url;
private String username;
private String password;
@Bean(name = "bDataSource")
public DataSource bDataSource() {
Properties properties = new Properties();
properties.setProperty("URL", url);
properties.setProperty("user", username);
properties.setProperty("password", password);
AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
ds.setXaProperties(properties);
ds.setUniqueResourceName("BOracleXADataSource");
ds.setXaDataSourceClassName("oracle.jdbc.xa.client.OracleXADataSource");
return ds;
}
@Bean(name = "bTransactionManager")
public DataSourceTransactionManager bTransactionManager() {
return new DataSourceTransactionManager(bDataSource());
}
@Bean(name = "bSqlSessionFactory")
public SqlSessionFactory bSqlSessionFactory(@Qualifier("bDataSource") DataSource dataSource) throws Exception {
final SqlSessionFactoryBean sessionFactoryBean = new SqlSessionFactoryBean();
sessionFactoryBean.setDataSource(dataSource);
return sessionFactoryBean.getObject();
}
}
在配置类中注册JTA的TransactionManager。
@Bean(name = "jtaTransactionManager")
@Primary
public JtaTransactionManager jtaTransactionManager () {
UserTransactionManager userTransactionManager = new UserTransactionManager();
UserTransaction userTransaction = new UserTransactionImp();
return new JtaTransactionManager(userTransaction, userTransactionManager);
}
DemoService.java
/**
* 同时往 A和B 两个数据库中insert数据
* @param jpaUserDO
*/
@Transactional(transactionManager = "jtaTransactionManager")
public void addJTAUser(JpaUserDO jpaUserDO){
aUserMapper.addUsername(jpaUserDO.getUsername());
bUserMapper.addUsername(jpaUserDO.getUsername());
//int a=1/0;
}
通过以上验证,2PC的分布式事务试验成功!