转载

学习AbstractQueuedSynchronizer源码以及原理

AbstractQueuedSynchronizer(以下简称AQS),是用来构建锁或者其他同步组件的基础架构,它使用一个int成员变量管理同步状态,通过内置FIFO队列来完成资源获取获取线程的排队工作,作者(Doug lea)大神期望它能完成大部分同步需求的基础。

AQS的主要使用方法是继承,子类通过继承AQS并实现它的模板方法来管理同步状态,AQS通过(getState、setState(int newState)、compareAndSetState(int expect,int update)三个方法来管理同步状态,这三个方法是线程安全的。AQS推荐将AQS定义为子类的内部静态类,因为AQS自身没有任何同步接口。它仅仅管理了若干同步状态的获取和释放,AQS既支持独占所也支持共享锁,这样就可以向作者想的那样实现大部分同步需求。

AQS是面向锁的实现,简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒的底层操作。降低了开发者的实现锁的成本。

AQS队列以及节点分析

AQS内部依赖一个FIFO双向链表来管理同步状态,当当前线程获取同步状态失败时,AQS会将当前线程以及等待状态等信息构造成一个节点(Node)加入到链表中。同事阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次舱室获取同步状态。

Node用来保存获取同步状态失败的线程引用以及其他一些属性,结果属性以及描述如下表所示:

static final class Node {
    
    // 同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待,节点进入该状态后状态将不会发生变化
    static final int CANCELLED =  1;
    // 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
    static final int SIGNAL    = -1;
    // 表示当前节点在等待condition,也就是在condition队列中
    static final int CONDITION = -2;
    // 该状态表示下一次共享式同步状态获取将会无条件的传递下去
    static final int PROPAGATE = -3;

    // 等待状态 用上述的4个值以及初始值0代表同步队列中的状态
    volatile int waitStatus;
    // 前驱节点
    volatile Node prev;
    // 后继节点
    volatile Node next;
    // 获取同步状态的线程
    volatile Thread thread;
    // 等待队列中的后继节点。如果当前节点是共享的,那么这个字段将是一个SHARED常量
    Node nextWaiter;
    
}
复制代码

独享锁示例以及源码分析

class Mutex implements Lock, java.io.Serializable {

    private static class Sync extends AbstractQueuedSynchronizer {

        protected boolean isHeldExclusively() {
            return getState() == 1;
        }


        public boolean tryAcquire(int acquires) {
            assert acquires == 1;
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }


        protected boolean tryRelease(int releases) {
            assert releases == 1;
            if (getState() == 0) throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }


        Condition newCondition() {
            return new ConditionObject();
        }

    }


    private final Sync sync = new Sync();

    public void lock() {
        sync.acquire(1);
    }

    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    public void unlock() {
        sync.release(1);
    }

    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean isLocked() {
        return sync.isHeldExclusively();
    }

    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

}

复制代码

上述代码是用AQS实现的最简单的独享锁,调用该锁的方式:

Lock lock = new Mutex();
try{
    lock.lock();
}finally {
    lock.unlock();
}
复制代码

首先我们回调用AQS的acquire方法:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
复制代码

上述代码首先调用我们自定义的tryAcquire(int arg)方法如果同步状态获取失败,则构造独占式同步节点并通过addWaiter(Node node)方法将该节点加入到同步队列尾部,最后调用acquireQueued(Node node,int arg)方法,使该节点以自旋的方法获取同步状态。如果获取不到阻塞节点中的线程,而被阻塞的线程的唤醒主要依靠前驱节点的出队以及阻塞队列被中断来实现。接下来看addWaiter和加入同步队列enq方法:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    //已初始化会通过下列方法尝试一次加入node到队尾
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
// 未初始化同步队列或者上面快速添加节点失败会通过这个方法将node加入到队尾
private Node enq(final Node node) {
    // 自旋保证加入到队列尾部
    for (;;) {
        Node t = tail;
        if (t == null) { 
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
复制代码

上述代码通过compareAndSetHead和compareAndSetTail两个方法确保线程安全的初始化同步队列以及添加一个节点到同步队列尾部。节点加入到同步队列以后每个节点都会进入自旋状态,更新自己的waitStatus:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 获取前驱节点
            final Node p = node.predecessor();
            // 如果前驱节点是头结点尝试获取同步状态
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; 
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
复制代码

上述逻辑主要包括:

  • 获取当前节点的前驱节点; 需要获取当前节点的前驱节点,而头结点所对应的含义是当前站有锁且正在运行。
  • 当前驱节点是头结点并且能够获取状态,代表该当前节点占有锁; 如果满足上述条件,那么代表能够占有锁,根据节点对锁占有的含义,设置头结点为当前节点。
  • 否则进入等待状态。 如果没有轮到当前节点运行,那么将当前线程从线程调度器上摘下,也就是进入等待状态。

上述做法不会过早的唤醒睡眠中的线程,降低资源使用率,提高性能。

最后我们回调用release(int arg)方法释放当前线程的同步状态,使后继节点能够继续获取同步状态。代码如下:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
复制代码

上述逻辑会调用unparkSuccessor(Node node)唤醒处于等待状态的线程:

private void unparkSuccessor(Node node) {
    // 将状态设置为同步状态
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 获取当前节点的后继节点,如果满足状态,那么进行唤醒操作  
    // 如果没有满足状态,从尾部开始找寻符合要求的节点并将其唤醒
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
复制代码

分析至此总结一下AQS实现独占锁的流程:在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会加入到队列中并进行自旋,移除队列以及停止自旋的条件是前驱节点是头结点且成功获取了同步了状态,最后通过自定义tryRelease(int arg)方法释放同步状态,唤醒后继节点。

原文  https://juejin.im/post/5c1663f75188254eb05fa659
正文到此结束
Loading...