原创

Spring Boot集成@Async快速入门Demo

1.什么是@Async?

当我们在使用SpringBoot进行开发的时候,可能会遇到一些执行异步任务的场景,如果每次执行这些异步任务都去新建一个异步线程来执行的话,那代码就太冗余了。幸好SpringBoot给我们提供了Async的注解,让我们能够很轻松地对这些异步任务进行执行。

失效条件

  1. 异步方法使用static修饰
  2. 调用方法和异步方法在同一个类中

2.代码工程

实验目标:验证@async异步任务

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>async</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>
</project>

service

  • thenApply : 处理上一阶段计算结果
  • thenCompose: 整合两个计算结果
package com.et.async.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

/**
 * @author liuhaihua
 * @version 1.0
 * @ClassName NotifyServiceimpl
 * @Description todo
 */
@Service
@Slf4j
public class NotifyService {
    public void noAsync() {
       log.info("Execute method asynchronously. " + Thread.currentThread().getName());
    }
    @Async("threadPoolTaskExecutor")
    public void withAsync() {
        log.info("Execute method asynchronously. " + Thread.currentThread().getName());
    }
    @Async("threadPoolTaskExecutor")
    public void mockerror() {
        int ss=12/0;
    }
    @Async
    public Future<String> asyncMethodWithReturnType() {
        log.info("Execute method asynchronously - " + Thread.currentThread().getName());
        try {
            Thread.sleep(5000);
            return new AsyncResult<String>("hello world !!!!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }
    @Autowired
    private FirstAsyncService fisrtService;
    @Autowired
    private SecondAsyncService secondService;

    public CompletableFuture<String> asyncMergeServicesResponse() throws InterruptedException {
        CompletableFuture<String> fisrtServiceResponse = fisrtService.asyncGetData();
        CompletableFuture<String> secondServiceResponse = secondService.asyncGetData();

        // Merge responses from FirstAsyncService and SecondAsyncService
        return fisrtServiceResponse.thenCompose(fisrtServiceValue -> secondServiceResponse.thenApply(secondServiceValue -> fisrtServiceValue + secondServiceValue));
    }
}
package com.et.async.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;

/**
 * @author liuhaihua
 * @version 1.0
 * @ClassName FirstAsyncService
 * @Description todo
 * @date 2024年05月10日 16:24
 */
@Service
@Slf4j
public class FirstAsyncService {
    @Async
    public CompletableFuture<String> asyncGetData() throws InterruptedException {
       log.info("Execute method asynchronously " + Thread.currentThread().getName());
        Thread.sleep(4000);
        return new AsyncResult<>(super.getClass().getSimpleName() + " response !!! ").completable();
    }
}
package com.et.async.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;

/**
 * @author liuhaihua
 * @version 1.0
 * @ClassName SecondAsyncService
 * @Description todo
 * @date 2024年05月10日 16:24
 */
@Service
@Slf4j
public class SecondAsyncService {
    @Async
    public CompletableFuture<String> asyncGetData() throws InterruptedException {
        log.info("Execute method asynchronously " + Thread.currentThread()
                .getName());
        Thread.sleep(4000);
        return new AsyncResult<>(super.getClass().getSimpleName() + " response !!! ").completable();
    }
}

config

一个@EnableAsync 注解启用
package com.et.async.config;

import com.et.async.exception.CustomAsyncExceptionHandler;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Configuration
@EnableAsync
public class SpringAsyncConfig implements AsyncConfigurer {

    @Bean(name = "threadPoolTaskExecutor")
    public Executor threadPoolTaskExecutor() {
        return new ThreadPoolTaskExecutor();
    }

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new CustomAsyncExceptionHandler();
    }
}

异常类

对于无返回值的异步任务,配置CustomAsyncExceptionHandler类,统一处理无法捕获的异常
package com.et.async.exception;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;

import java.lang.reflect.Method;

public class CustomAsyncExceptionHandler
  implements AsyncUncaughtExceptionHandler {

    @Override
    public void handleUncaughtException(
            Throwable throwable, Method method, Object... obj) {
 
        System.out.println("Exception message - " + throwable.getMessage());
        System.out.println("Method name - " + method.getName());
        for (Object param : obj) {
            System.out.println("Parameter value - " + param);
        }
    }
    
}

3.测试

测试async异步任务

@Test
public void execute() throws ExecutionException, InterruptedException {
    log.info("your method test Code");
    log.info("Invoking an asynchronous method. " + Thread.currentThread().getName());
    notifyService.noAsync();
    notifyService.withAsync();

}

测试带线程池的异步任务

@Async("threadPoolTaskExecutor")
public void mockerror() {
    int ss=12/0;
}

测试带返回值的异步方法

@Test
public void testAsyncAnnotationForMethodsWithReturnType()
        throws InterruptedException, ExecutionException {
    log.info("Invoking an asynchronous method. " + Thread.currentThread().getName());
    Future<String> future = notifyService.asyncMethodWithReturnType();

    while (true) {
        if (future.isDone()) {
            log.info("Result from asynchronous process - " + future.get());
            break;
        }
        log.info("Continue doing something else. ");
        Thread.sleep(1000);
    }
}

测试多个异步任务合并结果

@Test
public void testAsyncAnnotationForMergedServicesResponse() throws InterruptedException, ExecutionException {
   log.info("Invoking an asynchronous method. " + Thread.currentThread().getName());
    CompletableFuture<String> completableFuture = notifyService.asyncMergeServicesResponse();

    while (true) {
        if (completableFuture.isDone()) {
           log.info("Result from asynchronous process - " + completableFuture.get());
            break;
        }
        log.info("Continue doing something else. ");
        Thread.sleep(1000);
    }
}

测试void方法异常捕获

@Test
public void mockerror() throws ExecutionException, InterruptedException {
    notifyService.mockerror();
}

4.引用参考

 
正文到此结束
Loading...