AbstractQueuedSynchronizer 是大师 Doug Lea 编写的一个并发编程类,位于 java.util.concurrent.locks,是 CountdownLatch、Semaphore、ReentrantLock、ReentrantReadWriteLock、ThreadPoolExecutor 中重要的组成部分,他们中关于 “锁” 的部分与 AQS 息息相关。
借用一下源码中的说法, AbstractQueuedSynchronizer 基于一个 FIFO 队列 提供了一套阻塞锁和同步相关的实现。该类被设计成为很多同步容器 synchronizers 的底层实现,它使用了一个原子int private volatile int state; 来表示当前状态。当在 AQS 被 acquired (获取资源) 或被 release (释放资源)时,需要依据这个 state 来进行判断。所以子类需要定义方法来修改这个状态,该状态的含义由我们自由定制。(翻译的不好...)
我们来看一个最简单的例子,我们有一个类 Sync 继承了 AbstractQueuedSynchronizer ,并重写了其 tryAcquire 和 tryRelease 方法。实现非常简单,我们通过调用父类的 compareAndSetState() 以及 setState() 来完成, 简单来说(不是特别准确) ,就是 tryAcquire 返回 true ,代表获取锁成功,否则就会阻塞。而 tryRelease 则负责锁的释放。
在例子中:将 state 设置为 100 代表当前状态为无锁, 1 则代表已经有某个线程获取了该锁。当然这个 state 表达的含义是怎么样的,完全是我们定义的,实际上锁定或者无锁是 100 还是 200 还是 -100 ,都没有什么关系。
/**
* Created by Anur IjuoKaruKas on 2019/5/7
*/
public class Mutex extends AbstractQueuedSynchronizer {
public static class Sync extends AbstractQueuedSynchronizer {
public Sync() {
setState(100); // set the initial state, being unlocked.
}
@Override
protected boolean tryAcquire(int ignore) {
boolean result = compareAndSetState(100, 1);
print("尝试获取锁" + (result ? "成功" : "失败"));
return result;
}
@Override
protected boolean tryRelease(int ignore) {
setState(100);
return true;
}
}
private final Sync sync = new Sync();
public void lock() {
sync.acquire(0);
}
public void unLock() {
sync.release(0);
}
public static void main(String[] args) throws InterruptedException {
Mutex mutex = new Mutex();
mutex.lock();
Thread thread = new Thread(() -> {
print("调用 mutex.lock() 之前");
mutex.lock();
print("调用 mutex.lock() 之后");
});
thread.start();
print("main 线程 Sleep 之前");
Thread.sleep(5000);
print("main 线程 Sleep 之后");
mutex.unLock();
}
public static void print(String print) {
System.out.println(String.format("时间 - %s/t/t%s/t/t%s", new Date(), Thread.currentThread(), print));
}
}
========================================= 输出
时间 - Fri May 24 15:44:19 CST 2019 Thread[main,5,main] 尝试获取锁成功
时间 - Fri May 24 15:44:19 CST 2019 Thread[main,5,main] main 线程 Sleep 之前
时间 - Fri May 24 15:44:19 CST 2019 Thread[Thread-0,5,main] 调用 mutex.lock() 之前
时间 - Fri May 24 15:44:19 CST 2019 Thread[Thread-0,5,main] 尝试获取锁失败
时间 - Fri May 24 15:44:19 CST 2019 Thread[Thread-0,5,main] 尝试获取锁失败
时间 - Fri May 24 15:44:19 CST 2019 Thread[Thread-0,5,main] 尝试获取锁失败
时间 - Fri May 24 15:44:24 CST 2019 Thread[main,5,main] main 线程 Sleep 之后
时间 - Fri May 24 15:44:24 CST 2019 Thread[Thread-0,5,main] 尝试获取锁成功
时间 - Fri May 24 15:44:24 CST 2019 Thread[Thread-0,5,main] 调用 mutex.lock() 之后
我们可以看到,代码符合我们的预期:在 main 函数所在线程调用 mutex.unLock(); 释放锁之前,子线程是一直阻塞的, 调用 mutex.lock() 之后 的日志输出发生在 main 线程 Sleep 之后 之后。
通过重写 tryAcquire 、 tryRelease 方法,以及调用 acquire 和 release 方法,我们很容易就实现了一个锁,当然这个锁有一堆问题... 我们只是通过这个小例子,来建立对 AQS 一个简单的了解。
看到这里,有些细心的小伙伴可能会想了,既然锁是由 tryAcquire 控制的,那和 state 又有什么关系呢? 我们完全可以定义一个自定义变量,比如 sign , false 代表无锁, true 代表锁定,好像也可以实现这段逻辑啊?这个时候就需要引出我们神奇的 compareAndSet , CAS 操作了。
前面说到,我们暂时认为 : tryAcquire 返回 true ,代表获取到锁,反之只要 tryAcquire 返回 flase ,线程就会被阻塞 (不准确,后面会细说)。实际上这里有一个 隐含条件,我们必须做到:
tryAcquire 成功,且在某个线程 tryAcquire 成功之后,并在其 release 释放锁之前,任何线程进行 tryAcquire 都将返回 false 。 下面这个例子我们简单使用一个自定义变量 sign 来实现 tryAcquire ,看看会发生什么:
private boolean sign;
@Override
protected boolean tryAcquire(int ignore) {
boolean result = false;
if (!sign) {
sign = true;
result = true;
}
print("尝试获取锁" + (result ? "成功" : "失败"));
return result;
}
@Override
protected boolean tryRelease(int ignore) {
sign = false;
return true;
}
========================================= 输出
时间 - Fri May 24 18:03:12 CST 2019 Thread[main,5,main] 尝试获取锁成功
时间 - Fri May 24 18:03:12 CST 2019 Thread[main,5,main] main 线程 Sleep 之前
时间 - Fri May 24 18:03:12 CST 2019 Thread[Thread-0,5,main] 调用 mutex.lock() 之前
时间 - Fri May 24 18:03:12 CST 2019 Thread[Thread-0,5,main] 尝试获取锁失败
时间 - Fri May 24 18:03:12 CST 2019 Thread[Thread-0,5,main] 尝试获取锁失败
时间 - Fri May 24 18:03:12 CST 2019 Thread[Thread-0,5,main] 尝试获取锁失败
时间 - Fri May 24 18:03:17 CST 2019 Thread[main,5,main] main 线程 Sleep 之后
时间 - Fri May 24 18:03:17 CST 2019 Thread[Thread-0,5,main] 尝试获取锁成功
时间 - Fri May 24 18:03:17 CST 2019 Thread[Thread-0,5,main] 调用 mutex.lock() 之后
看起来好像没问题,在这个 demo 中也得到了和第一个 DEMO 一样的预期的结果。然而事情并没有那么简单,新写的这个 tryAcquire 实现是一个 "CompareThenSet" 操作,在并发的情况下,会出现不可预期的情况
sign 为 false sign 为 false sign 修改为 true ,问题就来了。 我们改一下 Main 方法,我们使用 100 个线程并发执行 mutex.lock(); 获取锁成功则会输出语句 print("获取锁成功"); ,执行,发现,竟然有两个线程同时获取到了锁。 有两个线程同时将 sign 修改为了 true 。
public static void main(String[] args) throws InterruptedException {
Mutex mutex = new Mutex();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 10000; i++) {
threads.add(new Thread(() -> {
mutex.lock();
print("获取锁成功");
}));
}
ExecutorService executorService = Executors.newFixedThreadPool(100);
threads.forEach(executorService::submit);
Thread.sleep(1000);
}
如果我们使用 AQS 帮我们写好的 compareAndSetState 则没有这个问题。
在 Java9 之前,底层实现是调用 unsafe 包的 compareAndSwapInt 来实现的:
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
而在 Java9 之后,则是使用 VarHandle 来实现 VarHandle 是 unSafe 的一个替代方案,本文不多赘述,后面会有文章讲到这个 ~ 。
// VarHandle mechanics
private static final VarHandle STATE;
---------------------------------------------
protected final boolean compareAndSetState(int expect, int update) {
return STATE.compareAndSet(this, expect, update);
}
这里简单的说一下 CAS ,即 CompareAndSwap 。 CAS 可原子性地比较并替换一个值,乐观锁中一个典型的实现便是使用 CAS 来完成的。对并发编程有所了解的小伙伴应该都知道 CAS ,一般情况下, Compare(比较) 和 Swap(交换) 至少是两个原子操作(实际上是更多个原子操作,主要看编译成多少条机器码)。 而 CAS 则保证了 Compare 和 Swap 为一个原子操作。
上文说到, 我们暂时认为 : tryAcquire 返回 true ,代表获取到锁,反之只要 tryAcquire 返回 flase ,线程就会被阻塞。
AQS 当然没有这么简单,但我们可以先看看加锁时调用的 acquire 方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
我们发现, tryAcquire 只是第一重判断,如果 tryAcquire 失败,紧接着还有另一个 核心逻辑 acquireQueued 。在简介里,我们说, AQS 除了使用一个 原子state 来作为状态判断以外,还有一个 FIFO 队列 ,此队列就和 acquireQueued 方法息息相关。另外, AQS 所控制的资源访问,还可以是共享的,或者独占的( addWaiter 参数 Node.EXCLUSIVE )。
以下的分析我们以一个简单的 独占式非公平 AQS 实现: java.util.concurrent.locks.ReentrantLock.NonfairSync 来深入解析。独占式很好理解,大部分的锁实现都只允许一个线程在同一时间获取到锁定的资源。
先看看 NonfairSync 的 tryAcuire 是怎么实现及优化的。首先, NonfairSync 中将 state == 0 定义为无锁状态。
state == 0 ),再调用 CAS 。这实际上对性能是一个很好的优化,假设当前取 state 不为 0 ,实际上 CompareAndSetState 成功的概率也很小,这也可以避免同一时间内,过多的线程去并发修改 state 这个状态。 CAS 操作,会发生什么?毫无疑问是 CAS 失败,这会间接导致死锁。这里我们可以看到,重入以后,有一个 int nextc = c + acquires; 操作,这是方便我们记录到底套了几层锁用的,如果没有这个机制,我们将 无法精确的控制加锁和解锁的层级 ,难免会出现一些意料之外的情况。简单来说: lock 几次,就要 unLock 几次。当然我们也可以做到 aquire 多次,一次性 release 掉,或者反过来,取决于怎么我们实现 tryAquire 和 tryRlease 方法。 CAS 操作,返回 true 即可。 @ReservedStackAccess
final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 避免过多的线程竞争 CAS 操作
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);// 如果 CAS 操作成功,则将当前线程保存起来,重入和解锁时用于判断。
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires; // 重入优化,每次加锁相当于 `state++`
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true; // 偏向优化
}
return false;
}
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 每次解锁相当于 `state--` 直到 state == 0 ,代表可释放锁了
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
①、addWaiter阶段
如果 tryAquire 失败,就会进入 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) , addWaiter 方法创建了一个新的 Node 实例, Node 实例中主要保存了当前线程信息,并将 nextWaiter 赋值为 Node.EXCLUSIVE , 这个 nextWaiter 后面再谈,它主要用于线程调度、以及独占模式、共享模式的区分,我们可以先不管它。
操作比较简单,原理是将 node 塞入双向链表尾端,也就是前面提到的 FIFO队列 。就是利用 CAS 操作将新创建的、带有本线程信息的 node 设置为双向链表新的 tail ,并且修改两者的 ‘指针’ prev 和 next 。
/** Constructor used by addWaiter. */
Node(Node nextWaiter) {
this.nextWaiter = nextWaiter;
THREAD.set(this, Thread.currentThread());
}
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
initializeSyncQueue(); // 初始化双向链表,就是创建一个新的空 node,并且头尾都是此 node。
// 这个 node 除了拿来标记链表从哪里开始,没有什么别的意义。
}
}
②、acquireQueued阶段(自旋)
入队成功后,进入 acquireQueued 方法, 抛开线程被 interrupt 的情况 , acquireQueued 的代码其实也很简单,我们不看 interrupt 相关逻辑,其实逻辑还是很简单的。这是一个 无限循环(或者说自旋) ,只要没有 tryAcquire 成功,就会一直循环下去,逻辑如下:
FIFO 队列头,则进行一次 tryAquire ,如果成功,则跳出循环。 parkAndCheckInterrupt 便是阻塞直到被唤醒(或者被 interrupt ,暂时先不考虑这个情况)。 final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
tryAcquire() 和 parkAndCheckInterrupt() 都很好理解,前者就是去尝试一下获取锁定资源,看能否成功。后者则是阻塞直到被唤醒。
③、阻塞阶段
我们先说说 shouldParkAfterFailedAcquire ,这个判断是一个挺有意思的设计,后续文章会细说,它和线程调度、取消获取锁等相关。因为在获取锁定资源和释放锁定资源的过程中,实际上我们只需要用到两个状态,一个是初始状态 pred.waitStatus == 0 ,另一个是 pred.waitStatus == SIGNAL == -1 。
代码中我们可以很容易看出,在 CAS 将 prev 节点的 waitStatus 设置为 SIGNAL : -1 之前,都将返回 false ,如果设置成功,下一次自旋进入该方法就是 true 了,也就是说,会进入 parkAndCheckInterrupt() 方法,阻塞直到被唤醒。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
自旋阶段图解:
阻塞阶段图解:
①、唤醒 FIFO 的下一个节点
阻塞直到唤醒这个逻辑在锁定资源、释放资源 这两个阶段来看十分简单,最后我们来看看 release 做了什么, release 除了调用了我们自己实现的 tryRelease 之外,其实关键的就是这个 unparkSuccessor 。
tryRelease 上面也说过了,就是改改原子 state ,这里不多赘述。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
代码中可以看出,当 FIFO 队列不为空且头结点的 waitStatu 被修改过,就会进入 unparkSuccessor , unparkSuccessor 传入了当前 FIFO 的队列头,逻辑如下:
waitStatus 为负(可能为 SIGNAL 、 CONDITION 或者 PROPAGATE ),我们这里简单先看成只有 SIGNAL 状态,则 CAS 将其设置为 0 。其他几个状态我们后面会说到。 !(s == null || s.waitStatus > 0) ,也就是说 node.next 的 waitStatus <= 0 ,则简单的直接将其唤醒: LockSupport.unpark(s.thread); private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
②、被唤醒后
被唤醒的线程当然不是直接获得了锁,它还是会继续 acquireQueue 进行自旋,逻辑还是和之前一样,避免小伙伴往上翻代码,这里贴了一份如果 prev 是头结点,如果 tryAcquire 成功,我们看到其实很简单,只是将自己设为头部即可。
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
这篇文章只是简单的说说 AQS 的正向获取资源,释放资源流程,后续会继续解析 wait 、 notify 、 condition 等基于 AQS 的线程调度解析 ~~ 以及各个锁是如何实现 AQS 的 ~~
文章皆是基于源码一步步分析,没有参考过多资料,如有错误, 请指出!!
另外欢迎来 Q 群讨论技术相关(目前基本没人) [左二维码] ~
如果觉得写得好还可以关注一波订阅号哟 ~ 博客和订阅号同步更新 [右二维码] ~
Brief introduction to AbstractQueuedSynchronizer by Using a Simple Mutex Example
另外小伙伴可以思考一下:
ThreadB 被唤醒后,继续自旋时,另一个线程 ThreadC tryAcquire 成功了会发生什么。 CAS 操作,都有 ABA 问题,如果说修改 waitStatus 发生了 ABA 问题,会发生什么?