早些时候(jdk 1.5之前),并发环境下做同步控制,你的选择不多,多半是使用 synchronized 关键字。不管是同步方法还是同步块,总之遇到这个关键字,未获取锁线程就会乖乖等候,直到已获取锁的线程释放掉锁。
而jdk 1.5推出 ReenntrantLock 之后,此工具一度很风靡,当时人们更喜欢用Lock而不是synchronized,主要是因为它用起来灵活吧。(本人到现在为止,用synchronized的场景还是Lock的时候多)直到后来,越来越多的文章,从性能、是否公平、实现原理各个方面对二者比较,大家才对他们有了更直观的认识。
本文旨在分析ReenntrantLock的主要实现逻辑,并初步窥探AQS结构。如果不犯懒的话,希望后续能将AQS做成系列,真正理解Doug Lea大神的这个经典实现。
研究工具的原理之前,要先会使用工具。
public class ReentrantLockTest {
Lock lock = new ReentrantLock(); //创建锁
public void doSomething(){
//### 1-尝试获取锁,成功
if(lock.tryLock()){
System.out.println(String.format("%s线程,获取到锁了",Thread.currentThread().getName()));
try {
//模拟逻辑执行
TimeUnit.MILLISECONDS.sleep(1100L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("%s线程,业务执行完毕",Thread.currentThread().getName()));
lock.unlock(); //### 1.1-逻辑执行完,释放锁
}
//### 2-尝试获取锁,失败
else {
System.out.println(String.format("%s线程,获取锁失败",Thread.currentThread().getName()));
}
}
public static void main(String[] args) throws InterruptedException {
ReentrantLockTest test = new ReentrantLockTest();
int total = 3;
while (total>0){
Thread t = new Thread(()->{
test.doSomething();
},"T-"+total);
t.start();
total--;
TimeUnit.MILLISECONDS.sleep(1000L);
}
}
}
tryLock() 方法会 尝试获取锁,如果获取不到,直接 return false (不会阻断);如果获取到锁, return true 。
上面的例子,执行结果为:
T-3线程,获取到锁了 T-2线程,获取锁失败 T-3线程,业务执行完毕 T-1线程,获取到锁了 T-1线程,业务执行完毕
修改下上例中的加锁方式:
Lock lock = new ReentrantLock();
public void doSomething2(){
lock.lock();
System.out.println(String.format("%s线程,获取到锁了",Thread.currentThread().getName()));
try {
TimeUnit.MILLISECONDS.sleep(1000L); //模拟业务逻辑
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("%s线程,业务执行完毕",Thread.currentThread().getName()));
lock.unlock();
}
public static void main(String[] args) throws InterruptedException {
ReentrantLockTest test = new ReentrantLockTest();
int total = 3;
while (total>0){
Thread t = new Thread(()->{
test.doSomething2();
},"T-"+total);
t.start();
total--;
}
}
与tryLock()不通, lock() 方式尝试获取锁,如果获取不到会持续等待 。
执行结果会变为:
T-3线程,获取到锁了 T-3线程,业务执行完毕 T-2线程,获取到锁了 T-2线程,业务执行完毕 T-1线程,获取到锁了 T-1线程,业务执行完毕
ReenntrantLock 加 / 解锁的使用方式就这些,而它是靠编码实现的。下图给出了ReenntrantLock类部分结构:
ReenntrantLock 默认 实现的是 非公平锁 (本文也只分析非公平实现)。
final Sync sync;
public ReentrantLock() {
sync = new NonfairSync(); //成员变量sync,赋值成NonfairSync的对象
}
先从实现较简单的 tryLock() 研究:
## ReentrantLock类
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
↓↓↓↓↓
↓↓↓↓↓
## ReentrantLock.Sync类
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); // 1- 获取AQS类中的state状态值
if (c == 0) {
// 2- 如果state是0(默认值),将state原子形修改成1
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current); // 2.1- 原子修改成功,标记AOS中的exclusiveOwnerThread为当前线程
return true;
}
}
// 3- 此时state不是1,当前线程 == AOS中的exclusiveOwnerThread,将state修改为1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
tryLock()方法, 核心逻辑就是原子修改AQS中的 state 值 , volatile + CAS (jdk9 VarHandle实现)。
具体一些:
实现过程中,只在 首次修改 state 值,即将其从0改成1的时候,采用了原子的 CAS 方式 。
之后 只判断当前线程和 owner线程 (AOS中的exclusiveOwnerThread) 是否一致 , 如果一致state++;不一致,直接 return false 。
unLock()实现同样简单
## ReentrantLock类
public void unlock() {
sync.release(1);
}
↓↓↓↓↓
↓↓↓↓↓
## ReentrantLock.Sync类
public final boolean release(int arg) {
...
tryRelease(arg) //尝试释放
...
}
↓↓↓↓↓
↓↓↓↓↓
## ReentrantLock.Sync类
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // state--
// 1-验证线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { //2-如果state==0时,将结果赋值为true,清空owner线程
free = true;
setExclusiveOwnerThread(null);
}
setState(c); //state赋值
return free;
}
如果 操作线程是 owner 线程 (首次tryLock()时会记录owner):
tryLock() 每次调用, state++ ; unLock() 每次调用, state-- (state=0时,清空owner线程)。
Tip: 注释1处,如果当前线程非owner线程,会直接抛出异常!
对于 tryLock() 而言,它在实现上,完全没用到AQS的精华。既然叫Abstract Queued Synchronizer——抽象队列同步器, 队列 、 同步 什么的才是重点。别急, lock() 方法会用到这些。
public void lock() {
sync.acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt(); //中断interrupt
}
对于默认的非公平锁实现, acquire(int arg) 完全可 替换 成如下写法:
public final void acquire(int arg) {
##### tryAcquire(arg) 改成了 tryLock(arg)
if (!tryLock(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt(); //线程 interrupt
}
如此替换后,逻辑就很好理解了:在用 tryLock() 获取锁失败的情况下,会调用 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
而 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 显然也分成了两个方法 addWaiter 和 acquireQueued
addWaiter(Node.EXCLUSIVE) 部分 : private Node addWaiter(Node mode) {
Node node = new Node(mode); //创建node,创建的同时绑定线程
for (;;) {
Node oldTail = tail;
if (oldTail != null) { //循环2-将node节点和首次循环中初始化的队列关联
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
initializeSyncQueue(); //循环1-初始化同步队列
}
}
}
这里需关注 AQS.Node 类 的一些 关键属性 (已文字标明各属性用途):
## 表示Node节点的状态,有CANCELLED(待取消)、SIGNAL(待唤醒)、CONDITION或默认的0几个状态 volatile int waitStatus; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; volatile Node prev; //prev指向前节点 volatile Node next; //next指向后节点 ## 节点绑定线程 volatile Thread thread;
通过下图,可更清楚的看出 addWaiter方法 的执行过程(此时 线程T-3 在执行中):
addWaiter`会创建队列,并返回尾节点,即图中的`Node2`
acquireQueued(final Node node, int arg) 方法 : final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
for (;;) {
final Node p = node.predecessor(); //获取pre节点,就是Node1
if (p == head && tryAcquire(arg)) { //### 注释1-再次尝试获取锁
setHead(node); //获取到锁了,去掉Node1,Node2变成新的head节点
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
}
...
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) //2次循环,将waitStatus==Node.SIGNAL,renturn true
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
pred.compareAndSetWaitStatus(ws, Node.SIGNAL); //首次循环,将pre节点Node1的waitStatus修改成SIGNAL
}
return false;
}
这里依照上图,详细解释下:
acquireQueued 方法的 入参 前面提到了,就是 addWaiter方法 新增的尾节点 ,即入参 node = Node2,那么node.predecessor()自然是Node1了—— p = Node1。
注释1位置,先判断 p 是不是 头结点 :
如果 p 是头节点 (上图中,p就是头结点), tryAcquire(arg) 会再次尝试获取锁。此时也有两种情况:
线程T-3 已经 执行完 并释放了锁,那么当前 线程T-2 可以获取到锁;之后去掉当前头结点Node1,将Node2设置成头结点。 线程T-3 未执行完 ,那么当前 线程T-2 无法获取锁,之后会执行shouldParkAfterFailedAcquire(Node pred, Node node)方法 p 不是头结点 ,同样会执行shouldParkAfterFailedAcquire(Node pred, Node node)方法 而由于 shouldParkAfterFailedAcquire(Node pred, Node node) 方法在循环中,可能会执行两次:
waitStatus 修改成 SIGNAL ( 注意,由于循环的原故,还会再次执行到 注释1 处,也就会再次尝试获取锁 ——上次线程T-3未结束,这次就有可能结束了); waitStatus 已经是 SIGNAL ,直接 return true 。后面的parkAndCheckInterrupt()方法会将当前 线程T-2 阻塞 。 给出 线程T-2 未获取锁情况下的队列情况:
列出 线程T-1 也参与其中的完整队列图。可看到 尾节点之前的节点,绑定的线程都是 阻塞 状态 (park) ,而 waitStatus 都是 待唤醒 状态 (waitStatus = SIGNAL = -1):
总结以上内容,作为 结论2:
acquireQueued`方法,如果当前线程是第1个获取锁失败的线程(例子中“线程T-3”正在执行,“线程T-2”就是第一个获取锁失败的线程),会再尝试2次获取锁; 获取锁失败 或 当前线程非第1个获取锁失败的线程(例子中T-1就不是第一个获取锁失败的线程),将前置节点状态修改成待唤醒,并阻塞关联线程。
为了便于理解,画出整个 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 方法的逻辑图:
阻塞并非终点,还要再次看下 unlock() 时做了什么。
## ReentrantLock类
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) { //尝试释放,前面的已经分析过了
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // ### 重点看unparkSuccessor(h)方法,入参是`头节点`
return true;
}
return false;
}
## AQS类
private void unparkSuccessor(Node node) {
// 获取Node节点的waitStatus,如果<0(比如带唤醒SIGNA = -1),原子形还原成0
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
// 获取头结点的下一个节点,如果是空(CANCELLED可能产生空),链表尾部遍历,取最前面一个waitStatus<0的节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
LockSupport.unpark(s.thread); // 唤醒
}
先不考虑CANCELLED情况,那么 第二个节点对应的线程 会被唤醒。第二个节点是什么来路?前面已经分析了, 第1个获取锁失败的线程会和第二个节点绑定 (例子中的Node2,对应的线程自然是T-2,下图):
线程T-2 被唤醒后,会做什么?
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
for (;;) { //循环
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt(); //### 线程T-2原本被阻塞于此
}
} catch (Throwable t) {
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
很显然,如果 线程T-2 被唤醒后,由于循环的原故,会再次进入如下逻辑:
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node); //head易主
p.next = null;
return interrupted;
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
tryAcquire(arg) 再次尝试获取锁,显然此时 线程T-3 已经执行完了(不然也不会执行unlock),那么 线程T-2 很可能会获取到锁——
那么, head易主 ,队列发生如下变化:
最后给出 加 / 解锁过程中的队列变化 ,帮助理解。
以上,终于分析完了 ReentrantLock的主要方法的实现。(有点细碎哈)
本系列的下一篇文章会继续探索 ReentrantLock 的公平锁实现 ,敬请期待!