转载

JDK 源码分析:ScheduledThreadPoolExecutor

概述

ScheduledThreadPoolExecutor 也是一个线程池类,是线程池类  ThreadPoolExecutor 的子类。 除了 ThreadPoolExecutor 相关的方法之外,它还增加了执行定时任务和周期性任务的方法。它的类 签名和继承结构如下:

<span><span>public</span> <span><span>class</span> <span>ScheduledThreadPoolExecutor</span></span></span>

<span> <span>extends</span> <span>ThreadPoolExecutor</span></span>

<span><span> <span>implements</span> <span>ScheduledExecutorService</span> </span>{}</span>

JDK 源码分析:ScheduledThreadPoolExecutor

可以看到,它继承了 ThreadPoolExecutor 类(参考 「 JDK源码分析-ThreadPoolExecutor 」),并且实现了 ScheduledExecutorService 接口(参考 「 JDK源码分析-ScheduledExecutorService 」),因此具有二者的特性。 下面分析其代码实现。

代码分析

内部嵌套类 DelayedWorkQueue

先看它的一个内部嵌套类 DelayedWorkQueue,该类是一个延迟队列,它的类签名和继承结构如下:

static class DelayedWorkQueue extends AbstractQueue<Runnable>

implements BlockingQueue<Runnable> {}

JDK 源码分析:ScheduledThreadPoolExecutor

Delay edWorkQue ue 类与前文分析的 DelayQueue 「 JDK源码分析-DelayQueue 」实现原理类似 ,这里就不再赘述。

构造器

ScheduledThreadPoolExecutor 有如下四个构造器:

public ScheduledThreadPoolExecutor(int corePoolSize) {

super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,

new DelayedWorkQueue());

}


public ScheduledThreadPoolExecutor(int corePoolSize,

ThreadFactory threadFactory) {

super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,

new DelayedWorkQueue(), threadFactory);

}


public ScheduledThreadPoolExecutor(int corePoolSize,

RejectedExecutionHandler handler) {

super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,

new DelayedWorkQueue(), handler);

}


public ScheduledThreadPoolExecutor(int corePoolSize,

ThreadFactory threadFactory,

RejectedExecutionHandler handler) {

super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,

new DelayedWorkQueue(), threadFactory, handler);

}

这几个构造器都是直接调用父类 ThreadPoolExecutor 的构造器,只是传入了不同的参数。而其中的参数 workQueue 都传入了上面的延迟队列 DelayedWorkQueue。

内部类 ScheduledFutureTask

ScheduledThreadPoolExecutor 还有一个内部类 ScheduledFutureTask,它的继承结构如下:

JDK 源码分析:ScheduledThreadPoolExecutor

它继承了 FutureTask 类(可参考前文「 JDK源码分析-FutureTask 」的分析),且实现了 RunnableScheduledFuture 接口,该接口定义如下:

public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {

// 一个任务是否周期性执行的,若是则可以重复执行;否则只能运行一次

boolean isPeriodic();

}

RunnableScheduledFuture 只定义了一个方法 isPeriodic,该方法用于判断一个任务是否是周期性执行的。它继承的  Runna bleF utur e 接口在前文 FutureTask 类中已进行分析,而 ScheduledFuture 接口如 下:

public interface ScheduledFuture<V> extends Delayed, Future<V> {

}

它的内部并未定义方法,只是整合了 Delayed 接口和 Future 接口,Delayed 接口前文也已分析,下面分析该类的主要代码。

先看它的主要成员变量:

// 定时任务执行的时间(单位:纳秒)

private long time;


/**

* 重复执行的任务的时间间隔(单位:纳秒)

* 正数表示固定频率(fixed-rate)执行

* 负数表示固定延迟(fixed-delay)执行

* 零表示非重复执行的任务

*/

private final long period;


// reExecutePeriodic 方法中重新排队的任务

RunnableScheduledFuture<V> outerTask = this;


// 延迟队列中的索引位置,便于快速取消

int heapIndex;

构造器:

/**

* 构造器一:用给定的触发时间(纳秒),创建一个一次性任务

*/

ScheduledFutureTask(Runnable r, V result, long ns) {

super(r, result);

this.time = ns;

this.period = 0;

this.sequenceNumber = sequencer.getAndIncrement();

}


/**

* 构造器二:用给定的触发时间和间隔(纳秒),创建一个周期性任务

*/

ScheduledFutureTask(Runnable r, V result, long ns, long period) {

super(r, result);

this.time = ns;

this.period = period;

this.sequenceNumber = sequencer.getAndIncrement();

}


/**

* 构造器三:用给定的触发时间(纳秒),创建一个一次性任务

*/

ScheduledFutureTask(Callable<V> callable, long ns) {

super(callable);

this.time = ns;

this.period = 0;

this.sequenceNumber = sequencer.getAndIncrement();

}

ScheduledFutureTask 有三个构造器,可分为两类:分别是创建一次性任务(一和三)和周期性任务(二)。 其中一和 三还是 Runnable 和 Callable 的区别。

该类是一个任务类,即 Runnable 接口的实现类,因此它最核心的就是 run 方法,如下:

public void run() {

// 是否为周期性任务

boolean periodic = isPeriodic();

// 若任务不能执行,则取消

if (!canRunInCurrentRunState(periodic))

cancel(false);

// 若为非周期性任务

else if (!periodic)

// 若为周期性任务,调用 ScheduledFutureTask 的父类(即 FutureTask)的 run 方法执行

ScheduledFutureTask.super.run();

// 若为周期性任务,调用 ScheduledFutureTask 的父类(即 FutureTask)的 runAndReset 方法执行

else if (ScheduledFutureTask.super.runAndReset()) {

setNextRunTime(); // 设置下一次执行时间

reExecutePeriodic(outerTask); // 周期性执行

}

}

reExecutePeriodic 方法如下:

/**

* 该方法主要是将周期性任务重新排队

* 它的实现与 delayedExecute 方法(后面分析)逻辑有些类似

*/

void reExecutePeriodic(RunnableScheduledFuture<?> task) {

if (canRunInCurrentRunState(true)) {

super.getQueue().add(task);

if (!canRunInCurrentRunState(true) && remove(task))

task.cancel(false);

else

ensurePrestart();

}

}

schedule & scheduleAtFixedRate & scheduleWithFixedDelay

这几个就是执行定时任务和周期性任务的方法,它们是对前文 「 JDK源码分析-ScheduledExecutorService 」接口所定义的方法实现,可参考前文的分析。

schedule 方法 1:其作用是延迟指定的时间后执行任务(即执行定时任务),只会执行一次。

public ScheduledFuture<?> schedule(Runnable command,

long delay,

TimeUnit unit) {

if (command == null || unit == null)

throw new NullPointerException();

// 把用户提交的 Runnable 对象包装为 RunnableScheduledFuture 对象

// decorateTask 方法默认返回第二个参数

// decorateTask 方法的修饰符是 protected,可根据需求自行扩展

RunnableScheduledFuture<?> t = decorateTask(command,

new ScheduledFutureTask<Void>(command, null,

triggerTime(delay, unit)));

// 执行给定的任务

delayedExecute(t);

return t;

}

delayExecute 方法:

/*

* 延迟或周期性任务的主要执行方法。

* 若线程池已关闭,则拒绝该任务(执行拒绝策略);

* 否则将任务添加到工作队列,若有需要启动一个线程去执行。

* 若在添加任务时关闭了线程池,则将其从队列移除并取消该任务

*/

private void delayedExecute(RunnableScheduledFuture<?> task) {

// 若线程池已关闭,则执行拒绝策略

if (isShutdown())

reject(task);

else {

// 将该任务添加到任务队列(即前面的延迟队列)

super.getQueue().add(task);

// 若当前任务无法执行,则将其从队列移除并且取消执行(类似事务的回滚操作)

if (isShutdown() &&

!canRunInCurrentRunState(task.isPeriodic()) &&

remove(task))

task.cancel(false);

// 任务可以执行,若有需要新增线程以执行该任务

else

ensurePrestart();

}

}

schedule 方法 2:

public <V> ScheduledFuture<V> schedule(Callable<V> callable,

long delay,

TimeUnit unit) {

if (callable == null || unit == null)

throw new NullPointerException();

RunnableScheduledFuture<V> t = decorateTask(callable,

new ScheduledFutureTask<V>(callable,

triggerTime(delay, unit)));

delayedExecute(t);

return t;

}

该方法与前者类似,差别在于这里的参数类型是 Callable,前者是 Runnable 类型,其他操作一样。

scheduleAtFixedRate 方法:

<span><span>public</span> ScheduledFuture&lt;?&gt; scheduleAtFixedRate(Runnable command,</span>

<span> <span>long</span> initialDelay,</span>

<span> <span>long</span> period,</span>

<span> TimeUnit unit) {</span>

<span> <span>if</span> (command == <span>null</span> || unit == <span>null</span>)</span>

<span> <span>throw</span> <span>new</span> NullPointerException();</span>

<span> <span>if</span> (period &lt;= <span>0</span>)</span>

<span> <span>throw</span> <span>new</span> IllegalArgumentException();</span>

<span> <span>// 将 Runnable 对象包装为 ScheduledFutureTask 对象</span></span>

<span> ScheduledFutureTask&lt;Void&gt; sft =</span>

<span> <span>new</span> ScheduledFutureTask&lt;Void&gt;(command,</span>

<span> <span>null</span>,</span>

<span> triggerTime(initialDelay, unit),</span>

<span> unit.toNanos(period));</span>

<span> RunnableScheduledFuture&lt;Void&gt; t = decorateTask(command, sft);</span>

<span> sft.outerTask = t;</span>

<span> delayedExecute(t);</span>

<span> <span>return</span> t;</span>

<span>}</span>

该方法与前面的 schedule 方法类似,区别仅在于使用了不同的 ScheduledFutureTask 对象,其他的执行流程几乎一样。

scheduleWithFix edDelay 方法:

<span><span>public</span> ScheduledFuture&lt;?&gt; scheduleWithFixedDelay(Runnable command,</span>

<span> <span>long</span> initialDelay,</span>

<span> <span>long</span> delay,</span>

<span> TimeUnit unit) {</span>

<span> <span>if</span> (command == <span>null</span> || unit == <span>null</span>)</span>

<span> <span>throw</span> <span>new</span> NullPointerException();</span>

<span> <span>if</span> (delay &lt;= <span>0</span>)</span>

<span> <span>throw</span> <span>new</span> IllegalArgumentException();</span>

<span> ScheduledFutureTask&lt;Void&gt; sft =</span>

<span> <span>new</span> ScheduledFutureTask&lt;Void&gt;(command,</span>

<span> <span>null</span>,</span>

<span> triggerTime(initialDelay, unit),</span>

<span> unit.toNanos(-delay));</span>

<span> RunnableScheduledFuture&lt;Void&gt; t = decorateTask(command, sft);</span>

<span> sft.outerTask = t;</span>

<span> delayedExecute(t);</span>

<span> <span>return</span> t;</span>

<span>}</span>

该方法与 scheduleAtFixedRate 方法基本一样,区别仅在于构建 ScheduledFutureTask 对象时参数 period 不同(一正一负,用以区分类型)。

execute & submit 方法

这两个方法是 Executor 接口和 ExecutorService 接口所定义的方法,代码实现如下:

<span><span><span>public</span> <span>void</span> <span>execute</span>(<span>Runnable command</span>)</span> {</span>

<span> schedule(command, <span>0</span>, NANOSECONDS);</span>

<span>}</span>

<span><br /></span>

<span><span>public</span> Future&lt;?&gt; submit(Runnable task) {</span>

<span> <span>return</span> schedule(task, <span>0</span>, NANOSECONDS);</span>

<span>}</span>

它们内部直接调用了 schedule(Runnable) 方法。 另外两个 submit 方法

<span><span>public</span> &lt;T&gt; <span>Future&lt;T&gt; <span>submit</span>(<span>Runnable task, T result</span>)</span> {</span>

<span> <span>return</span> schedule(Executors.callable(task, result), <span>0</span>, NANOSECONDS);</span>

<span>}</span>

<span><br /></span>

<span><span>public</span> &lt;T&gt; <span>Future&lt;T&gt; <span>submit</span>(<span>Callable&lt;T&gt; task</span>)</span> {</span>

<span> <span>return</span> schedule(task, <span>0</span>, NANOSECONDS);</span>

<span>}</span>

它们 部直接调用了  schedule(Callable)  方法。

小结

1. ScheduledThreadPoolExecutor 是线程池的实现类之一;

2. 它继承自 ThreadPoolExecutor,并实现了 ScheduledExecutorService 接口;

3. 提供了异步提交任务的 execute 方法和 submit 方法;

4. 供了执行定时任务的 schedule 方法和周期性任务的 scheduleAtFixedRate/scheduleWithFixedDelay 方法(使用延迟队列实现)。

相关阅读:

JDK源码分析-ThreadPoolExecutor

JDK源码分析-ScheduledExecutorService

JDK源码分析-DelayQueue

JDK源码分析-FutureTask

JDK 源码分析:ScheduledThreadPoolExecutor

原文  https://mp.weixin.qq.com/s/n7t8zrITOSw9oydvHg4Chw
正文到此结束
Loading...