Java线程池ThreadPoolExecutor实现原理剖析

【编者的话】在Java中,使用线程池来异步执行一些耗时任务是非常常见的操作。最初我们一般都是直接使用new Thread().start的方式,但我们知道,线程的创建和销毁都会耗费大量的资源,关于线程可以参考之前的一篇博客《 Java线程那点事儿 》,因此我们需要重用线程资源。

当然也有其他待解决方案,比如说coroutine,目前Kotlin已经支持了,JDK也已经有了相关的提案: Project Loom ,目前的实现方式和Kotlin有点类似,都是基于ForkJoinPool,当然目前还有很多限制以及问题没解决,比如synchronized还是住当前线程等。

继承结构

Java线程池ThreadPoolExecutor实现原理剖析

继承结构看起来很清晰,最顶层的Executor只提供了一个最简单的void execute(Runnable command)方法,然后是ExecutorService,ExecutorService提供了一些管理相关的方法,例如关闭、判断当前线程池的状态等,另外不同于Executor#execute,ExecutorService提供了一系列方法,可以将任务包装成一个Future,从而使得任务提交方可以跟踪任务的状态。而父类AbstractExecutorService则提供了一些默认的实现。

构造器

ThreadPoolExecutor的构造器提供了非常多的参数,每一个参数都非常的重要,一不小心就容易踩坑,因此设置的时候,你必须要知道自己在干什么。

public ThreadPoolExecutor(int corePoolSize,

                          int maximumPoolSize,

                          long keepAliveTime,

                          TimeUnit unit,

                          BlockingQueue<Runnable> workQueue,

                          ThreadFactory threadFactory,

                          RejectedExecutionHandler handler) {

    if (corePoolSize < 0 ||

        maximumPoolSize <= 0 ||

        maximumPoolSize < corePoolSize ||

        keepAliveTime < 0)

        throw new IllegalArgumentException();

    if (workQueue == null || threadFactory == null || handler == null)

        throw new NullPointerException();

    this.acc = System.getSecurityManager() == null ?

            null :

            AccessController.getContext();

    this.corePoolSize = corePoolSize;

    this.maximumPoolSize = maximumPoolSize;

    this.workQueue = workQueue;

    this.keepAliveTime = unit.toNanos(keepAliveTime);

    this.threadFactory = threadFactory;

    this.handler = handler;

} 
  1. corePoolSize、 maximumPoolSize。线程池会自动根据corePoolSize和maximumPoolSize去调整当前线程池的大小。当你通过submit或者execute方法提交任务的时候,如果当前线程池的线程数小于corePoolSize,那么线程池就会创建一个新的线程处理任务, 即使其他的core线程是空闲的。如果当前线程数大于corePoolSize并且小于maximumPoolSize,那么只有在队列"满"的时候才会创建新的线程。因此这里会有很多的坑,比如你的core和max线程数设置的不一样,希望请求积压在队列的时候能够实时的扩容,但如果制定了一个无界队列,那么就不会扩容了,因为队列不存在满的概念。
  2. keepAliveTime。如果当前线程池中的线程数超过了corePoolSize,那么如果在keepAliveTime时间内都没有新的任务需要处理,那么超过corePoolSize的这部分线程就会被销毁。默认情况下是不会回收core线程的,可以通过设置allowCoreThreadTimeOut改变这一行为。
  3. workQueue。即实际用于存储任务的队列,这个可以说是最核心的一个参数了,直接决定了线程池的行为,比如说传入一个有界队列,那么队列满的时候,线程池就会根据core和max参数的设置情况决定是否需要扩容,如果传入了一个SynchronousQueue,这个队列只有在另一个线程在同步remove的时候才可以put成功,对应到线程池中,简单来说就是如果有线程池任务处理完了,调用poll或者take方法获取新的任务的时候,新提交的任务才会put成功,否则如果当前的线程都在忙着处理任务,那么就会put失败,也就会走扩容的逻辑,如果传入了一个DelayedWorkQueue,顾名思义,任务就会根据过期时间来决定什么时候弹出,即为ScheduledThreadPoolExecutor的机制。
  4. threadFactory。创建线程都是通过ThreadFactory来实现的,如果没指定的话,默认会使用Executors.defaultThreadFactory(),一般来说,我们会在这里对线程设置名称、异常处理器等。
  5. handler。即当任务提交失败的时候,会调用这个处理器,ThreadPoolExecutor内置了多个实现,比如抛异常、直接抛弃等。这里也需要根据业务场景进行设置,比如说当队列积压的时候,针对性的对线程池扩容或者发送告警等策略。

看完这几个参数的含义,我们看一下Executors提供的一些工具方法,只要是为了方便使用,但是我建议最好少用这个类,而是直接用ThreadPoolExecutor的构造函数,多了解一下这几个参数到底是什么意思,自己的业务场景是什么样的,比如线程池需不需要扩容、用不用回收空闲的线程等。

public class Executors {



/*

* 提供一个固定大小的线程池,并且线程不会回收,由于传入的是一个无界队列,相当于队列永远不会满

* 也就不会扩容,因此需要特别注意任务积压在队列中导致内存爆掉的问题

*/

public static ExecutorService newFixedThreadPool(int nThreads) {

    return new ThreadPoolExecutor(nThreads, nThreads,

                                  0L, TimeUnit.MILLISECONDS,

                                  new LinkedBlockingQueue<Runnable>());

}





/*

*  这个线程池会一直扩容,由于SynchronousQueue的特性,如果当前所有的线程都在处理任务,那么

*  新的请求过来,就会导致创建一个新的线程处理任务。如果线程一分钟没有新任务处理,就会被回 

*  收掉。特别注意,如果每一个任务都比较耗时,并发又比较高,那么可能每次任务过来都会创建一个线 

*  程

*/

public static ExecutorService newCachedThreadPool() {

    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

                                  60L, TimeUnit.SECONDS,

                                  new SynchronousQueue<Runnable>());

}

} 

源码分析

既然是个线程池,那就必然有其生命周期:运行中、关闭、停止等。ThreadPoolExecutor是用一个AtomicInteger去的前三位表示这个状态的,另外又重用了低29位用于表示线程数,可以支持最大大概5亿多,绝逼够用了,如果以后硬件真的发展到能够启动这么多线程,改成AtomicLong就可以了。

状态这里主要分为下面几种:

  1. RUNNING:表示当前线程池正在运行中,可以接受新任务以及处理队列中的任务
  2. SHUTDOWN:不再接受新的任务,但会继续处理队列中的任务
  3. STOP:不再接受新的任务,也不处理队列中的任务了,并且会中断正在进行中的任务
  4. TIDYING:所有任务都已经处理完毕,线程数为0,转为为TIDYING状态之后,会调用terminated()回调
  5. TERMINATED:terminated()已经执行完毕

同时我们可以看到所有的状态都是用二进制位表示的,并且依次递增,从而方便进行比较,比如想获取当前状态是否至少为SHUTDOWN等,同时状态之前有几种转换:

  1. RUNNING -> SHUTDOWN。调用了shutdown()之后,或者执行了finalize()
  2. (RUNNING 或者 SHUTDOWN) -> STOP。调用了shutdownNow()之后会转换这个状态
  3. SHUTDOWN -> TIDYING。当线程池和队列都为空的时候
  4. STOP -> TIDYING。当线程池为空的时候
  5. IDYING -> TERMINATED。执行完terminated()回调之后会转换为这个状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static final int COUNT_BITS = Integer.SIZE - 3;

private static final int CAPACITY   = (1 << COUNT_BITS) - 1;



private static final int RUNNING    = -1 << COUNT_BITS;

private static final int SHUTDOWN   =  0 << COUNT_BITS;

private static final int STOP       =  1 << COUNT_BITS;

private static final int TIDYING    =  2 << COUNT_BITS;

private static final int TERMINATED =  3 << COUNT_BITS;



//由于前三位表示状态,因此将CAPACITY取反,和进行与操作即可

private static int runStateOf(int c)     { return c & ~CAPACITY; }

private static int workerCountOf(int c)  { return c & CAPACITY; }



//高三位+第三位进行或操作即可

private static int ctlOf(int rs, int wc) { return rs | wc; }



private static boolean runStateLessThan(int c, int s) {

    return c < s;

}



private static boolean runStateAtLeast(int c, int s) {

    return c >= s;

}



private static boolean isRunning(int c) {

    return c < SHUTDOWN;

}



//下面三个方法,通过CAS修改worker的数目

private boolean compareAndIncrementWorkerCount(int expect) {

    return ctl.compareAndSet(expect, expect + 1);

}



//只尝试一次,失败了则返回,是否重试由调用方决定

private boolean compareAndDecrementWorkerCount(int expect) {

    return ctl.compareAndSet(expect, expect - 1);

}



//跟上一个不一样,会一直重试

private void decrementWorkerCount() {

    do {} while (! compareAndDecrementWorkerCount(ctl.get()));

} 

下面是比较核心的字段,这里workers采用的是非线程安全HashSet,而不是线程安全的版本,主要是因为这里有些复合的操作,比如说将worker添加到workers后,我们还需要判断是否需要更新largestPoolSize等,workers只在获取到mainLock的情况下才会进行读写,另外这里的mainLock也用于在中断线程的时候串行执行,否则如果不加锁的话,可能会造成并发去中断线程,引起不必要的中断风暴。

private final ReentrantLock mainLock = new ReentrantLock();



private final HashSet<Worker> workers = new HashSet<Worker>();



private final Condition termination = mainLock.newCondition();



private int largestPoolSize;



private long completedTaskCount;

核心方法

拿到一个线程池之后,我们就可以开始提交任务,让它去执行了,那么我们看一下submit方法是如何实现的。

public Future<?> submit(Runnable task) {

    if (task == null) throw new NullPointerException();

    RunnableFuture<Void> ftask = newTaskFor(task, null);

    execute(ftask);

    return ftask;

}



public <T> Future<T> submit(Callable<T> task) {

    if (task == null) throw new NullPointerException();

    RunnableFuture<T> ftask = newTaskFor(task);

    execute(ftask);

    return ftask;

} 

这两个方法都很简单,首先将提交过来的任务(有两种形式:Callable、Runnable)都包装成统一的RunnableFuture,然后调用execute方法,execute可以说是线程池最核心的一个方法。

public void execute(Runnable command) {

    if (command == null)

        throw new NullPointerException();

    int c = ctl.get();

    /*

        获取当前worker的数目,如果小于corePoolSize那么就扩容,

        这里不会判断是否已经有core线程,而是只要小于corePoolSize就会直接增加worker

     */

    if (workerCountOf(c) < corePoolSize) {

        /*

            调用addWorker(Runnable firstTask, boolean core)方法扩容

            firstTask表示为该worker启动之后要执行的第一个任务,core表示要增加的为core线程

         */

        if (addWorker(command, true))

            return;

        //如果增加失败了那么重新获取ctl的快照,比如可能线程池在这期间关闭了

        c = ctl.get();

    }

    /*

         如果当前线程池正在运行中,并且将任务丢到队列中成功了,

         那么就会进行一次double check,看下在这期间线程池是否关闭了,

         如果关闭了,比如处于SHUTDOWN状态,如上文所讲的,SHUTDOWN状态的时候,

         不再接受新任务,remove成功后调用拒绝处理器。而如果仍然处于运行中的状态,

         那么这里就double check下当前的worker数,如果为0,有可能在上述逻辑的执行

         过程中,有worker销毁了,比如说任务抛出了未捕获异常等,那么就会进行一次扩容,

         但不同于扩容core线程,这里由于任务已经丢到队列中去了,因此就不需要再传递firstTask了,

         同时要注意,这里扩容的是非core线程

     */

    if (isRunning(c) && workQueue.offer(command)) {

        int recheck = ctl.get();

        if (! isRunning(recheck) && remove(command))

            reject(command);

        else if (workerCountOf(recheck) == 0)

            addWorker(null, false);

    }

    else if (!addWorker(command, false))

        /*

            如果在上一步中,将任务丢到队列中失败了,那么就进行一次扩容,

            这里会将任务传递到firstTask参数中,并且扩容的是非core线程,

            如果扩容失败了,那么就执行拒绝策略。

         */

        reject(command);

} 

这里要特别注意下防止队列失败的逻辑,不同的队列丢任务的逻辑也不一样,例如说无界队列,那么就永远不会put失败,也就是说扩容也永远不会执行,如果是有界队列,那么当队列满的时候,会扩容非core线程,如果是SynchronousQueue,这个队列比较特殊,当有另外一个线程正在同步获取任务的时候,你才能put成功,因此如果当前线程池中所有的worker都忙着处理任务的时候,那么后续的每次新任务都会导致扩容,当然如果worker没有任务处理了,阻塞在获取任务这一步的时候,新任务的提交就会直接丢到队列中去,而不会扩容。

上文中多次提到了扩容,那么我们下面看一下线程池具体是如何进行扩容的:

private boolean addWorker(Runnable firstTask, boolean core) {

    retry:

    for (;;) {

        int c = ctl.get();

        //获取当前线程池的状态

        int rs = runStateOf(c);



        /*

            如果状态为大于SHUTDOWN, 比如说STOP,STOP上文说过队列中的任务不处理了,也不接受新任务,

            因此可以直接返回false不扩容了,如果状态为SHUTDOWN并且firstTask为null,同时队列非空,

            那么就可以扩容

         */

        if (rs >= SHUTDOWN &&

            ! (rs == SHUTDOWN &&

                firstTask == null &&

                ! workQueue.isEmpty()))

            return false;



        for (;;) {

            int wc = workerCountOf(c);

            /*

                若worker的数目大于CAPACITY则直接返回,

                然后根据要扩容的是core线程还是非core线程,进行判断worker数目

                是否超过设置的值,超过则返回

             */

            if (wc >= CAPACITY ||

                wc >= (core ? corePoolSize : maximumPoolSize))

                return false;

            /*

                通过CAS的方式自增worker的数目,成功了则直接跳出循环

             */

            if (compareAndIncrementWorkerCount(c))

                break retry;

            //重新读取状态变量,如果状态改变了,比如线程池关闭了,那么就跳到最外层的for循环,

            //注意这里跳出的是retry。

            c = ctl.get();  // Re-read ctl

            if (runStateOf(c) != rs)

                continue retry;

            // else CAS failed due to workerCount change; retry inner loop

        }

    }



    boolean workerStarted = false;

    boolean workerAdded = false;

    Worker w = null;

    try {

        //创建Worker

        w = new Worker(firstTask);

        final Thread t = w.thread;

        if (t != null) {

            final ReentrantLock mainLock = this.mainLock;

            mainLock.lock();

            try {

                /*

                    获取锁,并判断线程池是否已经关闭

                 */

                int rs = runStateOf(ctl.get());



                if (rs < SHUTDOWN ||

                    (rs == SHUTDOWN && firstTask == null)) {

                    if (t.isAlive()) // 若线程已经启动了,比如说已经调用了start()方法,那么就抛异常,

                        throw new IllegalThreadStateException();

                    //添加到workers中

                    workers.add(w);

                    int s = workers.size();

                    if (s > largestPoolSize) //更新largestPoolSize

                        largestPoolSize = s;

                    workerAdded = true;

                }

            } finally {

                mainLock.unlock();

            }

            if (workerAdded) {

                //若Worker创建成功,则启动线程,这么时候worker就会开始执行任务了

                t.start();

                workerStarted = true;

            }

        }

    } finally {

        if (! workerStarted)

            //添加失败

            addWorkerFailed(w);

    }

    return workerStarted;

} 



private void addWorkerFailed(Worker w) {

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

        if (w != null)

            workers.remove(w);

        decrementWorkerCount();

        //每次减少worker或者从队列中移除任务的时候都需要调用这个方法

        tryTerminate();

    } finally {

        mainLock.unlock();

    }

} 

这里有个貌似不太起眼的方法tryTerminate,这个方法会在所有可能导致线程池终结的地方调用,比如说减少worker的数目等,如果满足条件的话,那么将线程池转换为TERMINATED状态。另外这个方法没有用private修饰,因为ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,而ScheduledThreadPoolExecutor也会调用这个方法。

final void tryTerminate() {

    for (;;) {

        int c = ctl.get();

        /*

            如果当前线程处于运行中、TIDYING、TERMINATED状态则直接返回,运行中的没

            什么好说的,后面两种状态可以说线程池已经正在终结了,另外如果处于SHUTDOWN状态,

            并且workQueue非空,表明还有任务需要处理,也直接返回

         */

        if (isRunning(c) ||

            runStateAtLeast(c, TIDYING) ||

            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))

            return;

        //可以退出,但是线程数非0,那么就中断一个线程,从而使得关闭的信号能够传递下去,

        //中断worker后,worker捕获异常后,会尝试退出,并在这里继续执行tryTerminate()方法,

        //从而使得信号传递下去

        if (workerCountOf(c) != 0) {

            interruptIdleWorkers(ONLY_ONE);

            return;

        }



        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

            //尝试转换成TIDYING状态,执行完terminated回调之后

            //会转换为TERMINATED状态,这个时候线程池已经完整关闭了,

            //通过signalAll方法,唤醒所有阻塞在awaitTermination上的线程

            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {

                try {

                    terminated();

                } finally {

                    ctl.set(ctlOf(TERMINATED, 0));

                    termination.signalAll();

                }

                return;

            }

        } finally {

            mainLock.unlock();

        }

        // else retry on failed CAS

    }

}



/**

 * 中断空闲的线程

 * @param onlyOne

 */

private void interruptIdleWorkers(boolean onlyOne) {

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

        for (Worker w : workers) {

            //遍历所有worker,若之前没有被中断过,

            //并且获取锁成功,那么就尝试中断。

            //锁能够获取成功,那么表明当前worker没有在执行任务,而是在

            //获取任务,因此也就达到了只中断空闲线程的目的。

            Thread t = w.thread;

            if (!t.isInterrupted() && w.tryLock()) {

                try {

                    t.interrupt();

                } catch (SecurityException ignore) {

                } finally {

                    w.unlock();

                }

            }

            if (onlyOne)

                break;

        }

    } finally {

        mainLock.unlock();

    }

} 

Java线程池ThreadPoolExecutor实现原理剖析

Worker

下面看一下Worker类,也就是这个类实际负责执行任务,Worker类继承自AbstractQueuedSynchronizer,AQS可以理解为一个同步框架,提供了一些通用的机制,利用模板方法模式,让你能够原子的管理同步状态、blocking和unblocking线程、以及队列,具体的内容之后有时间会再写,还是比较复杂的。这里Worker对AQS的使用相对比较简单,使用了状态变量state表示是否获得锁,0表示解锁、1表示已获得锁,同时通过exclusiveOwnerThread存储当前持有锁的线程。另外再简单提一下,比如说CountDownLatch, 也是基于AQS框架实现的,countdown方法递减state,await阻塞等待state为0。

private final class Worker

    extends AbstractQueuedSynchronizer

    implements Runnable

{



    /** Thread this worker is running in.  Null if factory fails. */

    final Thread thread;



    /** Initial task to run.  Possibly null. */

    Runnable firstTask;



    /** Per-thread task counter */

    volatile long completedTasks;



    Worker(Runnable firstTask) {

        setState(-1); // inhibit interrupts until runWorker

        this.firstTask = firstTask;

        this.thread = getThreadFactory().newThread(this);

    }



    /** Delegates main run loop to outer runWorker  */

    public void run() {

        runWorker(this);

    }

   protected boolean isHeldExclusively() {

        return getState() != 0;

    }



    protected boolean tryAcquire(int unused) {

        if (compareAndSetState(0, 1)) {

            setExclusiveOwnerThread(Thread.currentThread());

            return true;

        }

        return false;

    }



    protected boolean tryRelease(int unused) {

        setExclusiveOwnerThread(null);

        setState(0);

        return true;

    }



    public void lock()        { acquire(1); }

    public boolean tryLock()  { return tryAcquire(1); }

    public void unlock()      { release(1); }

    public boolean isLocked() { return isHeldExclusively(); }



    void interruptIfStarted() {

        Thread t;

        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {

            try {

                t.interrupt();

            } catch (SecurityException ignore) {

            }

        }

    }

} 

注意这里Worker初始化的时候,会通过setState(-1)将state设置为-1,并在runWorker()方法中置为0,上文说过Worker是利用state这个变量来表示锁的状态,那么加锁的操作就是通过CAS将state从0改成1,那么初始化的时候改成-1,也就是表示在Worker启动之前,都不允许加锁操作,我们再看interruptIfStarted()以及interruptIdleWorkers()方法,这两个方法在尝试中断Worker之前,都会先加锁或者判断state是否大于0,因此这里的将state设置为-1,就是为了禁止中断操作,并在runWorker中置为0,也就是说只能在Worker启动之后才能够中断Worker。

另外线程启动之后,其实就是调用了runWorker方法,下面我们看一下具体是如何实现的。

final void runWorker(Worker w) {

    Thread wt = Thread.currentThread();

    Runnable task = w.firstTask;

    w.firstTask = null;

    w.unlock(); // 调用unlock()方法,将state置为0,表示其他操作可以获得锁或者中断worker

    boolean completedAbruptly = true;

    try {

        /*

            首先尝试执行firstTask,若没有的话,则调用getTask()从队列中获取任务

         */

        while (task != null || (task = getTask()) != null) {

            w.lock();

            /*

                如果线程池正在关闭,那么中断线程。

             */

            if ((runStateAtLeast(ctl.get(), STOP) ||

                (Thread.interrupted() &&

                    runStateAtLeast(ctl.get(), STOP))) &&

                !wt.isInterrupted())

                wt.interrupt();

            try {

                //执行beforeExecute回调

                beforeExecute(wt, task);

                Throwable thrown = null;

                try {

                    //实际开始执行任务

                    task.run();

                } catch (RuntimeException x) {

                    thrown = x; throw x;

                } catch (Error x) {

                    thrown = x; throw x;

                } catch (Throwable x) {

                    thrown = x; throw new Error(x);

                } finally {

                    //执行afterExecute回调

                    afterExecute(task, thrown);

                }

            } finally {

                task = null;

                //这里加了锁,因此没有线程安全的问题,volatile修饰保证其他线程的可见性

                w.completedTasks++;

                w.unlock();//解锁

            }

        }

        completedAbruptly = false;

    } finally {

        //抛异常了,或者当前队列中已没有任务需要处理等

        processWorkerExit(w, completedAbruptly);

    }

}



private void processWorkerExit(Worker w, boolean completedAbruptly) {

    //如果是异常终止的,那么减少worker的数目

    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted

        decrementWorkerCount();



    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

        //将当前worker中workers中删除掉,并累加当前worker已执行的任务到completedTaskCount中

        completedTaskCount += w.completedTasks;

        workers.remove(w);

    } finally {

        mainLock.unlock();

    }



    //上文说过,减少worker的操作都需要调用这个方法

    tryTerminate();



    /*

        如果当前线程池仍然是运行中的状态,那么就看一下是否需要新增另外一个worker替换此worker

     */

    int c = ctl.get();

    if (runStateLessThan(c, STOP)) {

        /*

            如果是异常结束的则直接扩容,否则的话则为正常退出,比如当前队列中已经没有任务需要处理,

            如果允许core线程超时的话,那么看一下当前队列是否为空,空的话则不用扩容。否则话看一下

            是否少于corePoolSize个worker在运行。

         */

        if (!completedAbruptly) {

            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;

            if (min == 0 && ! workQueue.isEmpty())

                min = 1;

            if (workerCountOf(c) >= min)

                return; // replacement not needed

        }

        addWorker(null, false);

    }

}



 private Runnable getTask() {

    boolean timedOut = false; // 上一次poll()是否超时了



    for (;;) {

        int c = ctl.get();

        int rs = runStateOf(c);



        // 若线程池关闭了(状态大于STOP)

        // 或者线程池处于SHUTDOWN状态,但是队列为空,那么返回null

        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

            decrementWorkerCount();

            return null;

        }



        int wc = workerCountOf(c);



        /*

            如果允许core线程超时 或者 不允许core线程超时但当前worker的数目大于core线程数,

            那么下面的poll()则超时调用

         */

        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;



        /*

            获取任务超时了并且(当前线程池中还有不止一个worker 或者 队列中已经没有任务了),那么就尝试

            减少worker的数目,若失败了则重试

         */

        if ((wc > maximumPoolSize || (timed && timedOut))

            && (wc > 1 || workQueue.isEmpty())) {

            if (compareAndDecrementWorkerCount(c))

                return null;

            continue;

        }



        try {

            //从队列中抓取任务

            Runnable r = timed ?

                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

                workQueue.take();

            if (r != null)

                return r;

            //走到这里表明,poll调用超时了

            timedOut = true;

        } catch (InterruptedException retry) {

            timedOut = false;

        }

    }

} 

关闭线程池

关闭线程池一般有两种形式,shutdown()和shutdownNow()。

public void shutdown() {

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

        checkShutdownAccess();

        //通过CAS将状态更改为SHUTDOWN,这个时候线程池不接受新任务,但会继续处理队列中的任务

        advanceRunState(SHUTDOWN);

        //中断所有空闲的worker,也就是说除了正在处理任务的worker,其他阻塞在getTask()上的worker

        //都会被中断

        interruptIdleWorkers();

        //执行回调

        onShutdown(); // hook for ScheduledThreadPoolExecutor

    } finally {

        mainLock.unlock();

    }

    tryTerminate();

    //这个方法不会等待所有的任务处理完成才返回

}

public List<Runnable> shutdownNow() {

    List<Runnable> tasks;

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

        checkShutdownAccess();

        /*

            不同于shutdown(),会转换为STOP状态,不再处理新任务,队列中的任务也不处理,

            而且会中断所有的worker,而不只是空闲的worker

         */

        advanceRunState(STOP);

        interruptWorkers();

        tasks = drainQueue();//将所有的任务从队列中弹出

    } finally {

        mainLock.unlock();

    }

    tryTerminate();

    return tasks;

}



private List<Runnable> drainQueue() {

    BlockingQueue<Runnable> q = workQueue;

    ArrayList<Runnable> taskList = new ArrayList<Runnable>();

    /*

        将队列中所有的任务remove掉,并添加到taskList中,

        但是有些队列比较特殊,比如说DelayQueue,如果第一个任务还没到过期时间,则不会弹出,

        因此这里通过调用toArray方法,然后再一个一个的remove掉

     */

    q.drainTo(taskList);

    if (!q.isEmpty()) {

        for (Runnable r : q.toArray(new Runnable[0])) {

            if (q.remove(r))

                taskList.add(r);

        }

    }

    return taskList;

} 

从上文中可以看到,调用了shutdown()方法后,不会等待所有的任务处理完毕才返回,因此需要调用awaitTermination()来实现。

public boolean awaitTermination(long timeout, TimeUnit unit)

    throws InterruptedException {

    long nanos = unit.toNanos(timeout);

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

        for (;;) {

            //线程池若已经终结了,那么就返回

            if (runStateAtLeast(ctl.get(), TERMINATED))

                return true;

            //若超时了,也返回掉

            if (nanos <= 0)

                return false;

            //阻塞在信号量上,等待线程池终结,但是要注意这个方法可能会因为一些未知原因随时唤醒当前线程,

            //因此需要重试,在tryTerminate()方法中,执行完terminated()回调后,表明线程池已经终结了,

            //然后会通过termination.signalAll()唤醒当前线程

            nanos = termination.awaitNanos(nanos);

        }

    } finally {

        mainLock.unlock();

    }

}

一些统计相关的方法

public int getPoolSize() {

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

        //若线程已终结则直接返回0,否则计算works中的数目

       //想一下为什么不用workerCount呢?

        return runStateAtLeast(ctl.get(), TIDYING) ? 0

            : workers.size();

    } finally {

        mainLock.unlock();

    }

}



public int getActiveCount() {

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

        int n = 0;

        for (Worker w : workers)

            if (w.isLocked())//上锁的表明worker当前正在处理任务,也就是活跃的worker

                ++n;

        return n;

    } finally {

        mainLock.unlock();

    }

}





public int getLargestPoolSize() {

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

        return largestPoolSize;

    } finally {

        mainLock.unlock();

    }

}



//获取任务的总数,这个方法慎用,若是个无解队列,或者队列挤压比较严重,会很蛋疼

public long getTaskCount() {

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

        long n = completedTaskCount;//比如有些worker被销毁后,其处理完成的任务就会叠加到这里

        for (Worker w : workers) {

            n += w.completedTasks;//叠加历史处理完成的任务

            if (w.isLocked())//上锁表明正在处理任务,也算一个

                ++n;

        }

        return n + workQueue.size();//获取队列中的数目

    } finally {

        mainLock.unlock();

    }

}





public long getCompletedTaskCount() {

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

        long n = completedTaskCount;

        for (Worker w : workers)

            n += w.completedTasks;

        return n;

    } finally {

        mainLock.unlock();

    }

} 

总结

这篇博客基本上覆盖了线程池的方方面面,但仍然有非常多的细节可以深究,比如说异常的处理,可以参照之前的一篇博客:《 深度解析Java线程池的异常处理机制 》,另外还有AQS、unsafe等可以之后再单独总结。

原文链接: https://github.com/aCoder2013/blog/issues/28

原文 

http://dockone.io/article/8284

本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。

PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » Java线程池ThreadPoolExecutor实现原理剖析

赞 (0)
分享到:更多 ()

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址