AQS,即 AbstractQueuedSynchronizer类,是java并发工具类的底层实现基础,例如ReentrantLock、ReentrantReadWriteLock等都是基于AQS实现的,它将未获取到锁的线程封装在一个节点里面,不同的节点通过连接形成了一个 CHL 队列。
CHL 队列:它是一个非阻塞的 FIFO 队列,也就是说在并发条件下往此队列做插入或移除操作不会阻塞,它通过自旋锁和 CAS 保证节点插入和移除的原子性,实现无锁快速插入。
本文从我们平时使用ReentrantLock时用到的API开始,逐步剖析源码,进而理解AQS原理。
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
System.out.println("do work");
} finally {
lock.unlock();
}
}
复制代码
这段代码就是我们平时使用ReentrantLock最常见的流程,创建锁实例 -> 加锁 -> 解锁,下面我们来逐步查看它们的实现。
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
/**
* Base of synchronization control for this lock. Subclassed
* into fair and nonfair versions below. Uses AQS state to
* represent the number of holds on the lock.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
//省略
}
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync{
//省略
}
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync{
//省略
}
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
//省略
}
复制代码
这里可以看出ReentrantLock有个内部类Sync继承了AbstractQueuedSynchronizer,并且有两个子类NonfairSync和FairSync,从名字可以看出一个是非公平锁,一个是公平锁。ReentrantLock的无参构造函数中对sync属性进行实例化,默认使用的是一个非公平锁的实现。公平锁和非公平锁的实现区别我们后续再说,接下来依照默认的NonfairSync来看锁的实现逻辑。
public class ReentrantLock{
public void lock() {
sync.lock();
}
}
复制代码
调用Reentrant.lock()方法,发现只有一行,调用的是sync.lock(),可以看出sync才是Reentrant的具体实现逻辑所在, 需要再看看NonfairSync.lock()里的实现。后面也会有很多方法是在内部类、子类和父类之间调用,所以为了方便理解,每一块都会把涉及到的相关代码放到一起来看。
public class ReentrantLock implements Lock, java.io.Serializable {
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
}
public abstract class AbstractQueuedSynchronizer{
private volatile int state;
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
}
复制代码
state为锁占有状态值,含义如下:
state=0:表示没有线程获取锁;
state>1:表示锁已被线程获取,对于可重入锁,每重入一次 state+1,每释放一次 state-1.
lock方法首先通过CAS将state设置为1,如果设置成功则表示成功获取到锁,设置当前线程为锁占有线程,如果这一步失败,则会走到acquire(1)方法,这个方法是AQS类中的方法:
public abstract class AbstractQueuedSynchronizer {
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
}
复制代码
首先调用tryAcquire尝试获取锁,这个方法是AQS中的抽象方法,由NonfairSync实现,NonfairSync中又去调用父类Sync.nonfairTryAcquire()方法:
abstract static class Sync extends AbstractQueuedSynchronizer {
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//state为0,则尝试抢占锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//查看当前锁占有线程是不是当前线程
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
复制代码
nonfairTryAcquire方法首先还是获取state状态,如果为0先尝试设置为1进行抢占,成功则返回true,否则查看占有线程是不是当前线程,如果是,将 state+1并更新,返回true,如果都不是,则说明获取锁失败,返回false。这时会走到 AbstractQueuedSynchronizer.acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
方法,这个方法的主要作用就是将当前线程挂起,然后生成节点放到同步队列中等待唤醒,我们来看源码实现:
public abstract class AbstractQueuedSynchronizer{
private transient volatile Node head;
private transient volatile Node tail;
static final class Node {
/** 共享锁的标志 */
static final Node SHARED = new Node();
/** 独占锁的标志 */
static final Node EXCLUSIVE = null;
/** 表示线程状态已取消 */
static final int CANCELLED = 1;
/** 表示后续线程正在等待释放 */
static final int SIGNAL = -1;
/** 表示线程正在等待condition条件 */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
}
复制代码
这里首先介绍下AQS内部类Node,既然叫AQS,自然就有Queue,Node就是队列中的元素,从类定义可以看出这是一个双向链表,属性介绍如下:
| 属性 | 说明 |
|---|---|
| waitStatus | 等待状态 |
| prev | 上一节点 |
| next | 下一节点 |
| thread | 当前节点存储的线程 |
| nextWaiter | Node既可以做同步队列节点使用,也可以做等待队列节点使用,前者中的取值为SHARED或EXCLUSIVE,用来标识共享还是独占模式,后者的取值是后继等待节点 |
其中waitStatus有5种取值:
CANCELLED:值为1,取消状态,不再等待
SIGNAL:值为-1,表示后继节点处于等待状态,等待被唤醒
CONDITION:值为-2,当前节点处于等待队列,节点线程等待在Condition上,当其他线程对condition执行signall方法时,等待队列转移到同步队列,加入到对同步状态的获取
PROPAGATE:值为-3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态
0:以上都不是(源码翻译过来就是这个。。可以理解为初始化状态)
可以看到只有CANCELLED一个状态是大于0的,后续判断状态时会用到这一点。
Node介绍完毕,回到addWaiter方法,这个方法就是将当前线程生成Node节点,并将其添加到队尾,这里有点小技巧,可以看到compareAndSetTail只调用了一次,并没有自旋,难道不怕设置失败吗?原因就在于把自旋放到了enq方法里,可以看到enq方法中,当队列不为空时就会自旋设置tail为当前Node,这块代码其实和addWaite差不多,至于为什么要把这块重复代码放到两个方法里,我暂时也没看明白。。囧
另外可以看到第一次进到enq的for循环中时队列为空,这时会进行初始化,将头结点设置为空节点,因此同步队列中的头节点其实是起到一个头指针的作用。
addWaiter返回了当前Node,然后被acquireQueued调用,首先会判断它的前一节点是不是头结点,是的话说明当前线程是第一个等待线程,会尝试获取锁,
如果获取成功: 则把当前节点设置为头节点,并把原来的头节点设置为null,方便垃圾回收,直接返回 如果获取失败,则调用shouldParkAfterFailedAcquirefang方法判断当前线程是否需要挂起进行等待,如果需要,则调用parkAndCheckInterrupt执行挂起操作,可以看到这里通过&&完成了等同于if else的判断,也算是个小技巧把,这两个方法的执行逻辑都写在下面代码注释中了
public abstract class AbstractQueuedSynchronizer{
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//前置节点状态为SIGNAL,表明当前线程需要挂起等待,返回true
if (ws == Node.SIGNAL)
return true;
//如果前置节点>0,说明是CANCELLED状态,则通过循环将当前节点的前置节点指向到前面第一个状态不为CANCELLED的节点
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//将前置节点状态置为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
//通过LockSupport类挂起当前线程,后续也需要通过LockSupport.unpark来唤起线程
LockSupport.park(this);
return Thread.interrupted();
}
}
复制代码
可以看到,走到这里获取锁失败的线程就会进入到CHL队列中,当前线程会被挂起,那什么时候会被唤醒呢,我们接下来看看释放锁的过程。
public class ReentrantLock{
public void unlock() {
sync.release(1);
}
}
复制代码
同样,ReentrantLock释放锁时也是调用的sync.release方法,这个方法在父类AbstractQueuedSynchronizer中
public abstract class AbstractQueuedSynchronizer{
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
}
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
}
复制代码
可以看到,release方法先执行tryRelease方法,释放成功后则会获取头结点,唤醒头结点的后继节点的线程,唤醒后该线程会继续执行acquireQueued方法中的循环,如果获取锁成功,则将自己置为头结点。
tryRelease方法是个抽象方法,在ReentrantLock.Sync的实现里可以看到,因为是可重入锁,所以每次释放会将state-1,直到state=0时这个锁才是真正释放成功返回true,进行后续唤醒下一个线程操作。
至此,ReentrantLock加锁和释放锁的整个过程就分析完毕,可以看到,其实ReentrantLock中并没有太多实现,只是重写了AQS的tryAcquire和tryRelease两个方法,其他CHL队列的管理、线程的唤醒都是AQS中实现的,所以我们说AQS是其实现的基础,同样ReentrantReadWriteLock、CountdownLatch等也都是都是基于AQS实现的,后续会再抽时间对这些类的源码进行分析。
本文开头还有说到ReentrantLock中还有NonfairSync和FairSync两个实现,它们的区别主要在于tryAcquire的实现上
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
}
public abstract class AbstractQueuedSynchronizer{
//判断是否已有节点正在等待,如果head==tail,说明队列为空,返回false,如果队列不为空,且head后继节点不是当前线程节点,说明已有其他线程正在等待,返回true
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
}
复制代码
NonfairSync.lock和NonfairSync.tryAcquire方法中,新的线程进来后都会直接尝试获取锁,因此可能会出现这种情况:某个线程A终于被唤醒后,这时一个新的线程刚进来,并成功获取到锁,导致线程A执行tryAcquire方法失败,又只能继续等待,即线程饥饿的情况。
FairSync.tryAcquire方法则不同,如果锁没有被获取,首先会通过hasQueuedPredecessors方法判断是否已经有其他线程在等待(hasQueuedPredecessors也是在AQS中实现的,AQS真是好人啊。。),如果有就返回false,进入后续enq等操作,加入到CHL队列中进行等待。
可以看到,非公平锁可以减少CPU唤醒线程的开销,因此整体的吞吐效率会高点。
以上就是本文全部内容,可能有的地方理解有误,欢迎各位指出~
参考: blog.csdn.net/qq_28605513…