转载

TCC-Transaction源码解析之事务补偿

上文中,我们对TCC-Transaction的事务提交阶段主流程进行了详细的解析。上文遗留了一个问题:

如果在事务执行过程中出现了down机、停电、网络异常等情况,事务一致性就无法得到保证,此时应该怎么做?

这个问题在TCC-Transaction框架中是通过定时任务+状态机方式实现的,这种方式也是我们日常开发中经常使用的一种策略。本文,我们对事务恢复主逻辑进行分析,使TCC-Transaction源码解析形成一个闭环。

RecoverScheduledJob

事务补偿定时任务的核心逻辑由tcc-transaction-spring模块下的RecoverScheduledJob.java完成,对失败的confirm、cancel操作进行失败补偿操作。代码逻辑如下:

public class RecoverScheduledJob {

    private TransactionRecovery transactionRecovery;

    private TransactionConfigurator transactionConfigurator;

    private Scheduler scheduler;

    // 通过init方法启动quartz定时任务
    public void init() {

        try {
            MethodInvokingJobDetailFactoryBean jobDetail = new MethodInvokingJobDetailFactoryBean();
            jobDetail.setTargetObject(transactionRecovery);
            jobDetail.setTargetMethod("startRecover");
            jobDetail.setName("transactionRecoveryJob");
            jobDetail.setConcurrent(false);
            jobDetail.afterPropertiesSet();

            CronTriggerFactoryBean cronTrigger = new CronTriggerFactoryBean();
            cronTrigger.setBeanName("transactionRecoveryCronTrigger");
            cronTrigger.setCronExpression(transactionConfigurator.getRecoverConfig().getCronExpression());
            cronTrigger.setJobDetail(jobDetail.getObject());
            cronTrigger.afterPropertiesSet();

            scheduler.scheduleJob(jobDetail.getObject(), cronTrigger.getObject());
            scheduler.start();

        } catch (Exception e) {
            throw new SystemException(e);
        }
    }
    ...省略getter setter...
}

通过quartz进行任务调度,通过RecoverConfig中的配置初始化定时任务,通过MethodInvokingJobDetailFactoryBean的targetObject与targetMethod指定了定时任务具体执行类及具体方法。

我们注意以下代码,它将任务的核心逻辑设置到jobDetail中

jobDetail.setTargetObject(transactionRecovery);
jobDetail.setTargetMethod("startRecover");

最终通过quartz的Scheduler对任务发起调度,这里通过cron表达式触发器进行调度。

TransactionRecovery

TransactionRecovery是TCC-Transaction框架中事务补偿的核心实现

public class TransactionRecovery {

    ......

    private TransactionConfigurator transactionConfigurator;

通过startRecover开启事务补偿重试任务。

public void startRecover() {
    // 获取待补偿的任务列表
    List<Transaction> transactions = loadErrorTransactions();
    // 对待补偿的任务列表执行补偿操作
    recoverErrorTransactions(transactions);
}

首先通过loadErrorTransactions()获取待补偿的任务列表:

private List<Transaction> loadErrorTransactions() {

    long currentTimeInMillis = Calendar.getInstance().getTimeInMillis();
    // 获取配置的具体事务持久化策略,如:基于数据库、zk、redis等
    TransactionRepository transactionRepository = transactionConfigurator.getTransactionRepository();

    // 获取重试策略,包含:cron表达式,最大重试次数等
    RecoverConfig recoverConfig = transactionConfigurator.getRecoverConfig();

    // 获取在RecoverDuration间隔之前未完成的transaction列表,查询方式依具体的持久化策略而定
    return transactionRepository
        .findAllUnmodifiedSince(
            new Date(currentTimeInMillis 
                - recoverConfig.getRecoverDuration() * 1000));
}

接着看一下recoverErrorTransactions方法逻辑,对待补偿的任务列表进行补偿操作。

private void recoverErrorTransactions(List<Transaction> transactions) {
        // 对需要进行重试的事务列表进行迭代
        for (Transaction transaction : transactions) {

            // 如果重试次数超过配置的最大重试次数,则打印异常日志;跳过不再重试
            if (transaction.getRetriedCount() > 
                transactionConfigurator.getRecoverConfig().getMaxRetryCount()) {
                ...省略异常日志...
                continue;
            }

            // 如果是分支事务,并且超过最长超时时间则忽略不再重试
            if (transaction.getTransactionType().equals(TransactionType.BRANCH)
            && (transaction.getCreateTime().getTime() 
                  transactionConfigurator.getRecoverConfig().getMaxRetryCount()
                  *transactionConfigurator.getRecoverConfig().getRecoverDuration() 
                  * 1000
                  > System.currentTimeMillis())) {
                continue;
            }

            try {
                // 增加重试次数
                transaction.addRetriedCount();
                // 如果当前事务状态为CONFIRMING
                if (transaction.getStatus().equals(TransactionStatus.CONFIRMING)) {
                    // 设置事务状态为CONFIRMING
                    transaction.changeStatus(TransactionStatus.CONFIRMING);
                    // 修改当前事务状态
                    transactionConfigurator.getTransactionRepository().update(transaction);
                    // 提交事务
                    transaction.commit();
                    // 删除事务记录(删除失败则不作处理)
                    transactionConfigurator.getTransactionRepository().delete(transaction);

                // 如果事务状态为CANCELLING或者事务为根事务(根事务没提交)
                } else if (transaction.getStatus().equals(TransactionStatus.CANCELLING)
                        || transaction.getTransactionType().equals(TransactionType.ROOT)) {

                    // 设置事务状态为CANCELLING
                    transaction.changeStatus(TransactionStatus.CANCELLING);
                    // 修改当前事务状态
                    transactionConfigurator.getTransactionRepository().update(transaction);
                    // 回滚事务
                    transaction.rollback();
                    // 删除事务记录(删除失败则不作处理)
                    transactionConfigurator.getTransactionRepository().delete(transaction);
                }

            } catch (Throwable throwable) {

                ...省略异常日志...
            }
        }
    }

    public void setTransactionConfigurator(TransactionConfigurator transactionConfigurator) {
        this.transactionConfigurator = transactionConfigurator;
    }
}

上述注释已经很明确的指出了事务补偿job的核心逻辑,就不再赘述。我们总结一下:

对于trying阶段的异常事务,不会进行重试;而是会触发canceling操作;

对于confirming、canceling阶段的异常事务,定时进行重试补偿,尽最大努力去尝试提交事务,如果达到了最大重试次数还是处理失败则不再处理。这种极端的情况需要人工进行介入。

TCC-Transaction提供的后台server模块允许我们对事务进行手工补偿操作,这极大的提高了框架的可靠性以及易用性。

小结

本文我们主要对TCC-Transaction的事务补偿阶段逻辑进行了分析。TCC-Transaction框架通过定时补偿,对异常事务进行处理,保证了分布式事务的最终一致性。

通过这一过程,我们能够看出,TCC-Transaction框架本质上也是柔性事务的一种解决方案,如果仔细分析,它也是满足BASE原则的。

版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。

原文  http://wuwenliang.net/2019/09/09/TCC-Transaction源码解析之事务补偿/
正文到此结束
Loading...