Java并发-ScheduledThreadPoolExecutor

A ThreadPoolExecutor that can additionally schedule commands to run after a given delay, or to execute periodically( 周期性地 ) . This class is preferable to( 优先于……Timer when multiple worker threads are needed, or when the additional flexibility( 灵活性 ) or capabilities of ThreadPoolExecutor (which this class extends) are required.

Delayed tasks execute no sooner than they are enabled, but without any real-time guarantees about when, after they are enabled, they will commence. Tasks scheduled for exactly the same execution time are enabled in first-in-first-out (FIFO) order of submission.

When a submitted task is cancelled before it is run, execution is suppressed. By default, such a cancelled task is not automatically removed from the work queue until its delay elapses. While this enables further inspection and monitoring, it may also cause unbounded retention of cancelled tasks. To avoid this, set setRemoveOnCancelPolicy to true, which causes tasks to be immediately removed from the work queue at time of cancellation .

Extension notes : This class overrides the execute and submit methods to generate internal ScheduledFuture objects to control per-task delays and scheduling. To preserve functionality, any further overrides of these methods in subclasses must invoke superclass versions, which effectively disables additional task customization. However, this class provides alternative protected extension method decorateTask (one version each for Runnable and Callable) that can be used to customize the concrete task types used to execute commands entered via execute , submit , schedule , scheduleAtFixedRate , and scheduleWithFixedDelay . By default, a ScheduledThreadPoolExecutor uses a task type extending FutureTask .

类层次结构

Java并发-ScheduledThreadPoolExecutor

构造方法

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

public ScheduledThreadPoolExecutor(int corePoolSize,
                               ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}

public ScheduledThreadPoolExecutor(int corePoolSize,
                               RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), handler);
}

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}
复制代码

执行和调度任务方法

public ScheduledFuture<?> schedule(Runnable command,
                               long delay,TimeUnit unit)
  
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                               long delay,TimeUnit unit)

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                            long initialDelay,long period,TimeUnit unit)
                            
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                             long initialDelay,long delay,TimeUnit unit) 

public Future<?> submit(Runnable task)

public <T> Future<T> submit(Runnable task, T result)

public <T> Future<T> submit(Callable<T> task)

复制代码

其他方法

public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value)

public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy()

public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value)

public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy()

public void setRemoveOnCancelPolicy(boolean value)

public boolean getRemoveOnCancelPolicy()

public void shutdown()

public List<Runnable> shutdownNow()

public BlockingQueue<Runnable> getQueue()

复制代码

实现原理

ScheduledThreadPoolExecutor 调度和执行任务的过程可以抽象如下图所示:

Java并发-ScheduledThreadPoolExecutor
  1. CallableRunnable 对象转换 ScheduledFutureTask 对象;
  2. 将转换后的 ScheduledFutureTask 对象添加到延迟队列并开启线程执行任务;
  3. 工作线程从队列获取 ScheduledFutureTask 任务执行任务。

通过上述描述可以发现, ScheduledFutureTaskScheduledThreadPoolExecutor 实现的关键。

源码分析

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    // 步骤1    
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    // 步骤2                    
    delayedExecute(t);
    return t;
}

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

// 将任务添加到延迟队列
private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}
复制代码

ScheduledFutureTask

类层次结构

Java并发-ScheduledThreadPoolExecutor

构造方法

ScheduledFutureTask(Runnable r, V result, long ns) {
        super(r, result);
        this.time = ns;
        this.period = 0;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

    ScheduledFutureTask(Runnable r, V result, long ns, long period) {
        super(r, result);
        this.time = ns;
        this.period = period;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

    ScheduledFutureTask(Callable<V> callable, long ns) {
        super(callable);
        this.time = ns;
        this.period = 0;
        this.sequenceNumber = sequencer.getAndIncrement();
    }
}
复制代码

任务执行

public void run() {
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic) // 非周期性任务,直接执行
        ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) { // 执行任务,重置状态
        // 计算下一次执行时间
        setNextRunTime();
        // 重新添加到延迟队列
        reExecutePeriodic(outerTask);
    }
}

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (canRunInCurrentRunState(true)) {
        super.getQueue().add(task);
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}
复制代码

原文 

https://juejin.im/post/5bfcec63f265da61682b1028

本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。

PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » Java并发-ScheduledThreadPoolExecutor

赞 (0)
分享到:更多 ()

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址