转载

啃透Java并发-AQS详解

上一篇我们解读了LockSupport的源码,其中有提到JUC的一块重要基石是AbstractQueuedSynchronizer类,简称AQS,那么这一篇就正式学习这个类。由于我也是以学代练,肯定有很多地方理解的不够到位,欢迎大家留言讨论哈!还是友情提示,本文的分析的JDK版本是8。

AQS的工作原理概述

为什么要在开篇就介绍AQS的工作原理呢?因为先对一些知识点有个大概了解,可以帮我们在看源码时更容易理解一些,做到有的放矢,事半功倍。

这里我总结了三个比较关键的点,需要我们知道的。

  1. AQS内部有一个volatile变量state,并且提供了compareAndSetState方法,可以线程安全的修改state的值,不同的需求场景下,state会有不同的意义,粗俗一点说,就是游戏规则我们自己根据需求来定义,只要大家都遵守这个规则,这个游戏就能够玩起来。
  2. 当竞争锁失败后,其实可以理解为CAS更新state失败,当前线程会被封装进一个Node对象,然后放入一个Node双向队列中,然后调用LockSupport.park,让线程等待。
  3. 当锁被释放后,AQS会检查队列中是否有线程在等待,如果有,unpark唤醒该线程,并从等待队列中删除对应的Node(将该node设为头结点)。

AQS的实现思路还是很清晰的,使用一个state来维护竞争状态,使用CAS来安全的更新state,获取锁失败的线程进入等待队列unpark,锁被释放后,从队列中唤醒一个线程来继续尝试获取锁。

使用AQS自定义一个Lock类

AQS支持独占和共享二种模式,独占模式相对容易理解一些,光说不练假把式,我们先利用AQS实现一个独占锁SmartLock来加深理解。

public class SmartLock {

    private class Sync extends AbstractQueuedSynchronizer {

        @Override
        protected boolean tryAcquire(int arg) {
            if (getExclusiveOwnerThread() == Thread.currentThread()) return true;
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            } else {
                return false;
            }
        }

        @Override
        protected boolean tryRelease(int arg) {
            setState(0);
            setExclusiveOwnerThread(null);
            return true;
         }
    }

    private Sync mSync;

    public SmartLock() {
        mSync = new Sync();
    }

    public void lock() {
        mSync.acquire(1);
    }

    public void unLock() {
        mSync.release(1);
    }
}
复制代码

我们新建一个内部类Sync继承AQS,重写它的tryAcquire和tryRelease方法,可以理解为它们分别对应独占模式下的尝试获取锁和尝试释放锁,返回true表示成功,false表示失败。

这里我们可以停下来想一下,既然AQS内部有一个state可以利用,那我们可以这样设定游戏规则,state=1时表示锁被占用,state=0表示锁没有被某个线程持有。

protected boolean tryAcquire(int arg) {
            // 先判断当前持有锁的线程是不是本线程,如果是,直接返回true,所以我们这个锁是支持可冲入的
            if (getExclusiveOwnerThread() == Thread.currentThread()) return true;
            // CAS的方式更新state,只有当state=0时会成功更新为1
            if (compareAndSetState(0, 1)) {
                // 当前线程已经获取了锁,设置为Owner thread
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            } else {
                // 返回true,当前线程会被加入等待队列中
                return false;
            }
        }
        
        protected boolean tryRelease(int arg) {
            // 状态更新为0,
            setState(0);
            // Owner thread设置为null
            setExclusiveOwnerThread(null);
            return true;
        }
复制代码

我们在SmartLock类中定义二个方法lock和unLock,分别调用acquire和release即可,这里的参数没有用到,传1即可。

public void lock() {
        mSync.acquire(1);
    }

    public void unLock() {
        mSync.release(1);
    }
复制代码

我们用SmartLock来实现一个线程安全的累加器,逻辑很简单就是提供一个increase方法,对counter进行++操作,我们知道++操作不是原子的,所以我们用SmartLock来保证原子性。

public class SmartAdder {
    private volatile int counter;
    private SmartLock lock;

    public SmartAdder() {
        lock = new SmartLock();
    }

    public void increase() {
        lock.lock();
        try {
            counter++;
        } finally {
            lock.unLock();
        }
    }

    public int getCounter() {
        return this.counter;
    }
}
复制代码

我们写一段测试的case来验证一下,新建了一个固定有20个核心线程的线程池,然后提交了40个累加任务,每个任务循环100000次,这样得到的正确结果应该是4000000。

public static void main(String[] args) {
        int threadCount = 20;
        int addCount = 100000;
        SmartAdder smartAdder = new SmartAdder();
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        for (int i = 0; i < threadCount * 2; i++) {
            executorService.submit(() -> {
                for (int j = 0; j < addCount; j++) {
                    smartAdder.increase();
                }
                countDownLatch.countDown();
            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("count:" + smartAdder.getCounter());
        executorService.shutdown();
    }
    
    // 打印结果
    count:4000000
复制代码

打印就结果验证了我们的SmartLock确实能够正常工作,这样一个简单的互斥锁就完成了,其实也并不复杂嘛!其中CountDonwLatch也是JUC提供的一个并发同步类,关于它的用法后面会详解,这里大家只需要知道await可以让当前线程等待线程池中的任务执行完成即可。

AQS源码解读

有了前面的铺垫,我们现在先看下AQS中独占模式的acquire和release二个方法的具体实现。

独占模式acquire方法源码解读

先看acquire方法

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
复制代码

可以看到acquire是一个final方法,我们没法重写它,但是有预留一个tryAcquire方法让我们重写,我们在上面的SmartLock类中也是重写了tryAcquire该方法,如果tryAcquire返回false,会调用acquireQueued方法,它的参数是addWaiter(Node.EXCLUSIVE)的结果,我们先来具体跟进看一下addWaiter的实现。

private Node addWaiter(Node mode) {
        // 新建一个Node
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        // 如果队列的尾标兵tail不为空,将新加入的node插入到队尾,并更新tail
        if (pred != null) {
            node.prev = pred;
            // 如果CAS设置tail成功,直接返回
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 如果tail为空,或者CAS设置tail失败
        enq(node);
        return node;
    }
复制代码

这里的思路就是将新建的node插入到队尾,但是由于到考虑到线程安全的问题,采用了CAS更新,如果更新失败,调用enq方法,继续跟进看一下实现。

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            // 如果检查到队列没有初始化,先执行初始化,注意head对头是标兵
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                // 在循环中执行CAS插入尾部操作
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
复制代码

所以看下来,addWaiter逻辑也很清晰,就是要将当前线程,封装为node插入到队列尾部。再看下acquireQueued的实现

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 返回node前驱
                final Node p = node.predecessor();
                // 如果前驱是head,说明当前队列中该线程排在第一顺位,再次调用tryAcquire
                // 因为后面调用的parkAndCheckInterrupt会让线程等待,当锁被release时,线程会被unpark
                // 所以重新tryAcquire来获取锁,如果获取成功,会将当前node设为头结点,相当于将当前
                // node从队列中删除了,因为头结点只是一个标兵,
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    // 这里之所以可以直接将.next置为null,而没有考虑node的next,因为是刚加入的node
                    // 它在队尾,而又是head的next,说明队列中就它一个,直接将head.next = null就可以了
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 先对head设置waitStatus标示位,然后park线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
复制代码

shouldParkAfterFailedAcquire这个方法很有意思,它是将head的waitStatus设为SINGLE,用来标识有任务需要被唤醒,在后面unpark的时候会检查该标识位。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 获取node的pre的waitStatus,
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            // 如果已经是Node.SIGNAL,可以安全的park,直接返回
            return true;
        if (ws > 0) {
            // 说明pred被取消了,并发时会出现这种情况,因为我们没有加锁
            // 继续向前查找waitStatus <= 0的node,
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 将pred的waitStatus设为SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
复制代码

再看下parkAndCheckInterrupt这个方法的实现

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
复制代码

比较简单,直接调用了LockSupport.park,所以AQS中让线程等待的方式就是park,这也就是为什么我们前一篇文章要分析LockSupport源码的原因。

那么线程park等待了,那当然就要有唤醒,我们看下AQS中release的实现。

独占模式release方法源码解读

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
复制代码

同样的AQS中release是一个final方法,不能被重写,我们可以重写tryRelease方法。当head不为空,切waitStatus不为0时,调用unparkSuccessor方法,跟进去看下实现

private void unparkSuccessor(Node node) {
        
        int ws = node.waitStatus;
        if (ws < 0)
           // 先将waitStatus设为0
            compareAndSetWaitStatus(node, ws, 0);

        // 一般需要被唤醒的是node.next,但是如果next的node被取消了,或者waitStatus>0,这时候这里的
        // 策略是从尾部开始重新选择一个node来unpark
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            // unpark唤醒线程
            LockSupport.unpark(s.thread);
    }
复制代码

release的实现相对简单一些,前面介绍tryAcquire失败后,会将当前线程插入到等待队列中时,然后将head的waitStatus置为SINGAL,那么在release时,会先检查这个标识,然后unpark,这里有个小细节,如果head.next被取消了或者waitStatus>0,会从队列的尾部开始往前查找到第一个符合条件的node来unpark。

这里有个细节大家要注意,release只是将队列中第一个满足条件等待的线程唤醒,所以接下来的逻辑还是在acquireQueued方法中,继续尝试调用tryAcquire,如果成功,则会被出队列(当前节点设为头结点),线程继续执行,否则继续等待。

介绍完了独占模式,再来看下共享模式。与独占模式类似,AQS也对共享模式提供了模板方法。分别是acquireShared和releaseShared,它们也都是final的,我们能够重写的方法是tryAcquireShared和tryReleaseShared。

共享模式acquireShared解读

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
复制代码

acquireShared先调用了tryAcquireShared方法,如果返回值小于0, doAcquireShared同样构建SHARED类型的Node加入等待队列。这里要提一下,tryAcquireShared方法使我们需要重写的,注意它的返回值是int类型的,而上面我们分析独占模式tryAcquire的返回值是boolean,因为在共享模式下这个返回值需要有三种状态,所以需要是int类型。

  • tryAcquireShared < 0,获取共享锁失败
  • tryAcquireShared = 0,获取成功,但是不需要唤醒队列中后续的节点
  • tryAcquireShared > 0,获取成功,需要唤醒队列中后续的节点

好,我们继续看下doAcquireShared的实现

// 添加当前节点到队列,与独占模式类似,不再赘述
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 获取node的前驱
                final Node p = node.predecessor();
                if (p == head) {
                    // 再次尝试获取共享锁
                    int r = tryAcquireShared(arg);
                    // 如果获取成功
                    if (r >= 0) {
                        // 检查队列中后续的节点,是否需要被唤醒
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                // park让线程等待,与独占模式类似,不再赘述
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
        
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        // propagate > 0, 或者 当前头结点或者当前节点node的waitStatus < 0时,调用doReleaseShared
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
复制代码

大部分的逻辑跟独占模式类似,但是多了一个检查后续节点是否需要被唤醒的逻辑。

共享模式releaseShared解读

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
复制代码

可以看到doReleaseShared是在一个循环中,如果在调用中,head发生了变化,继续循环,否则挑出循环,而在都独占模式下,没有这样的并发问题,所以独占模式下不需要循环,另外干活的就是unparkSuccessor方法,它来唤醒等待的线程,上面在分析独占模式时已经分析过了,这里不再赘述。

总结

文章先是概述了一下AQS的基本实现原理,然后利用AQS实现了一个简单的互斥锁,最后详细分析了AQS中独占和共享二种模式的关键方法的实现。以上。

原文  https://juejin.im/post/5d73b2d4518825570327e2cf
正文到此结束
Loading...