ThreadPoolExecutor 源码解读
傻瓜源码-内容简介
傻瓜源码-内容简介 |
---|
【职场经验】(持续更新) 如何日常学习、如何书写简历、引导面试官、系统准备面试、选择offer、提高绩效、晋升TeamLeader….. |
【源码解读】(持续更新)<br/> 1. 源码选材:Java架构师必须掌握的所有框架和类库源码 <br/> 2. 内容大纲:按照“企业应用Demo”讲解执行源码:总纲“阅读指南”、第一章“源码基础”、第二章“相关Java基础”、第三章“白话讲源码”、第四章“代码解读”、第五章“设计模式”、第六章“附录-面试习题、相关JDK方法、中文注释可运行源码项目” 3. 读后问题:粉丝群答疑解惑 |
已收录: HashMap 、 ReentrantLock 、 ThreadPoolExecutor 、 《Spring源码解读》 、 《Dubbo源码解读》 ….. |
【面试题集】(持续更新)<br/>1. 面试题选材:Java面试常问的所有面试题和必会知识点<br/>2. 内容大纲:第一部分”注意事项“、第二部分“面试题解读”(包括:”面试题“、”答案“、”答案详解“、“实际开发解说”) 3. 深度/广度:面试题集中的答案和答案详解,都是对齐一般面试要求的深度和广度 4. 读后问题:粉丝群答疑解惑
|
已收录: Java基础面试题集 、 Java并发面试题集 、 JVM面试题集 、 数据库(Mysql)面试题集 、 缓存(Redis)面试题集 ….. |
【粉丝群】(持续更新) <br/> 收录:阿里、字节跳动、京东、小米、美团、哔哩哔哩等大厂内推 |
:stuck_out_tongue: 作者介绍:Spring系源码贡献者、世界五百强互联网公司、TeamLeader、Github开源产品作者 :stuck_out_tongue: 作者微信:wowangle03 (企业内推联系我) |
加入我的粉丝社群,阅读更多内容。从学习到面试,从面试到工作,从 coder 到 TeamLeader,每天给你答疑解惑,还能有第二份收入!
第 1 章 阅读指南
- 本文基于 open-jdk 1.8 版本。
- 本文根据” Demo “解读源码。
-
本文建议分为两个学习阶段,掌握了第一阶段,再进行第二阶段;
- 第一阶段,理解章节“源码解读”前的所有内容。即掌握 IT 技能:熟悉 ThreadPoolExecutor 原理。
- 第二阶段,理解章节“源码解读”(包括源码解读)之后的内容。即掌握 IT 技能:精读 ThreadPoolExecutor 源码。
- 建议按照本文内容顺序阅读(内容前后顺序存在依赖关系)。
- 阅读过程中,如果遇到问题,记下来,后面不远的地方肯定有解答,或者在粉丝群里提问。
- 阅读章节“源码解读”时,建议获得中文注释源码项目配合本文,Debug 进行阅读学习。
-
源码项目中的注释含义:
- “ Demo ”在源码中,会标注“ // ThreadPoolExecutor Demo ”。
- 在源码中的不易定位到的主线源码,会标注 “ // tofix 主线 ”。
-
以下注释的源码,暂时不深入讲解:
- 在执行“ Demo ”过程中,没有被执行到的源码(由于遍历空集合、 if 判断),会标注“ / * Demo不涉及 / ”。
第 2 章 Demo
// ThreadPoolExecutor Demo public class ThreadPoolExecutorTest { @Test public void testThreadPoolExecutor() { // 初始化线程池 ThreadPoolExecutor executor = new ThreadPoolExecutor( 1, // 指定 corePoolSize (核心线程数) 1, // 指定的 maximumPoolSize (线程池可以创建的最大线程数) 0, // 指定的 keepAliveTime(线程空闲的情况下,可以存活的时间) TimeUnit.SECONDS, // keepAliveTime 的时间单位 new LinkedBlockingQueue<Runnable>()// 指定的 workQueue(装任务的队列) ); // 初始化任务 Task task = new Task(); // 向线程池提交任务 executor.execute(task); // 关闭线程池有以下两种方法: // 关闭线程池: // 1.线程池不会接受新的任务,执行拒绝任务处理器(默认是抛出异常) // 2.线程池执行完已经提交了的任务再结束 executor.shutdown(); // 关闭线程池: // 1.线程池不会接受新的任务,执行拒绝任务处理器(默认是抛出异常) // 2.正在执行任务的线程不会被终止,直到任务处理完毕; // 3.任务队列中没有线程处理的等待任务,会直接被终止; // 4.返回未执行的任务 executor.shutdownNow(); } } class Task implements Runnable { @Override public void run() { System.out.println("任务被执行!"); } }
第 3 章 相关 Java 基础
3.1 二进制数
在计算机中,所有的数字都是以二进制的形式存在,只用 0 和 1 两个数码来表示。为了区分正数和负数的二进制数,用最高位表示符号位(0 代表正数/1 代表负数)。
以 int 类型的二进制为例;int 类型的十进制整数转换成二进制,最多能够转换成 32 位的二进制数(前 16 位称为高 16 位,后 16 位称为低 16 位);正整数二进制数第 32 位为 0;负整数二进制数第 32 位为 1。
在后文的讲解中,遇到类似这种的 int 二进制数: 0000 0000 0000 0000 0000 0000 0010 1010 ,就直接省略为 0010 1010。
3.2 正整数十进制转二进制
以 42 为例,转化为二进制等于 0010 1010。
3.3 二进制转正整数十进制
以 0001 0100 为例,转换为整数十进制等于 20。
3.4 二进制加法
把二进制数从右对齐,根据“逢二进一”规则计算;二进制数加法的法则:
0+0=0 0+1=1+0=1 1+1=0 (进位为1) 1+1+1=1 (进位为1)
例如:1110 和 1011 相加过程如下:
3.5 二进制减法
把二进制数从右对齐,根据“借一有二”的规则计算;二进制数减法的法则:
0-0=0 1-1=0 1-0=1 0-1=1 (借位为1)
例如:1101 和 1011 相减过程如下:
3.6 原码、反码、补码
以负数 -5 为例:(扩展:正数的二进制反码、补码都是原码本身)
- 1000 0000 0000 0000 0000 0000 0000 0101( -5 的二进制数;这样的编码方式,称为二进制“原码”)
- 1111 1111 1111 1111 1111 1111 1111 1010( -5 的二进制原码符号位不变,其余位取反;这样的编码方式,称为二进制“反码”)
- 1111 1111 1111 1111 1111 1111 1111 1011(-5 的二进制反码 + 1;这样的编码方式,称为二进制“补码”)
3.7 n<<w(有符号左移)
以 5<<2 为例;就是把 5 的二进制位往左移两位,右边补 0 ,得出的十进制结果为 20。(符号位也会跟着左移运算移动)
- 0000 0101(5 的二进制数)
- 0001 0100(5 的二进制数向往左移动两位,右边补 0,得出的十进制结果为 20)
在计算机中,负数都是以二进制补码的形式表示。以 -5<<2 为例;就是把 -5 的二进制补码位往左移两位,右边补 0 ;得出的结果仍是二进制补码的形式(转换成十进制是 -20)。如下:
- 1000 0000 0000 0000 0000 0000 0000 0101( -5 的二进制原码)
- 1111 1111 1111 1111 1111 1111 1111 1010( -5 的二进制原码符号位不变,其余位取反,得到二进制反码)
- 1111 1111 1111 1111 1111 1111 1111 1011( -5 的二进制反码 + 1 ,得到二进制补码)
- ========= -5<<2 运算开始 ==========
- 1111 1111 1111 1111 1111 1111 1110 1100( -5 的二进制补码向左移动两位,右边补 0,得到二进制补码形式的结果)
- ========= -5<<2 运算结束 ==========
- 1111 1111 1111 1111 1111 1111 1110 1011(二进制补码结果 – 1,得到二进制反码)
- 1000 0000 0000 0000 0000 0000 0001 0100(二进制反码符号位不变,其余位再取反,得到二进制原码,转化为十进制是 -20)
3.8 a&b(按位与)
就是将 a 和 b 先转换为二进制数,然后相同位比较,只有两个都为1,结果才为 1,否则为 0。
以 3&5=1 为例:
- 0000 0011(3 的二进制数)
- 0000 0101(5 的二进制数)
- 0000 0001(3&5(011 & 101)的结果,得出的十进制结果是 1)
3.9 a|b(按位或)
就是将 a 和 b 先转换为二进制数,然后相同位比较,只要一个为 1 ,结果即为 1,否则为 0 。
以 6|2=6 为例:
- 0000 0110(6 的二进制数)
- 0000 0010(2 的二进制数)
- 0000 0110( 6|2(110|010)的结果,得出的十进制结果是 6)
3.10 跳出多重循环
代码示例 跳出多重循环
@Test
public void testRetry() {
retry:
for (int i = 0; i < 3; i++) {
for (int j = 0; j < 3; j++) {
if (i == 1) {
continue retry;
}
System.out.println("i=" + i + ",j = " + j);
}
}
System.out.println("------分割线------");
retry:
for (int i = 0; i < 3; i++) {
for (int j = 0; j < 3; j++) {
if (i == 1) {
break retry;
}
System.out.println("i=" + i + ",j = " + j);
}
}
}
输出如下:
i=0,j = 0
i=0,j = 1
i=0,j = 2
i=2,j = 0
i=2,j = 1
i=2,j = 2
——分割线——
i=0,j = 0
i=0,j = 1
i=0,j = 2
3.11 Thread
void interrupt():在一个线程中调用另一个线程的 interrupt() 方法,会将那个线程设置成线程中断状态,而不会真的中断线程。
boolean isInterrupted(boolean ClearInterrupted):返回当前线程是否处于中断状态,ClearInterrupted 为 true,则返回的同时清除中断状态,反之则保留中断状态。
boolean interrupted():就是封装了 isInterrupted(boolean ClearInterrupted) 方法,参数为 true。
boolean interrupted():就是封装了 isInterrupted(boolean ClearInterrupted) 方法,参数为 false。
boolean isInterrupted():就是封装了 isInterrupted(boolean ClearInterrupted) 方法,参数为 false。
interrupt() 代码示例
class ThreadTest {
public static void main(String[] args) throws Exception {
MyThread thread = new MyThread();
thread.start();
// 主线程通过调用 thread 对象的 interrupt() 方法,将 thread 线程设置成线程中断状态,但不会真的中断线程。
thread.interrupt();
System.out.println("主线程执行结束!");
}
}
class MyThread extends Thread {
@Override
public void run() {
System.out.println("test thread!");
}
}
// 打印结果:
// 主线程执行结束!
// test thread!
3.12 LinkedBlockingQueue
void offer(E e):向队尾添加元素,若队列已经满了,则直接返回 false。
E poll(long timeout, TimeUnit unit):移除并返回 BlockingQueue 头部的元素,如果队列为空,则阻塞timeout 个 unit 单位时间;如果规定时间内,仍然没有数据可取,则返回 null 。如果线程被标记为中断状态(对线程调用 interrupt() 方法),则会抛出中断异常。
E take():移除并返回 BlockingQueue 头部的元素,如果队列为空,则阻塞。如果线程被标记为中断状态(对线程调用 interrupt() 方法),则会抛出中断异常。
int drainTo(Collection<? super E> c):从 BlockingQueue 移除所有数据,并放到 c 集合里。
boolean remove(Object o):在 LinkedBlockingQueue 中移除 o。
3.13 原子性
满足以下几个特点,我们就说这个操作支持原子性,线程安全:
- 原子操作中的所有子操作,要不全成功、要不全失败;
- 线程执行原子操作过程中,不会受到其它线程的任何影响;
- 其它线程只能感知到原子操作开始前和结束后的变化。
解释:
包含多个操作单元,但仍支持原子性,通常都是由锁实现的。
代码示例:
class Test { int x = 0; int y = 0; public void test() { // 原子操作 x = 10; // 大致分为两步:1)获取 x 的值到缓存里;2)取出缓存里的值,赋值给 y // 不支持原子性;获取 x 的值到缓存里之后,其它线程可能修改 x 的值,导致 y 值错误 y = x; // 大致分为三步:1)获取 x 的值到缓存里;2)取出缓存里的值加一;3)赋值给 x // 不支持原子性;原理类似 y = x; x++; } }
3.14 Cas
Cas 是 Compare-and-swap(比较并替换)的缩写,是支持原子性的操作;在 Java 中,底层是 native 方法实现,通过 CPU 提供的 lock 信号保证的原子性。
想要将数据 V 的原值 O 替换为新值 N,执行 Cas 操作:
- 预先读取数据 V 的值 O 作为预期值;
-
执行 Cas 操作:
-
比较当前数据 V 的值是否是 O;
- 如果是,则替换为 N,返回执行成功;
- 如果不是,则不替换,返回执行失败;
-
例子:
以 java.util.concurrent.atomic 包中的 AtomicInteger 为例;
public static void main(String[] args) { AtomicInteger atomicInteger = new AtomicInteger(100); // 将 AtomicInteger 的值,从 100 替换为 200 Boolean b = atomicInteger.compareAndSet(100, 200); // 返回 true,替换成功 System.out.println(b); }
3.15 volatile
用于修饰变量,可以保证被修饰变量的操作支持可见性和有序性,但不支持原子性。详见”Java 并发面试题集“
第 4 章 源码基础
4.1 导读
1. 工作线程定义
文中的工作线程是指线程池创建的存活线程。
4.2 ThreadPoolExecutor
代码示例 ThreadPoolExecutor 重要成员变量
public class ThreadPoolExecutor extends AbstractExecutorService { /** * 可通过构造函数指定的成员变量 */ // 创建工作线程的工厂类 private volatile ThreadFactory threadFactory; // 核心线程数。用于控制线程池逻辑:当用户提交任务时,只要工作线程数小于 corePoolSize 时,就会创建工作线程执行任务;如果等于 corePoolSize,就不会创建工作线程,而是将任务放到 workQueue 队列对象里。 // 也就是说核心线程只是一个概念,在代码中并没有标记某个线程是否是核心线程;也就是说当工作线程数小于等于 corePoolSize 时,线程池中这部分工作线程就是核心线程。 private volatile int corePoolSize; // 线程存活时间。指线程空闲的情况下,可以存活的时间;核心线程不受 keepAliveTime 控制;除非 allowCoreThreadTimeOut 置为 true private volatile long keepAliveTime; // 是否允许核心线程空闲超时,默认为 false。如果配置 true,核心线程也会受 keepAliveTime 控制 private volatile boolean allowCoreThreadTimeOut; // 当工作线程数等于 corePoolSize 时,提交的任务就会放在 workQueue 指定的队列对象里。 private final BlockingQueue<Runnable> workQueue; // 表示线程池中最多能够容纳多少工作线程数。当工作线程数等于配置的 corePoolSize,并且 workQueue 已满时,线程池会创建新的非核心线程,直到线程池中的工作线程总数等于 maximumPoolSize; private volatile int maximumPoolSize; // 拒绝任务处理器,默认是抛出异常;如果【任务队列已满,并且工作线程总数等于 maximumPoolSize 】或者【已经调用 shutdown() 方法或 shutdownNow() 方法】,向线程池提交任务,则会执行 handler private volatile RejectedExecutionHandler handler; /** * 供内部使用的成员变量 */ // ctl 二进制值的低29位,表示工作线程数 // ctl 二进制值的高3位,表示线程池运行状态 // ctl 是 AtomicInteger 类型,也就是说基于 CAS,支持原子操作, private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // CAPACITY 表示工作线程数容量(十进制值为 536870911;二进制值为 0001 1111 1111 1111 1111 1111 1111 1111),用于参与 ctl 的计算,获取运行状态以及工作线程数;也是比 maximumPoolSize 优先级更高的工作线程数上限 private static final int COUNT_BITS = Integer.SIZE - 3; // 29 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // workers 就是存放 worker(工作线程执行者,每个对象持有一个 thread 线程对象,代理了任务的执行) 的容器 private final HashSet<Worker> workers = new HashSet<Worker>(); // largestPoolSize 记录 workers 里保存 worker 对象最多时的数量 private int largestPoolSize; // completedTasks 表示线程池累积处理的任务数量 volatile long completedTasks; /** * 线程池状态 */ // 状态变化: // 运行-> 关闭(调用 shutdown方法)-> 整理-> 终止(RUNING->SHUTDOWN->TIDYING->TERMINATED) // 运行-> 停止(调用 shutdownNow方法)-> 整理-> 终止(RUNING->STOP->TIDYING->TERMINATED) // RUNNING 表示运行状态(初始状态);(十进制值为-536870912;二进制值为 1110 0000 0000 0000 0000 0000 0000 0000) private static final int RUNNING = -1 << COUNT_BITS; // SHUTDOWN 表示关闭状态(十进制值为 0;二进制值为 0000 0000 0000 0000 0000 0000 0000 0000);调用 shutdown() 方法时,会将线程池的状态改为 SHUTDOWN; private static final int SHUTDOWN = 0 << COUNT_BITS; // STOP 表示停止状态(十进制值为 536870912;二进制值为 0010 0000 0000 0000 0000 0000 0000 0000);调用 shutdownNow() 方法时,会将线程池的状态改为STOP; private static final int STOP = 1 << COUNT_BITS; // TIDYING 表示整理状态(十进制值为 1073741824;二进制值为 0100 0000 0000 0000 0000 0000 0000 0000),只有当线程池处于【【SHUTDOWN状态并且 workQueue 队列不存在任务】或者【STOP状态】】时,并且所有工作线程都被销毁,才会将线程池状态改为 TERMINATED ,在改为TERMINATED状态前,会先改为TIDYING,增加这个状态为了区分是否执行 terminated() 方法(该方法默认为空方法,模版模式,用于子类实现扩展),执行 terminated() 方法前为 TIDYING状态,执行后为 TERMINATED 状态 private static final int TIDYING = 2 << COUNT_BITS; // TERMINATED 表示终止状态(十进制值为 1610612736;二进制值为 0110 0000 0000 0000 0000 0000 0000 0000);只有当线程池处于【【SHUTDOWN状态并且 workQueue 队列不存在任务】或者【STOP状态】】,并且所有工作线程都被销毁,并且执行完 terminated() 方法后,线程池才会设置为TERMINATED 状态 private static final int TERMINATED = 3 << COUNT_BITS;
4.3 Worker
ThreadPoolExecutor 内部类 Worker 。顾名思义,就是目标任务的执行者,每个对象里持有一个 thread 线程对象;基于代理者模式,控制目标任务的访问与执行。
代码示例 Worker 重要成员变量
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // 持有的工作线程对象(thread 是初始化 Worker 时,通过 ThreadPoolExecutor的 threadFactory 创建出来的,用于执行 firstTask) final Thread thread; // 指定的第一个目标任务;(是初始化 worker 时,指定的;如果为空,则说明可能创建时就未指定或者指定的第一个任务已经处理完了, Worker 就开始从 workQueue 取任务执行) Runnable firstTask; // 当前 Worker 对象处理的任务数量总和 volatile long completedTasks;
<br/>
加入我的粉丝社群,阅读全部内容
从学习到面试,从面试到工作,从 coder 到 TeamLeader,每天给你答疑解惑,还能有第二份收入,这样的知识星球,难道你还要犹豫!
原文
https://segmentfault.com/a/1190000023008886
本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » ThreadPoolExecutor 源码解读