马上就要过年了,还在岗位上坚守“swimming”的小伙伴们顶住。博主给大家带来一篇线程池的基本使用解解闷。
1、减少线程创建与切换的开销
2、控制线程的数量
重复利用有限的线程
其实常用Java线程池本质上都是由 ThreadPoolExecutor
或者 ForkJoinPool
生成的,只是其根据构造函数传入不同的实参来实例化相应线程池而已。
Executors
是一个线程池工厂类,该工厂类包含如下集合静态工厂方法来创建线程池:
newFixedThreadPool() newSingleThreadExecutor() newCachedThreadPool() newWorkStealingPool() newScheduledThreadPool()
对设计模式有了解过的同学都会知道,我们尽量面向接口编程,这样对程序的灵活性是非常友好的。Java线程池也采用了面向接口编程的思想,可以看到 ThreadPoolExecutor
和 ForkJoinPool
所有都是 ExecutorService
接口的实现类。在 ExecutorService
接口中定义了一些常用的方法,然后再各种线程池中都可以使用 ExecutorService
接口中定义的方法,常用的方法有如下几个:
向线程池提交线程
Future<?> submit() void execute(Runnable command)
关闭线程池
void shutdown() List<Runnable> shutdownNow()
检查线程池的状态
boolean isShutdown() boolean isTerminated()
线程池中的线程数目是固定的,不管你来了多少的任务。
public class MyFixThreadPool {
public static void main(String[] args) throws InterruptedException {
// 创建一个线程数固定为5的线程池
ExecutorService service = Executors.newFixedThreadPool(5);
System.out.println("初始线程池状态:" + service);
for (int i = 0; i < 6; i++) {
service.execute(() -> {
try {
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
});
}
System.out.println("线程提交完毕之后线程池状态:" + service);
service.shutdown();//会等待所有的线程执行完毕才关闭,shutdownNow:立马关闭
System.out.println("是否全部线程已经执行完毕:" + service.isTerminated());//所有的任务执行完了,就会返回true
System.out.println("是否已经执行shutdown()" + service.isShutdown());
System.out.println("执行完shutdown()之后线程池的状态:" + service);
TimeUnit.SECONDS.sleep(5);
System.out.println("5秒钟过后,是否全部线程已经执行完毕:" + service.isTerminated());
System.out.println("5秒钟过后,是否已经执行shutdown()" + service.isShutdown());
System.out.println("5秒钟过后,线程池状态:" + service);
}
}
复制代码
初始线程池状态:[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
线程提交完毕之后线程池状态:[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
是否全部线程已经执行完毕:false
是否已经执行shutdown():true
执行完shutdown()之后线程池的状态:[Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
pool-1-thread-2
pool-1-thread-1
pool-1-thread-4
pool-1-thread-5
pool-1-thread-3
pool-1-thread-2
5秒钟过后,是否全部线程已经执行完毕:true
5秒钟过后,是否已经执行shutdown():true
5秒钟过后,线程池状态:[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
Running
状态了,但是 pool size
(线程池线程的数量)、 active threads
(当前活跃线程) queued tasks
(当前排队线程)、 completed tasks
(已完成的任务数)都是0 pool size = 5
:因为我们创建的是一个固定线程数为5的线程池(注意:如果这个时候我们只提交了3个任务,那么 pool size = 3
,说明线程池也是通过懒加载的方式去创建线程)。 active threads = 5
:虽然我们向线程池提交了6个任务,但是线程池的固定大小为5,所以活跃线程只有5个 queued tasks = 1
:虽然我们向线程池提交了6个任务,但是线程池的固定大小为5,只能有5个活跃线程同时工作,所以有一个任务在等待 shutdown()
的时候,由于任务还没有全部执行完毕,所以 isTerminated()
返回 false
, shutdown()
返回true,而线程池的状态会由 Running
变为 Shutting down
pool-1-thread-2
执行了两次任务,证明线程池中的线程确实是重复利用的。 isTerminated()
返回 true
, shutdown()
返回 true
,证明所有的任务都执行完了,线程池也关闭了,我们再次检查线程池的状态 [Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
,状态已经处于 Terminated
了,然后已完成的任务显示为6 从头到尾整个线程池都只有一个线程在工作。
public class SingleThreadPool {
public static void main(String[] args) {
ExecutorService service = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
final int j = i;
service.execute(() -> {
System.out.println(j + " " + Thread.currentThread().getName());
});
}
}
}
复制代码
0 pool-1-thread-1 1 pool-1-thread-1 2 pool-1-thread-1 3 pool-1-thread-1 4 pool-1-thread-1
程序分析可以看到只有 pool-1-thread-1
一个线程在工作。
来多少任务,就创建多少线程(前提是没有空闲的线程在等待执行任务,否则还是会复用之前旧(缓存)的线程),直接你电脑能支撑的线程数的极限为止。
public class CachePool {
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newCachedThreadPool();
System.out.println("初始线程池状态:" + service);
for (int i = 0; i < 12; i++) {
service.execute(() -> {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
});
}
System.out.println("线程提交完毕之后线程池状态:" + service);
TimeUnit.SECONDS.sleep(50);
System.out.println("50秒后线程池状态:" + service);
TimeUnit.SECONDS.sleep(30);
System.out.println("80秒后线程池状态:" + service);
}
}
复制代码
初始线程池状态:[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
线程提交完毕之后线程池状态:[Running, pool size = 12, active threads = 12, queued tasks = 0, completed tasks = 0]
pool-1-thread-3
pool-1-thread-4
pool-1-thread-1
pool-1-thread-2
pool-1-thread-5
pool-1-thread-8
pool-1-thread-9
pool-1-thread-12
pool-1-thread-7
pool-1-thread-6
pool-1-thread-11
pool-1-thread-10
50秒后线程池状态:[Running, pool size = 12, active threads = 0, queued tasks = 0, completed tasks = 12]
80秒后线程池状态:[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 12]
可以在指定延迟后或周期性地执行线程任务的线程池。
newScheduledThreadPool()
方法返回的其实是一个 ScheduledThreadPoolExecutor
对象, ScheduledThreadPoolExecutor
定义如下: public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {
复制代码
ThreadPoolExecutor
并实现了 ScheduledExecutorService
接口,而 ScheduledExecutorService
也是继承了 ExecutorService
接口,所以我们也可以像使用之前的线程池对象一样使用,只不过是该对象会额外多了一些方法用于控制延迟与周期: public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit) public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,ong initialDelay,long delay,TimeUnit unit)
下面代码每500毫秒打印一次当前线程名称以及一个随机数字。
public class MyScheduledPool {
public static void main(String[] args) {
ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
service.scheduleAtFixedRate(() -> {
System.out.println(Thread.currentThread().getName() + new Random().nextInt(1000));
}, 0, 500, TimeUnit.MILLISECONDS);
}
}
复制代码
每个线程维护着自己的队列,执行完自己的任务之后,会去主动执行其他线程队列中的任务。
public class MyWorkStealingPool {
public static void main(String[] args) throws IOException {
ExecutorService service = Executors.newWorkStealingPool(4);
System.out.println("cpu核心:" + Runtime.getRuntime().availableProcessors());
service.execute(new R(1000));
service.execute(new R(2000));
service.execute(new R(2000));
service.execute(new R(2000));
service.execute(new R(2000));
//由于产生的是精灵线程(守护线程、后台线程),主线程不阻塞的话,看不到输出
System.in.read();
}
static class R implements Runnable {
int time;
R(int time) {
this.time = time;
}
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(time + " " + Thread.currentThread().getName());
}
}
}
复制代码
cpu核心:4 1000 ForkJoinPool-1-worker-1 2000 ForkJoinPool-1-worker-0 2000 ForkJoinPool-1-worker-3 2000 ForkJoinPool-1-worker-2 2000 ForkJoinPool-1-worker-1
程序分析 ForkJoinPool-1-worker-1
任务的执行时间是1秒,它会最先执行完毕,然后它会去主动执行其他线程队列中的任务。
ForkJoinPool
可以将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合并成总的计算结果。 ForkJoinPool
提供了如下几个方法用于创建 ForkJoinPool
实例对象:
ForkJoinPool(int parallelism)
:创建一个包含parallelism个并行线程的 ForkJoinPool
,parallelism的默认值为 Runtime.getRuntime().availableProcessors()
方法的返回值 ForkJoinPool commonPool()
:该方法返回一个通用池,通用池的运行状态不会受 shutdown()
或 shutdownNow()
方法的影响。
创建了 ForkJoinPool
示例之后,就可以调用 ForkJoinPool
的 submit(ForkJoinTask task)
或 invoke(ForkJoinTask task)
方法来执行指定任务了。其中 ForkJoinTask
(实现了Future接口)代表一个可以并行、合并的任务。 ForkJoinTask
是一个抽象类,他还有两个抽象子类: RecursiveAction
和 RecursiveTask
。其中 RecursiveTask
代表有返回值的任务,而 RecursiveAction
代表没有返回值的任务。
下面代码演示了使用 ForkJoinPool
对1000000个随机整数进行求和。
public class MyForkJoinPool {
static int[] nums = new int[1000000];
static final int MAX_NUM = 50000;
static Random random = new Random();
static {
for (int i = 0; i < nums.length; i++) {
nums[i] = random.nextInt(1000);
}
System.out.println(Arrays.stream(nums).sum());
}
// static class AddTask extends RecursiveAction {
//
// int start, end;
//
// AddTask(int start, int end) {
// this.start = start;
// this.end = end;
// }
//
// @Override
// protected void compute() {
// if (end - start <= MAX_NUM) {
// long sum = 0L;
// for (int i = 0; i < end; i++) sum += nums[i];
// System.out.println("from:" + start + " to:" + end + " = " + sum);
// } else {
// int middle = start + (end - start) / 2;
//
// AddTask subTask1 = new AddTask(start, middle);
// AddTask subTask2 = new AddTask(middle, end);
// subTask1.fork();
// subTask2.fork();
// }
// }
// }
static class AddTask extends RecursiveTask<Long> {
int start, end;
AddTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
// 当end与start之间的差大于MAX_NUM,将大任务分解成两个“小任务”
if (end - start <= MAX_NUM) {
long sum = 0L;
for (int i = start; i < end; i++) sum += nums[i];
return sum;
} else {
int middle = start + (end - start) / 2;
AddTask subTask1 = new AddTask(start, middle);
AddTask subTask2 = new AddTask(middle, end);
// 并行执行两个“小任务”
subTask1.fork();
subTask2.fork();
// 把两个“小任务”累加的结果合并起来
return subTask1.join() + subTask2.join();
}
}
}
public static void main(String[] args) throws IOException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
AddTask task = new AddTask(0, nums.length);
forkJoinPool.execute(task);
long result = task.join();
System.out.println(result);
forkJoinPool.shutdown();
}
}
复制代码
上面我们说到过:其实常用Java线程池都是由 ThreadPoolExecutor
或者 ForkJoinPool
两个类生成的,只是其根据构造函数传入不同的实参来生成相应线程池而已。那我们现在一起来看看Executors中几个创建线程池对象的静态方法相关的源码:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
复制代码
corePoolSize maximumPoolSize keepAliveTime unit workQueue
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
复制代码
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
复制代码
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
复制代码
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
复制代码
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
复制代码
觉得文章写得不错的朋友可以点赞、转发、加关注呀!你们的支持就是我最大的动力,笔芯!