线程池用于存放线程, 通过对线程的复用, 很大程度上减少了频繁创建和销毁线程导致的资源损耗. 下面简单地介绍一下 JDK(1.8)中的线程池.
在介绍JDK的4种线程池之前, 先介绍一下线程池的几个参数
allowCoreThreadTimeOut 允许核心线程池被回收, 默认 false, so 默认情况下 核心线程并不会被回收掉.
// 用一个原子变量来存储线程池状态和线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 32 - 3 = 29, int 的左边三位表示线程池状态,右边29位表示线程数量
private static final int COUNT_BITS = Integer.SIZE - 3;
// 最大线程数量: 000 & (29个1) = 2^29-1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池的几个状态
// 100 & (29个0),线程池运行状态
private static final int RUNNING = -1 << COUNT_BITS;
// 000 & (29个0), 执行shutdown方法时,不接收新的任务, 会先执行完当前已经接收的, 任务队列任务执行完 && 线程已经全部终止: state -> TIDYING
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001 & (29个0), 执行 shutdownNow() 方法时
private static final int STOP = 1 << COUNT_BITS;
// 010 & (29个0)
private static final int TIDYING = 2 << COUNT_BITS;
// 011 & (29个0), 线程池已经终止
private static final int TERMINATED = 3 << COUNT_BITS;
复制代码
固定线程池数量, 核心线程数 = 最大线程数
任务队列: LinkedBlockingQueue(Integer.MAX_VALUE) 无界队列
适用于同时处理固定任务数的场景.
public static ExecutorService newFixedThreadPool(int nThreads) {
// coreThreads = maxThreads
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
复制代码
核心线程数为0, 最大为 Integer.MAX_VALUE,也就是当任务足够多的时候, 可以无限增加线程. 并且所有的线程空闲超过一段时间(调用 Executors 创建的默认 KeepAlive为 60s)就会被回收.
任务队列: SynchronousQueue 默认传入参数 fair=false
, 即处理任务非公平.
适用于处理小任务
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
复制代码
SynchronousQueue
public SynchronousQueue(boolean fair) {
// fair = true 则会按照FIFO先入先出的顺序执行
// fair = false(默认值) 则优先取出最新添加的任务, 最早添加的任务最晚执行
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
复制代码
单个线程的线程池
任务队列: LinkedBlockingQueue 同样是无界队列
适用于需要将任务按顺序执行的时候
public static ExecutorService newSingleThreadExecutor() {
// 核心线程数=最大线程数=1
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
复制代码
固定核心线程数, 线程数量不会再增长, maximumPoolSize 这个参数对定时线程池没有作用.
oracle的api文档是这么写的:
While this class inherits from
ThreadPoolExecutor
, a few of the inherited tuning methods are not useful for it. In particular, because it acts as a fixed-sized pool using corePoolSize
threads and an unbounded queue, adjustments to maximumPoolSize
have no useful effect.
任务队列: DelayedWorkQueue 无界队列, 这是 ScheduledThreadPoolExecutor 的一个内部类
更适用于需要延时执行或者定时需求的场景
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
复制代码
拒绝新任务,并且抛出异常, 默认的拒绝策略
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 直接抛出异常
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
复制代码
当拒绝任务的时候,由调用线程处理该任务.
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 如果线程池未停止
if (!e.isShutdown()) {
// 当前线程直接调用任务的run方法
r.run();
}
}
}
复制代码
拒绝新任务,静悄悄的将新任务丢弃,而不通知(太坑了吧), 具体看它的代码也是什么事情都没做, 还真的就直接丢弃任务了.
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 任务被拒绝后,啥事情都不干
}
}
复制代码
当任务满时, 抛弃旧的未处理的任务, 然后重新执行 execute 方法(此过程会重复), 除非线程池停止运行, 这种情况任务将被丢弃.具体看代码
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 如果线程池停止, 直接丢弃任务,不做任何处理
if (!e.isShutdown()) {
// 丢弃一个任务队列中的任务
e.getQueue().poll();
// 重新执行被拒绝的任务, 如果再次被拒绝, 则会一直重复这个过程
e.execute(r);
}
}
}
复制代码
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改状态为 shutdown
advanceRunState(SHUTDOWN);
// 停止空闲的线程, 有执行任务的不会停止
interruptIdleWorkers();
// ScheduledThreadPoolExecutor 定时线程池才用到这个方法
onShutdown();
} finally {
mainLock.unlock();
}
// 终止线程池里的所有线程
tryTerminate();
}
复制代码
interruptIdleWorkers
, 从方法名可以看出, 它会终止掉当前空闲的线程
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 因为执行任务 runWorker() 的时候, 执行了 worker.lock()方法
// 所以如果当前线程有任务在执行, 则 tryLock不会成功,
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
复制代码
这个方法的作用是立刻强制停止所有线程, 即使该线程有正在执行的任务.
并且停止所有线程后,返回任务队列中还未执行的任务.
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改状态为 stop
advanceRunState(STOP);
// 强制停止所有线程,有任务在执行也不管
interruptWorkers();
// 返回还未执行的任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 终止线程池里的所有线程
tryTerminate();
return tasks;
}
// 暂停所有 worker 线程
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
void interruptIfStarted() {
Thread t;
// 如果线程已经启动则直接终止
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
复制代码
下面这段代码两个shutdown方法都调用到
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 以下情况,取消终止:
// 1.线程池是运行状态 running
// 2.runStateAtLeast(c, TIDYING) 线程池状态是 tidying 或者 terminated
// 3.线程池状态为 shutdown && 任务队列不为空,即是任务没处理完 (shutdown状态的时候会继续处理已经添加的任务)
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
/**
* worker 在 {@link #getTask()} 的时候, 获取到 null 表示 worker 此时需要停止
* worker[] 移除当前worker后会调用这个方法将线程进行终止
* 每个worker停止的时候, 会调用这个方法将当前线程进行终止
* so it is ONLY_ONE
* {@link #processWorkerExit}
*/
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 代码要执行到这里到话, ( state == stop || ( state == shutdown && workQueue 队列为空) ) && 没有正在运行的线程
// 这两种情况下, 所有的线程都已经终止
// cas 尝试修改 ctl 的状态,
// 修改失败,外层 for循环会再次执行
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 这里不做任何事情
terminated();
} finally {
// 最后将状态修改为 terminated, 表示线程池完全停止
ctl.set(ctlOf(TERMINATED, 0));
// 通知所有在等待锁的线程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
复制代码
shutdown: 线程将执行完已经添加进队列中的所有任务, 不接受新任务.
shutdownNow: 立刻终止所有正在运行的线程, 并且返回任务队列中的任务.