转载

整整 Java 线程池

用官方文档来说,线程池解决了两个问题: 一是在执行大量的异步任务时,因为线程池减少了任务开始前的准备工作,如频繁创建线程,启动线程等工作,提升了性能表现;二是提供了一种绑定资源和管理资源的途径,可以进行一些基础的统计分析,比如已经完成的任务数量等。

ExecutorService

接口,继承自 Executor,通过 execute 方法执行 Runnable 任务。ExecutorService 提供了管理终止异步任务的方法和通过 Future 对象追踪异步任务进度的方法。

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}
复制代码

ExecutorService 可以被关闭,随后不再接受新的任务,资源可以被回收。通过 submit 方法扩展 execute 方法,返回一个 Future 对象用于取消执行或者等待执行完成。可以批量执行任务。

public interface ExecutorService extends Executor {

    /**
     * 已经提交的任务继续执行,不再接受新的任务
     */
    void shutdown();

    /**
     * 尝试停止正在执行的任务,没有执行的任务不再执行,不再接受新的任务
     * 返回没有执行的任务的列表
     */
    List<Runnable> shutdownNow();

    /**
     * 阻塞直到(所有任务完成(shutdown 后)| 超时 | 当前线程被中断)发生其一
     * 终止时返回 true,反之返回 false
     */
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 提交新任务,并返回一个 Future 对象用于获取执行结果信息
     */
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);

    /**
     * 执行给定的任务列表,返回任务结果信息的 Future 列表。可设定超时时间
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 执行给定的任务列表,返回一个成功完成的任务的结果(如果有)。可设定超时时间
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
复制代码

ThreadPoolExecutor

继承自 AbstractExecutorService。AbstractExecutorService 实现了 ExecutorService 接口,并添加 protect 方法 newTaskFor,用于辅助实现 submit 方法返回 Future 对象。在使用线程池的过程中,大多数情况下使用该类的构造方法即可,除非是延迟任务或者 ForkJoin 类型的任务。

构造方法

线程池的构造方法有 4 个,最终都会调用到下方的这个构造方法。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
复制代码
  • corePoolSize: 线程池保持的线程数目,即使线程处于空闲状态。allowCoreThreadTimeOut = true 时空闲线程无任务超时时会被销毁.

  • maximumPoolSize: 任务队列饱和之后允许创建新的线程,表示线程池中最大允许的线程数量。

  • keepAliveTime: 线程池中线程的数目大于 corePoolSize 时,空闲的线程在终止之前保活的时间。

  • unit: 时间单位。

  • workQueue: 任务队列。只保存通过 ExecutorService 的 execute() 方法提交的任务。当加入的任务超过队列的最大容积,则需要使用 RejectedExecutionHandler 来进行处理。 BlockingQueue 有一些默认的实现,比较常用的是 LinkedBlockingQueue,SynchronousQueue,ArrayBlockingQueue;分别对应链表实现的无界限队列,直接创建线程的同步队列,特定大小的数组式队列(打算后面写一个关于这个的吧)。

  • threadFactory: 生成新的线程的工厂。可以自定义,比如命名线程等操作可以其中实现。在 Executors 中实现了一些工厂,比如 DefaultThreadFactory,可以基于该类进行修改,使得其符合自定义的要求。需要实现 newThread 方法,返回新建的线程。

  • handler: 当线程池线程数量达到最大并且队列已经填满时,再进来的任务就不能处理了。这时候需要一个 RejectedExecutionHandler 来说明如何对这些溢出的任务进行处理。ThreadPoolExecutor 中默认实现了 CallerRunsPolicy,AbortPolicy,DiscardPolicy,DiscardOldestPolicy 这几种类型的处理器,分别处理为直接执行,中断,忽视该任务,忽视最老未执行任务。我们可以自定义处理器如新建新的线程进行处理等。

线程的创建规则

当一个新的任务被提交到线程池中时,会有以下几种可能的情况发生。

  1. 线程池中有小于 corePoolSize 的线程正在运行,此时会重新创建一个新的线程,即使线程池中的线程有的是空闲状态,因为此时还没有达到线程池的核心线程数。
  2. 线程池线程数量大于等于 corePoolSize 且小于 maximumPoolSize 时,如果工作队列没有饱和,将新任务添加到任务队列中,不会重新创建线程;如果工作队列已经饱和了,创建新的线程。
  3. 线程池线程数量等于 maximumPoolSize 并且任务队列饱和时,使用线程池的 RejectedExecutionHandler 对新的任务进行处理,如中断、忽视、直接在本线程执行等。

ScheduledExecutorService 与 ForkJoinPool

一般情况下使用 ThreadPoolExecutor 已经足够了,另外还有可以管理延迟任务和周期任务的 ScheduledExecutorService,以及基于 ForkJoinPool 思想实现的 ForkJoinPool,在这里就简单提一提。

ScheduledExecutorService

继承自 ExecutorService,可以在一定时间后执行或者周期执行。扩展了几个延迟或者周期执行任务的方法。通过 execute 方法和 submit 方法提交的任务以及 ScheduledExecutorService 新扩展的几个延迟和周期方法中时间设定为非正数时,认为这个任务应该被立即执行。

public interface ScheduledExecutorService extends ExecutorService {

    /**
     * 创建延迟任务,在 delay 延迟之后执行,返回一个 ScheduledFuture 表示执行的状态。
     */
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

    /**
     * 创建周期执行的任务,初始延迟之后第一次进行执行,周期时间后重复执行。如果执行的时间超过周期时间也不会同时执行,会有一定的延迟再。返回一个 ScheduledFuture 表示执行的状态。
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    /**
     * 创建周期执行的任务,在初始延迟之后第一次执行,在完成之后的 delay 时间后再次执行。返回一个 ScheduledFuture 表示执行的状态。
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

}

复制代码

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 继承自ThreadPoolExecutor 并实现了 ScheduledExecutorService 中新添加的延迟以及周期方法。构造方法继承父类,只是将任务列表改成了 DelayedWorkQueue 对象,用于实现延迟任务。

public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
    }
复制代码

从构造方法中可以看到调用了父类 ThreadPoolExecutor 的一个构造方法,但是 maximumPoolSize 设定为 Integer.MAX_VALUE,keepAliveTime 设定为 0。这样的情况下 corePoolSize 就不应该设定为 0 以及 allowCoreThreadTimeOut 不应该设定为 true,否则当有新的任务下达时线程池中可能没有可用的线程可以使用,线程池也就没有使用的意义了。

Delayed tasks execute no sooner than they are enabled, but without any real-time guarantees about when, after they are enabled, they will commence.

延迟任务在可用之前不会执行,但是并没有实时的监控者,所以当其可用时才会执行(会存在一些误差)。预定在同一时间执行的任务会按照任务提交的顺序执行,FIFO 原则。如果任务在没有执行之前就被撤销了,如果不设定 setRemoveOnCancelPolicy 为 true,则直到初始延迟结束之后才有从队列中移除,设定之后就会在取消时直接移除。

scheduleAtFixedRate 与 scheduleAtFixedRate 计划的连续执行的任务在时间上不会重叠,即使运行在不同的线程,下一次任务开始之前获取的状态一定是前一次执行的结果,线程安全。

ScheduledThreadPoolExecutor 重写了 ThreadPoolExecutor 的一些方法,以适应延迟任务的实现,在此也不过多描述了。

ForkJoinPool

继承自 AbstractExecutorService,用于运行 ForkJoinTask 任务的线程池,提供了提交、管理和监控操作的方法。与一般的 ExecutorService 的区别主要在于“工作窃取(work-stealing)”的思想:线程池中的所有线程都会去寻找和执行被提交到线程池中的任务或者由线程任务执行时产生的任务。在大多数任务可以产生子任务以及执行多个子任务的这两个情况下使用 ForkJoinPool 是一个很高效的选择。在构造方法中设定 asyncMode 时,将对没有提交的任务使用 FIFO 模式进行管理,默认是基于堆模式实现的。在此处不过多介绍,后续有时间会专门写一写这个,对于 ForkJoin 任务可以看一下方腾飞大佬的 聊聊并发(八)——Fork/Join 框架介绍 。

快捷线程池

Java 已经封装了一些线程池供我们使用,Executors 类下的形如 newXxxTreadPool 方法,可以快捷的建立一些常用的线程池,以下是三种常用到的线程池。

  • newFixedThreadPool: 指定线程数,创建一个 corePoolSize 和 maximumPoolSize 一致,无边界任务队列的线程池。
  • newSingleThreadExecutor: 创建一个单一工作线程的,无边界任务队列的线程池。 corePoolSize = maximumPoolSize = 1。
  • newCachedThreadPool: 创建一个可重用之前线程的线程池。如果之前创建的线程空闲而且没有被销毁则使用其执行新的任务。使用的是同步队列 SynchronousQueue。 corePoolSize = 0, maximumPoolSize = Integer.MAX_VALUE, keepAliveTime = 60s。也就是说之前创建的线程如果在 60s 内有新任务,则可以进行复用。
  • newWorkStealingPool:用于产生工作窃取的 ForkJoinTask 的线程池。
  • newScheduledThreadPool:用于处理具有延迟,周期性工作的任务的线程池。

总结

使用线程池的时候,一般讲 ThreadPoolExecutor 弄清楚就足够了,对于 ScheduledThreadPoolExecutor 和 ForkJoinPool 来说使用的情况比较少,在特定的情况下才会用到,我们需要了解一下有这么个东西,需要用的时候想想能不能排上用场即可。 Doug Lea 大佬把这些线程池的一些默认实现都写到了 Executors 类中,简单使用的时候可以直接使用这个类中方法。

好久不写东西,写东西的节奏都没有了,长度也控制不足,下次注意。

原文  https://juejin.im/post/5cf7687de51d4550bf1ae817
正文到此结束
Loading...