public class ThreadPoolExecutor extends AbstractExecutorService {
// 包含 4 个构造方法。 其他 3 个通过调用该构造方法。
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数量
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 线程没有执行任务时最大保持多久终止
TimeUnit unit, // keepAliveTime的时间单位
BlockingQueue<Runnable> workQueue, // 阻塞队列,存储等待执行的任务
ThreadFactory threadFactory, // 线程工厂,用来创建线程
RejectedExecutionHandler handler // 当拒绝处理任务时的策略
) { }
}
复制代码
核心线程数量。
默认情况下,创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列(workQueue)当中。
最大线程数。在线程池中最多能创建多少个线程。
线程没有执行任务时最大保持多久终止。
默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用。
调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0。
参数 keepAliveTime 的时间单位
TimeUnit.DAYS; //天 TimeUnit.HOURS; //小时 TimeUnit.MINUTES; //分钟 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //纳秒 复制代码
阻塞队列,存储等待执行的任务,对线程池运行过程产生重大影响。
线程工厂,主要用来创建线程。
当拒绝处理任务时的策略。
// 丢弃任务,并抛出 RejectedExecutionException ThreadPoolExecutor.AbortPolicy // 丢弃任务,不抛出异常 ThreadPoolExecutor.DiscardPolicy // 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) ThreadPoolExecutor.DiscardOldestPolicy // 调用线程处理该任务 ThreadPoolExecutor.CallerRunsPolicy 复制代码
public class ThreadPoolExecutor extends AbstractExecutorService {
// 提交任务,交给线程池执行
public void execute(Runnable command) {}
// 提交任务,能够返回执行结果 (execute + Future)
public Future<?> submit(Runnable task) {}
public <T> Future<T> submit(Runnable task, T result) {}
public <T> Future<T> submit(Callable<T> task) {}
// 关闭线程池,等待任务执行完
public void shutdown() {}
// 立即关闭,不等待任务关闭
public void shutdownNow() {}
// 获得线程池中已执行和未执行的任务总数
public long getTaskCount() {}
// 获得已完成任务数量
public long getCompletedTaskCount() {}
// 线程池当前的线程数量
public int getPoolSize() {}
// 获得当前线程池中正在执行的线程数量
public int getActiveCount() {
}
复制代码
利用Executors提供的通用线程池创建方法,创建不同配置的线程池,主要区别在于不同的ExecutorService类型或者不同的初始参数。
public static void main(String[] args) {
ExecutorService pool = Executors.newSingleThreadExecutor();
// 执行过程将会顺序输出 0 --> 9
for (int i = 0; i < 10; i++) {
final int index = i;
executorService.execute(() -> {
System.out.println(index);
})
}
}
复制代码
// corePoolSize = 1
// maximumPoolSIze = 1
// keyAliveTime = 0L
// unit = TimeUnit.MILLISECONDS
// workQueue = new LinkedBlockingQueue<Runnable>()
// threadFactory = Executors.defaultThreadFactory()
// handler = defaultHandler = new AbortPolicy()
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
复制代码
public static void main(String[] args) {
// 线程池大小为3
ExecutorService executorService = Executors.newFixedThreadPool(3);
// 每隔秒打印三个数字
for (int i = 0; i < 50; i++) {
final int index = i;
executorService.execute(() -> {
System.out.println(index);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
复制代码
// corePoolSize = nThreads
// maximumPoolSIze = nThreads
// keyAliveTime = 0L
// unit = TimeUnit.MILLISECONDS
// workQueue = new LinkedBlockingQueue<Runnable>()
// threadFactory = Executors.defaultThreadFactory()
// handler = defaultHandler = new AbortPolicy()
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
复制代码
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int index = i;
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(index);
}
});
}
executorService.shuntdown();
}
复制代码
// corePoolSize = 0
// maximumPoolSIze = Integer.MAX_VALUE
// keyAliveTime = 60L
// unit = TimeUnit.MILLISECONDS
// workQueue = new LinkedBlockingQueue<Runnable>()
// threadFactory = Executors.defaultThreadFactory()
// handler = defaultHandler = new AbortPolicy()
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
复制代码
public static void main(String[] args) {
// new ScheduledThreadPoolExecutor(corePoolSize)
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
// executorService 执行具有调度含义
// delay: 3 SECONDS 后执行
executorService.schedule(() -> System.out.println("schedule running"), 2, TimeUnit.SECONDS);
executorService.shutdown();
}
复制代码
public static void main(String[] args) {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
// executorService 执行具有调度含义
// scheduleAtFixedRate 以指定的速率运行 每隔一段时候就触发
// 1: initalDelay 延迟1秒
// 3: period 每格3秒
executorService.scheduleAtFixedRate(() -> System.out.println(System.nanoTime()), 1, 3, TimeUnit.SECONDS);
// 不适用关闭线程池
// 若需要关闭线程池,可通过提供关闭信息,再调用该方法
// executorService.shutdown();
}
复制代码
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
// ---
// DEFAULT_KEEPALIVE_MILLIS = 10L
// MILLISECONDS = TimeUnit.MILLI_SCALE
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
复制代码
Java 8才加入这个创建方法,其内部会构建ForkJoinPool,利用Work-Stealing算法,并行地处 理任务(默认为主机CPU的可用核心数),不保证处理顺序
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
// ---
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
复制代码