转载

浅析Java高并发下ScheduleThreadPoolExecutor延时任务

Java中的计划任务Timer工具类提供了以计时器或计划任务的功能来实现按指定时间或时间间隔执行任务,但由于Timer工具类并不是以池pool方式实现的,而是以队列的方式来管理线程的,所以在高并发的情况下运行效率较低,在JDK 1.5版本以后提供了ScheduledExecutorService对象来解决效率与定时任务的性能问题。

这篇文章我们主要讨论ScheduledExecutorService的使用技巧以及一些常用的线程池操作方法,后面的文章会继续对执行器进行深入的交流探讨。

浅析Java高并发下ScheduleThreadPoolExecutor延时任务

Executors 工具类提供了两个常用的ScheduledThreadPoolExecutor

这两个常用的ScheduledThreadPoolExecutor:SingleThreadScheduledExecutor(单线程的线程池)、ScheduledThreadPool(线程数量固定的线程池),下面是 Executors 对应的源代码。

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
}

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory arg) {
    return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, arg));
}

public static ScheduledExecutorService newScheduledThreadPool(int arg) {
    return new ScheduledThreadPoolExecutor(arg);
}

public static ScheduledExecutorService newScheduledThreadPool(int arg, ThreadFactory arg0) {
    return new ScheduledThreadPoolExecutor(arg, arg0);
}
复制代码

ScheduledExecutorService是一个接口,继承于ExecutorService,支持线程池的所有功能,同时也提供了四个用于计划任务调度的核心方法。

下面我将介绍这四个方法的使用和一些常用的线程池方法:

1、schedule runnable

带延迟时间的调度,只执行一次,返回值为实现Future接口的对象,可调用Future.get()方法阻塞直到任务执行完毕

/**
 * 创建并执行在给定延迟后启用的一次性操作
 *
 * @param command 要执行的任务 
 * @param delay 从现在开始延迟执行的时间 
 * @param unit 延时参数的时间单位 
 * @return 表示任务等待完成,并且其的ScheduledFuture get()方法将返回 null 
 * @throws RejectedExecutionException 如果任务无法安排执行 
 * @throws NullPointerException 如果命令为空 
 */
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
复制代码

schedule runnable使用示例

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ScheduledExecutorService scheduled = Executors.newSingleThreadScheduledExecutor();
    ScheduledFuture<?> future = scheduled.schedule(() -> {
        try {
            System.out.println("开始执行任务");
            TimeUnit.SECONDS.sleep(3);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("执行完毕");
    }, 1000, TimeUnit.MILLISECONDS);
    System.out.println("阻塞开始");
    System.out.println(future.get() + "");
    System.out.println("阻塞结束");
}
复制代码

执行结果如下:

阻塞开始 开始执行任务 执行完毕 null 阻塞结束

schedule runnable,这个方法是不提供返回值的,所以调用future.get()方法返回的是null

2、schedule callable

带延迟时间的调度,只执行一次,返回值为实现Future接口的对象,调用Future.get()方法阻塞直到任务完成,可以获取到返回结果

/**
 * 创建并执行在给定延迟后启用的ScheduledFuture
 *
 * @param callable 执行的功能 
 * @param delay 从现在开始延迟执行的时间 
 * @param unit 延迟参数的时间单位 
 * @param <V> the 可调用结果的类型 
 * @return一个可用于提取结果或取消的ScheduledFuture 
 * @throws RejectedExecutionException 如果该任务无法安排执行 
 * @throws NullPointerException 如果callable为空 
 */
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
复制代码

schedule Callable 使用示例

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ScheduledExecutorService scheduled = Executors.newSingleThreadScheduledExecutor();
    ScheduledFuture<String> future = scheduled.schedule(() -> {
        try {
            System.out.println("开始执行任务");
            TimeUnit.SECONDS.sleep(3);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("执行完毕");
        return "success";
    }, 1000, TimeUnit.MILLISECONDS);
    System.out.println("阻塞开始");
    System.out.println(future.get() + "");
    System.out.println("阻塞结束");
}
复制代码

执行结果:

阻塞开始 开始执行任务 执行完毕 success 阻塞结束

schedule callable 是带返回值的,通过future.get()获取

3、scheduleAtFixedRate

创建并执行一个在给定初始延迟后的定期操作,也就是将在 initialDelay 后开始执行,然后在initialDelay+period 后下一个任务执行,接着在 initialDelay + 2 * period 后执行,依此类推 ,也就是只在第一次任务执行时有延时。

/**
 * @param command 要执行的任务 
 * @param initialDelay 首次执行的延迟时间
 * @param period 连续执行之间的周期
 * @param unit initialDelay和period参数的时间单位 
 * @return 一个ScheduledFuture代表待完成的任务,其 get()方法将在取消时抛出异常 
 * @throws RejectedExecutionException 如果任务无法安排执行 
 * @throws NullPointerException 如果命令为空 
 * @throws IllegalArgumentException 如果period小于或等于零 
 */
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
复制代码

scheduleAtFixedRate使用示例

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(5);
    ScheduledFuture<?> future = scheduled.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                System.out.println("开始执行任务");
                TimeUnit.SECONDS.sleep(3);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("执行完毕");
        }
    }, 1000L, 1000L, TimeUnit.MILLISECONDS);
    System.out.println("阻塞开始");
    System.out.println(future.get() + "");
    System.out.println("阻塞结束");    
}
复制代码

打印结果如下:

阻塞开始 开始执行任务 执行完毕 开始执行任务 执行完毕 开始执行任务 执行完毕 ....

4、scheduleWithFixedDelay

创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟,即总时间是(initialDelay + period)* n

/**
 * @param command 要执行的任务 
 * @param initialDelay 首次执行的延迟时间
 * @param delay 一次执行终止和下一次执行开始之间的延迟
 * @param unit initialDelay和delay参数的时间单位
 * @return 表示挂起任务完成的ScheduledFuture,并且其get()方法在取消后将抛出异常
 * @throws RejectedExecutionException 如果任务不能安排执行 
 * @throws NullPointerException 如果command为null
 * @throws IllegalArgumentException 如果delay小于等于0
 */
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
复制代码

scheduledWithFixedDelay使用示例

public static void main(String[] args) throws ExecutionException, InterruptedException {
    ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(5);
    ScheduledFuture<?> future = scheduled.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                System.out.println("开始执行任务");
                TimeUnit.SECONDS.sleep(3);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("执行完毕");
        }
    }, 1000L, 1000L, TimeUnit.MILLISECONDS);
    System.out.println("阻塞开始");
    System.out.println(future.get() + "");
    System.out.println("阻塞结束");
}
复制代码

打印结果如下:

阻塞开始 开始执行任务 执行完毕 开始执行任务 执行完毕 开始执行任务 执行完毕 ....

scheduleAtFixedRate和scheduleWithFixedDelay的区别在于,scheduleAtFixedRate()为固定频率,scheduleWithFixedDelay()为固定延迟。固定频率是相对于任务执行的开始时间,而固定延迟是相对于任务执行的结束时间,这就是他们最根本的区别!

5、线程池关闭,shutdown()和shutdownNow()的使用

两个关闭线程池的方法,一旦线程池被关闭,就会拒绝以后提交的所有任务

使用shutdown()可以使用awaitTermination等待所有线程执行完毕当前任务。在shutdown以前已提交任务的执行中发起一个有序的关闭,但是不接受新任务。

使用shutdownNow()尝试停止所有正在执行的任务、暂停等待任务的处理,并返回等待执行的任务列表。对于正在运行,尝试通过中断该线程来结束线程。对于尚未运行的任务,则都不再执行。

class PrintThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "PrintThreadFactory");
    }
}
复制代码
public static void main(String[] args) {
    final AtomicInteger count = new AtomicInteger(0);
    final CountDownLatch countDownLatch = new CountDownLatch(1);
    ScheduledExecutorService schedule = Executors.newScheduledThreadPool(1, new PrintThreadFactory());
    Runnable runnable = () -> {
        System.out.println("print " + count.getAndIncrement());
        if (count.get() == 3) {
            countDownLatch.countDown();
            System.out.println("任务继续...");
            try {
                Thread.sleep(3000L);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("任务结束");
        }
    };
    schedule.scheduleAtFixedRate(runnable, 0L, 2L, TimeUnit.SECONDS);
    try {
        countDownLatch.await();
        schedule.shutdown();  //平滑停止线程,不处理新任务,完成正在执行的任务
//      schedule.shutdownNow();  // 尝试强制停止线程,让终止的线程去设置休眠会抛出异常

        if (schedule.isShutdown()) {
            System.out.println("Scheduled is shutdown");
        }
        if (schedule.awaitTermination(10L, TimeUnit.SECONDS)) {
            System.out.println("termination");
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
复制代码

通过Future.cancel()取消运行任务

cancel()方法接收参数是布尔型的,传入true会中断线程停止任务,传入false则会让线程正常执行至完成。这里传入false既然不会中断线程,那么这个cancel方法不就没有意义了?

class PrintThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "PrintThreadFactory");
    }
}
复制代码
public static void main(String[] args) {
    final AtomicInteger count = new AtomicInteger(0);
    final CountDownLatch countDownLatch = new CountDownLatch(1);
    ScheduledExecutorService schedule = Executors.newScheduledThreadPool(1, new PrintThreadFactory());
    Runnable runnable = () -> {
        System.out.println("print " + count.getAndIncrement());
        if (count.get() == 3) {
            countDownLatch.countDown();
        }
    };
    Future future = schedule.scheduleAtFixedRate(runnable, 0L, 2L, TimeUnit.SECONDS);
    try {
        countDownLatch.await();
        future.cancel(true);
        if (future.isCancelled()) {
            System.out.println("is Cancelled");
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
复制代码

简单来说传入false参数只能取消还没开始的任务,若任务已经开始了,就由其运行下去。所以对于已经开始的任务,如果想要停止的话,需要给cancel方法的参数设置为true。

6、ScheduledThreadPoolExecutor参数使用

浅析Java高并发下ScheduleThreadPoolExecutor延时任务

continueExistingPeriodicTasksAfterShutdown,对于通过scheduleAtFixedRate、scheduleWithFixedDelay 提交的周期任务有效 默认值为false,设置为true表示当执行器调用shutdown后,继续执行延时任务;

与之对应的get和set方法

void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value)
boolean getContinueExistingPeriodicTasksAfterShutdownPolicy()
复制代码

executeExistingDelayedTasksAfterShutdown,对于通过schedule()方法提交的延时任务有效,默认为true,设置为false表示当执行器调用shutdown后,不再继续执行现有延迟任务;

与之对应的get和set方法

void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value)
boolean getExecuteExistingDelayedTasksAfterShutdownPolicy()
复制代码

removeOnCancel,默认为false,设置为true则从队列中删除执行任务;

与之对应的get和set方法

void setRemoveOnCancelPolicy(boolean value)
boolean getRemoveOnCancelPolicy()
复制代码

使用示例

public static void main(String[] args) {

        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10);

        Runnable runnable = () -> System.out.println(Thread.currentThread().getName() + "_1");

//        ScheduledFuture<?> future = executor.schedule(runnable, 3, TimeUnit.SECONDS);
        // 对于通过schedule()方法提交的延时任务有效,默认为true,设置为false表示当执行器调用shutdown后,不再继续执行现有延迟任务
//        executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(true);

//        System.out.println(executor.getQueue().size());
//        // 默认为false,设置为true则从队列中删除执行任务
//        executor.setRemoveOnCancelPolicy(false);
//        future.cancel(true);
//        System.out.println(executor.getQueue().size());

        executor.scheduleAtFixedRate(runnable, 1L, 1L, TimeUnit.SECONDS);
//        //对于通过scheduleAtFixedRate、scheduleWithFixedDelay 提交的周期任务有效 默认值为false,设置为true表示当执行器调用shutdown后,继续执行延时任务
        executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(true);

        executor.shutdown();

        System.out.println("线程池停止");
    }
复制代码

7、其他

零延时的 execute()、submit() 方法

execute()、submit() 方法都被重写了,本质上调用的还是 schedule() 方法;从下面的源码可以看出,这两个方法提交的任务都是延时为0的 “实时任务”;

public void execute(Runnable arg0) {
    this.schedule(arg0, 0L, TimeUnit.NANOSECONDS);
}
public Future<?> submit(Runnable arg0) {
    return this.schedule(arg0, 0L, TimeUnit.NANOSECONDS);
}
复制代码

封装计划任务线程池工具类

下面是使用单例模式封装的工具类

public final class MyScheduledExecutor {

    // 全局用于处理接收Future对象的集合
    private ConcurrentHashMap<String, Future> futureMap = new ConcurrentHashMap<>();

    // 计划执行任务
    private ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(5);

    private MyScheduledExecutor() {
    }

    // 设计为单例模式
    private static final class InnerExecutorService {
        private static final MyScheduledExecutor INSTANCE = new MyScheduledExecutor();
    }
    public static MyScheduledExecutor getInstance() {
        return InnerExecutorService.INSTANCE;
    }

    public ConcurrentHashMap<String, Future> getFutureMap() {
        return futureMap;
    }

    public void shutdown() {
        executorService.shutdown();
    }

    /**
     * 执行任务
     * @param runnable {@code Runnable}
     */
    public void execute(Runnable runnable) {
        executorService.execute(runnable);
    }

    /**
     * 执行延时任务
     *
     * @param runnable {@code Runnable}
     * @param delay    延迟时间
     * @param timeUnit 时间单位
     */
    public void scheduler(Runnable runnable, long delay, TimeUnit timeUnit) {
        executorService.schedule(runnable, delay, timeUnit);
    }

    /**
     * 执行延时周期性任务scheduleAtFixedRate
     *
     * @param runnable     {@code ScheduledExecutorService.JobRunnable}
     * @param initialDelay 延迟时间
     * @param period       周期时间
     * @param timeUnit     时间单位
     * @param <T>          {@code ScheduledExecutorService.JobRunnable}
     */
    public <T extends JobRunnable> void scheduleAtFixedRate(T runnable, long initialDelay, long period, TimeUnit timeUnit) {
        Future future = executorService.scheduleAtFixedRate(runnable, initialDelay, period, timeUnit);
        futureMap.put(runnable.getJobId(), future);
    }

    /**
     * 执行延时周期性任务scheduleWithFixedDelay
     *
     * @param runnable     {@code ScheduledExecutorService.JobRunnable}
     * @param initialDelay 延迟时间
     * @param period       周期时间
     * @param timeUnit     时间单位
     * @param <T>          {@code ScheduledExecutorService.JobRunnable}
     */
    public <T extends JobRunnable> void scheduleWithFixedDelay(T runnable, long initialDelay, long period, TimeUnit timeUnit) {
        Future future = executorService.scheduleWithFixedDelay(runnable, initialDelay, period, timeUnit);
        futureMap.put(runnable.getJobId(), future);
    }

    public static abstract class JobRunnable implements Runnable {
        private String jobId;

        public JobRunnable(String jobId) {
            this.jobId = jobId;
        }

        public void terminal() {
            try {
                Future future = MyScheduledExecutor.getInstance().getFutureMap().remove(jobId);
                future.cancel(true);
            } finally {
                System.out.println("jobId " + jobId + " had cancel");
            }
        }

        public String getJobId() {
            return jobId;
        }
    }
}
复制代码

调用示例

public static void main(String[] args) throws Exception {

    MyScheduledExecutor service = MyScheduledExecutor.getInstance();
    service.execute(() -> System.out.println("execute"));
//    service.scheduler(new Runnable() {
//        @Override
//        public void run() {
//            for (Map.Entry<String, Future> next : service.getFutureMap().entrySet()) {
//                String key = next.getKey();
//                int i = Integer.parseInt(key.substring(3));
//                // 停止部分线程
//                if (i % 2 == 0) {
//                    next.getValue().cancel(true);
//                }
//            }
//        }
//    }, 20, TimeUnit.SECONDS);

    for (int i = 0; i < 5; i++) {
        int num = new Random().nextInt(500);
        service.scheduleAtFixedRate(new MyScheduledExecutor.JobRunnable("scheduleAtFixedRate" + num) {
            @Override
            public void run() {
                System.out.println(num);
            }
        }, 10, 2, TimeUnit.SECONDS);
    }
    Thread.sleep(15000);
    for (Map.Entry<String, Future> next : service.getFutureMap().entrySet()) {
        String key = next.getKey();
        int i = Integer.parseInt(key.substring(3));
        // 停止部分线程
        if (i % 2 == 0) {
            next.getValue().cancel(true);
        }
    }
    Thread.sleep(20000);
    service.shutdown();
}
复制代码

总结

需要注意,通过ScheduledExecutorService执行的周期任务,如果任务执行过程中抛出了异常,那么ScheduledExecutorService就会停止执行任务,且也不会再周期地执行该任务了。所以如果想保住任务都一直被周期执行,那么catch一切可能的异常。

原文  https://juejin.im/post/5e68ad5e6fb9a07c86790f35
正文到此结束
Loading...