转载

JDK线程池

线程池用于存放线程, 通过对线程的复用, 很大程度上减少了频繁创建和销毁线程导致的资源损耗. 下面简单地介绍一下 JDK(1.8)中的线程池.

线程池参数

在介绍JDK的4种线程池之前, 先介绍一下线程池的几个参数

allowCoreThreadTimeOut 允许核心线程池被回收, 默认 false, so 默认情况下 核心线程并不会被回收掉.
// 用一个原子变量来存储线程池状态和线程数量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 32 - 3 = 29, int 的左边三位表示线程池状态,右边29位表示线程数量
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 最大线程数量: 000 & (29个1) = 2^29-1
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // 线程池的几个状态
    // 100 & (29个0),线程池运行状态
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 000 & (29个0), 执行shutdown方法时,不接收新的任务, 会先执行完当前已经接收的, 任务队列任务执行完 && 线程已经全部终止: state -> TIDYING
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 001 & (29个0), 执行 shutdownNow() 方法时
    private static final int STOP       =  1 << COUNT_BITS;
    // 010 & (29个0)
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 011 & (29个0), 线程池已经终止
    private static final int TERMINATED =  3 << COUNT_BITS;
复制代码

JDK 提供的四种线程池介绍

newFixedThreadPool

固定线程池数量, 核心线程数 = 最大线程数

任务队列: LinkedBlockingQueue(Integer.MAX_VALUE) 无界队列

适用于同时处理固定任务数的场景.

public static ExecutorService newFixedThreadPool(int nThreads) {
	      // coreThreads = maxThreads
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
复制代码

newCachedThreadPool

核心线程数为0, 最大为 Integer.MAX_VALUE,也就是当任务足够多的时候, 可以无限增加线程. 并且所有的线程空闲超过一段时间(调用 Executors 创建的默认 KeepAlive为 60s)就会被回收.

任务队列: SynchronousQueue 默认传入参数 fair=false , 即处理任务非公平.

适用于处理小任务

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
复制代码

SynchronousQueue

public SynchronousQueue(boolean fair) {
        // fair = true 则会按照FIFO先入先出的顺序执行 
        // fair = false(默认值) 则优先取出最新添加的任务, 最早添加的任务最晚执行
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }
复制代码

newSingleThreadExecutor

单个线程的线程池

任务队列: LinkedBlockingQueue 同样是无界队列

适用于需要将任务按顺序执行的时候

public static ExecutorService newSingleThreadExecutor() {
        // 核心线程数=最大线程数=1
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
复制代码

newScheduledThreadPool

固定核心线程数, 线程数量不会再增长, maximumPoolSize 这个参数对定时线程池没有作用.

oracle的api文档是这么写的:

While this class inherits from ThreadPoolExecutor , a few of the inherited tuning methods are not useful for it. In particular, because it acts as a fixed-sized pool using corePoolSize threads and an unbounded queue, adjustments to maximumPoolSize have no useful effect.

任务队列: DelayedWorkQueue 无界队列, 这是 ScheduledThreadPoolExecutor 的一个内部类

更适用于需要延时执行或者定时需求的场景

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
复制代码

线程池拒绝策略

AbortPolicy

拒绝新任务,并且抛出异常, 默认的拒绝策略

public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            // 直接抛出异常 
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
复制代码

CallerRunsPolicy

当拒绝任务的时候,由调用线程处理该任务.

public static class CallerRunsPolicy implements RejectedExecutionHandler {
        public CallerRunsPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            // 如果线程池未停止
            if (!e.isShutdown()) {
	        // 当前线程直接调用任务的run方法
                r.run();
            }
        }
    }
复制代码

DiscardPolicy

拒绝新任务,静悄悄的将新任务丢弃,而不通知(太坑了吧), 具体看它的代码也是什么事情都没做, 还真的就直接丢弃任务了.

public static class DiscardPolicy implements RejectedExecutionHandler {
        public DiscardPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
          // 任务被拒绝后,啥事情都不干
        }
    }
复制代码

DiscardOldestPolicy

当任务满时, 抛弃旧的未处理的任务, 然后重新执行 execute 方法(此过程会重复), 除非线程池停止运行, 这种情况任务将被丢弃.具体看代码

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        public DiscardOldestPolicy() { }
				public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  					// 如果线程池停止, 直接丢弃任务,不做任何处理
            if (!e.isShutdown()) {
               // 丢弃一个任务队列中的任务
               e.getQueue().poll();
               // 重新执行被拒绝的任务, 如果再次被拒绝, 则会一直重复这个过程
               e.execute(r);
            }
        }
   }
复制代码

终止线程池

shutdown

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 修改状态为 shutdown
            advanceRunState(SHUTDOWN);
            // 停止空闲的线程, 有执行任务的不会停止
            interruptIdleWorkers();
            // ScheduledThreadPoolExecutor 定时线程池才用到这个方法
            onShutdown(); 
        } finally {
            mainLock.unlock();
        }
        // 终止线程池里的所有线程
        tryTerminate();
    }
复制代码

interruptIdleWorkers , 从方法名可以看出, 它会终止掉当前空闲的线程

private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                // 因为执行任务 runWorker() 的时候, 执行了 worker.lock()方法
                // 所以如果当前线程有任务在执行, 则 tryLock不会成功, 
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
复制代码

shutdownNow

这个方法的作用是立刻强制停止所有线程, 即使该线程有正在执行的任务.

并且停止所有线程后,返回任务队列中还未执行的任务.

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 修改状态为 stop
            advanceRunState(STOP);
            // 强制停止所有线程,有任务在执行也不管
            interruptWorkers();
            // 返回还未执行的任务
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        // 终止线程池里的所有线程
        tryTerminate();
        return tasks;
    }

    // 暂停所有 worker 线程 
	  private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

		void interruptIfStarted() {
        Thread t;
      	// 如果线程已经启动则直接终止
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
          try {
            t.interrupt();
          } catch (SecurityException ignore) {
          }
        }
     }
复制代码

下面这段代码两个shutdown方法都调用到

final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            // 以下情况,取消终止:
            // 1.线程池是运行状态 running
            // 2.runStateAtLeast(c, TIDYING) 线程池状态是 tidying 或者  terminated
            // 3.线程池状态为 shutdown && 任务队列不为空,即是任务没处理完 (shutdown状态的时候会继续处理已经添加的任务)
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;

            /**
             * worker 在 {@link #getTask()} 的时候, 获取到 null 表示 worker 此时需要停止
             * worker[] 移除当前worker后会调用这个方法将线程进行终止
             * 每个worker停止的时候, 会调用这个方法将当前线程进行终止
             * so it is ONLY_ONE
             * {@link #processWorkerExit}
             */
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 代码要执行到这里到话, ( state == stop || ( state == shutdown && workQueue 队列为空) ) && 没有正在运行的线程
                // 这两种情况下, 所有的线程都已经终止
                // cas 尝试修改 ctl 的状态,
                // 修改失败,外层 for循环会再次执行
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // 这里不做任何事情
                        terminated();
                    } finally {
                        // 最后将状态修改为 terminated, 表示线程池完全停止
                        ctl.set(ctlOf(TERMINATED, 0));
                        // 通知所有在等待锁的线程
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS

        }
    }
复制代码

shutdown 和 shutdownNow 对比:

shutdown: 线程将执行完已经添加进队列中的所有任务, 不接受新任务.

shutdownNow: 立刻终止所有正在运行的线程, 并且返回任务队列中的任务.

原文  https://juejin.im/post/5dba4a7bf265da4d54208adb
正文到此结束
Loading...