转载

从ReentrantLock源码深入理解AQS

AQS,即 AbstractQueuedSynchronizer类,是java并发工具类的底层实现基础,例如ReentrantLock、ReentrantReadWriteLock等都是基于AQS实现的,它将未获取到锁的线程封装在一个节点里面,不同的节点通过连接形成了一个 CHL 队列。

CHL 队列:它是一个非阻塞的 FIFO 队列,也就是说在并发条件下往此队列做插入或移除操作不会阻塞,它通过自旋锁和 CAS 保证节点插入和移除的原子性,实现无锁快速插入。

本文从我们平时使用ReentrantLock时用到的API开始,逐步剖析源码,进而理解AQS原理。

ReentrantLock使用

public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        try {
            System.out.println("do work");
        } finally {
            lock.unlock();
        }
    }
复制代码

这段代码就是我们平时使用ReentrantLock最常见的流程,创建锁实例 -> 加锁 -> 解锁,下面我们来逐步查看它们的实现。

创建锁实例

public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;

    /**
     * Base of synchronization control for this lock. Subclassed
     * into fair and nonfair versions below. Uses AQS state to
     * represent the number of holds on the lock.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        //省略
    }
    
    /**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync{
        //省略
    }
    
    /**
     * Sync object for fair locks
     */
    static final class FairSync extends Sync{
        //省略
    }
    
    /**
     * Creates an instance of {@code ReentrantLock}.
     * This is equivalent to using {@code ReentrantLock(false)}.
     */
    public ReentrantLock() {
        sync = new NonfairSync();
    }
    
    //省略
}
复制代码

这里可以看出ReentrantLock有个内部类Sync继承了AbstractQueuedSynchronizer,并且有两个子类NonfairSync和FairSync,从名字可以看出一个是非公平锁,一个是公平锁。ReentrantLock的无参构造函数中对sync属性进行实例化,默认使用的是一个非公平锁的实现。公平锁和非公平锁的实现区别我们后续再说,接下来依照默认的NonfairSync来看锁的实现逻辑。

加锁

public class ReentrantLock{
    public void lock() {
        sync.lock();
    }
}
    
复制代码

调用Reentrant.lock()方法,发现只有一行,调用的是sync.lock(),可以看出sync才是Reentrant的具体实现逻辑所在, 需要再看看NonfairSync.lock()里的实现。后面也会有很多方法是在内部类、子类和父类之间调用,所以为了方便理解,每一块都会把涉及到的相关代码放到一起来看。

public class ReentrantLock implements Lock, java.io.Serializable {

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
        
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
    
}

public abstract class AbstractQueuedSynchronizer{

    private volatile int state;

    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
}
复制代码

state为锁占有状态值,含义如下:

state=0:表示没有线程获取锁;

state>1:表示锁已被线程获取,对于可重入锁,每重入一次 state+1,每释放一次 state-1.

lock方法首先通过CAS将state设置为1,如果设置成功则表示成功获取到锁,设置当前线程为锁占有线程,如果这一步失败,则会走到acquire(1)方法,这个方法是AQS类中的方法:

public abstract class AbstractQueuedSynchronizer {
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
}


复制代码

首先调用tryAcquire尝试获取锁,这个方法是AQS中的抽象方法,由NonfairSync实现,NonfairSync中又去调用父类Sync.nonfairTryAcquire()方法:

abstract static class Sync extends AbstractQueuedSynchronizer {
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            //state为0,则尝试抢占锁
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //查看当前锁占有线程是不是当前线程
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
复制代码

nonfairTryAcquire方法首先还是获取state状态,如果为0先尝试设置为1进行抢占,成功则返回true,否则查看占有线程是不是当前线程,如果是,将 state+1并更新,返回true,如果都不是,则说明获取锁失败,返回false。这时会走到 AbstractQueuedSynchronizer.acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 方法,这个方法的主要作用就是将当前线程挂起,然后生成节点放到同步队列中等待唤醒,我们来看源码实现:

public abstract class AbstractQueuedSynchronizer{
    
    private transient volatile Node head;

    private transient volatile Node tail;

    static final class Node {
        /** 共享锁的标志 */
        static final Node SHARED = new Node();
        /** 独占锁的标志 */
        static final Node EXCLUSIVE = null;

        /** 表示线程状态已取消 */
        static final int CANCELLED =  1;
        /** 表示后续线程正在等待释放 */
        static final int SIGNAL    = -1;
        /** 表示线程正在等待condition条件 */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;
        
        volatile int waitStatus;
        
        volatile Node prev;
        
        volatile Node next;
        
        volatile Thread thread;
        
        Node nextWaiter;
    }
    
     private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
}
复制代码

这里首先介绍下AQS内部类Node,既然叫AQS,自然就有Queue,Node就是队列中的元素,从类定义可以看出这是一个双向链表,属性介绍如下:

属性 说明
waitStatus 等待状态
prev 上一节点
next 下一节点
thread 当前节点存储的线程
nextWaiter Node既可以做同步队列节点使用,也可以做等待队列节点使用,前者中的取值为SHARED或EXCLUSIVE,用来标识共享还是独占模式,后者的取值是后继等待节点

其中waitStatus有5种取值:

CANCELLED:值为1,取消状态,不再等待

SIGNAL:值为-1,表示后继节点处于等待状态,等待被唤醒

CONDITION:值为-2,当前节点处于等待队列,节点线程等待在Condition上,当其他线程对condition执行signall方法时,等待队列转移到同步队列,加入到对同步状态的获取

PROPAGATE:值为-3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态

0:以上都不是(源码翻译过来就是这个。。可以理解为初始化状态)

可以看到只有CANCELLED一个状态是大于0的,后续判断状态时会用到这一点。

Node介绍完毕,回到addWaiter方法,这个方法就是将当前线程生成Node节点,并将其添加到队尾,这里有点小技巧,可以看到compareAndSetTail只调用了一次,并没有自旋,难道不怕设置失败吗?原因就在于把自旋放到了enq方法里,可以看到enq方法中,当队列不为空时就会自旋设置tail为当前Node,这块代码其实和addWaite差不多,至于为什么要把这块重复代码放到两个方法里,我暂时也没看明白。。囧

另外可以看到第一次进到enq的for循环中时队列为空,这时会进行初始化,将头结点设置为空节点,因此同步队列中的头节点其实是起到一个头指针的作用。

addWaiter返回了当前Node,然后被acquireQueued调用,首先会判断它的前一节点是不是头结点,是的话说明当前线程是第一个等待线程,会尝试获取锁,

如果获取成功: 则把当前节点设置为头节点,并把原来的头节点设置为null,方便垃圾回收,直接返回 如果获取失败,则调用shouldParkAfterFailedAcquirefang方法判断当前线程是否需要挂起进行等待,如果需要,则调用parkAndCheckInterrupt执行挂起操作,可以看到这里通过&&完成了等同于if else的判断,也算是个小技巧把,这两个方法的执行逻辑都写在下面代码注释中了

public abstract class AbstractQueuedSynchronizer{

   private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //前置节点状态为SIGNAL,表明当前线程需要挂起等待,返回true
        if (ws == Node.SIGNAL)
            return true;
        //如果前置节点>0,说明是CANCELLED状态,则通过循环将当前节点的前置节点指向到前面第一个状态不为CANCELLED的节点
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //将前置节点状态置为SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    
    private final boolean parkAndCheckInterrupt() {
        //通过LockSupport类挂起当前线程,后续也需要通过LockSupport.unpark来唤起线程
        LockSupport.park(this);
        return Thread.interrupted();
    }
}
复制代码

可以看到,走到这里获取锁失败的线程就会进入到CHL队列中,当前线程会被挂起,那什么时候会被唤醒呢,我们接下来看看释放锁的过程。

释放锁

public class ReentrantLock{
    public void unlock() {
        sync.release(1);
    }
}
    
复制代码

同样,ReentrantLock释放锁时也是调用的sync.release方法,这个方法在父类AbstractQueuedSynchronizer中

public abstract class AbstractQueuedSynchronizer{
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
    
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

      
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
}

public class ReentrantLock implements Lock, java.io.Serializable {
    private final Sync sync;

    
    abstract static class Sync extends AbstractQueuedSynchronizer {
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
    }
    
}
复制代码

可以看到,release方法先执行tryRelease方法,释放成功后则会获取头结点,唤醒头结点的后继节点的线程,唤醒后该线程会继续执行acquireQueued方法中的循环,如果获取锁成功,则将自己置为头结点。

tryRelease方法是个抽象方法,在ReentrantLock.Sync的实现里可以看到,因为是可重入锁,所以每次释放会将state-1,直到state=0时这个锁才是真正释放成功返回true,进行后续唤醒下一个线程操作。

至此,ReentrantLock加锁和释放锁的整个过程就分析完毕,可以看到,其实ReentrantLock中并没有太多实现,只是重写了AQS的tryAcquire和tryRelease两个方法,其他CHL队列的管理、线程的唤醒都是AQS中实现的,所以我们说AQS是其实现的基础,同样ReentrantReadWriteLock、CountdownLatch等也都是都是基于AQS实现的,后续会再抽时间对这些类的源码进行分析。

本文开头还有说到ReentrantLock中还有NonfairSync和FairSync两个实现,它们的区别主要在于tryAcquire的实现上

public class ReentrantLock implements Lock, java.io.Serializable {
    private final Sync sync;

    
    abstract static class Sync extends AbstractQueuedSynchronizer {
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
    
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
    
     static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
}

public abstract class AbstractQueuedSynchronizer{

    //判断是否已有节点正在等待,如果head==tail,说明队列为空,返回false,如果队列不为空,且head后继节点不是当前线程节点,说明已有其他线程正在等待,返回true  
    public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
}
复制代码

NonfairSync.lock和NonfairSync.tryAcquire方法中,新的线程进来后都会直接尝试获取锁,因此可能会出现这种情况:某个线程A终于被唤醒后,这时一个新的线程刚进来,并成功获取到锁,导致线程A执行tryAcquire方法失败,又只能继续等待,即线程饥饿的情况。

FairSync.tryAcquire方法则不同,如果锁没有被获取,首先会通过hasQueuedPredecessors方法判断是否已经有其他线程在等待(hasQueuedPredecessors也是在AQS中实现的,AQS真是好人啊。。),如果有就返回false,进入后续enq等操作,加入到CHL队列中进行等待。

可以看到,非公平锁可以减少CPU唤醒线程的开销,因此整体的吞吐效率会高点。

以上就是本文全部内容,可能有的地方理解有误,欢迎各位指出~

参考: blog.csdn.net/qq_28605513…

原文  https://juejin.im/post/5eb3dfe36fb9a0434d70a367
正文到此结束
Loading...