通常都是使用Executors提供的通用线程池创建方法去创建不同配置的线程池,主要区别在于不同的ExecutorService类型或者不同的初始参数,主要分为五类:
void execute(Runnable command); <T> Future<T> submit(Callable<T> task);
线程池:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
private final BlockingQueue<Runnable> workQueue; private final HashSet<Worker> workers = new HashSet<>();
ctl变量被赋予了双重角色,高位代表了线程池状态,低位代表了工作线程数目。这是一个典型的高效优化。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 真正决定了工作线程数的理论上限
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// 线程池状态,存储在数字的高位
private static final int RUNNING = -1 << COUNT_BITS;
…
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
private static int workerCountOf(int c) { return c & COUNT_MASK; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
线程的状态流转:
注意,实际java代码中不存在idle状态。
public void execute(Runnable command) {
…
int c = ctl.get();
// 检查工作线程数目,低于 corePoolSize 则添加 Worker
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// isRunning 就是检查线程池是否被 shutdown
// 工作队列可能是有界的,offer 是比较友好的入队方式
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次进行防御性检查
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 尝试添加一个 worker,如果失败意味着已经饱和或者被 shutdown 了
else if (!addWorker(command, false))
reject(command);
}