转载

JDK8源码分析——并发库核心AbstractQueuedSynchronizer的实现思路

原创作品未经作者同意严禁私自转载,或私自修改后据为自有。违者将追究法律责任

Ⅰ. AQS 简介

AbstractQueuedSynchronizer 是JDK并发包的核心类,大多数并发工具实现如 CountDownLatchSemaphoreReentrantLock 等等在内部都持有 AQS 的子类,也就是说它们的功能都是在 AQS 的基础上实现的。

这个类由并发专家 Doug Lea 编写,关于这个同步器他还写了一篇专门的文章讲述它的设计

《 The java.util.concurrent Synchronizer Framework 》

Ⅱ. 基本设计

AQS 是以 FIFO 队列为基础的多线程同步器。当线程未能成功获得 许可 时( 许可 数量用尽)就会加入内部的 FIFO 队列中等待,直到有其他线程释放了 许可

这个队列实际上是一个 CLH锁 ,并且做了些必要的改进以达到以下目标

  1. 处于 park 状态的线程可以被队列前驱唤醒
  2. 处理队列节点中线程取消等待的情况
  3. CLH 一般用于 自旋 ,而 AQS 的设计原则是尽可能减少线程 自旋 节约资源

还有很重要的一点, AQS 提供了两种同步模式—— 独占共享独占 就是一次只能有一个线程获得 许可 ,而 共享 则是可以有多个线程获得 许可 。例如 WriteLock 是独占的, ReadLock 是共享的

本文只关注于 AQS 的核心实现,如果读者在阅读过程中发现不理解的概念或名词,还请自行搜索相关资料。

Ⅲ. FIFO队列

首先请大家一定明确一个概念:线程只会在未能成功获得 许可 的情况下才会入队列,就比如对一个 Lock 对象作 Lock.lock() 操作,

如果这个锁已经被别的线程持有了,即 许可 用完了,那么当前线程才会进入队列等待。

AQS 里,队列的节点用内部类 Node 表示,它的结构如下

static final class Node {
        volatile int waitStatus;    // 当前节点的状态
        volatile Node prev;         // 前驱节点
        volatile Node next;         // 后继节点
        volatile Thread thread;     // 当前等待线程
        Node nextWaiter;            // 用于Condition.await()
        
        final Node predecessor() {...}
        final boolean isShared() {...}
    }

下面这张图展示了多个线程等待获得 许可 的情况

JDK8源码分析——并发库核心AbstractQueuedSynchronizer的实现思路

我们来看看 Node 类的核心 waitStatus 字段,这个字段表达的含义很丰富,所以很难用寥寥几句概括它,不同的值反映了 AQS 不同的行为模式

waitStatus取值 表示 含义 特别说明
1 Cancelled 当前线程已经取消等待 N/A
0 初始状态 当线程新创建一个 Node 对象时,这个字段处于起始状态 N/A
-1 Signal 当前线程在取消等待或者获得许可时,必须对第一个非Cancel状态的后继节点执行 Unpark 操作 用于 AQS 的独占模式
-2 Condition 表示当前线程在Condition对象上等待 N/A
-3 Propagate 当前线程在取消等待或者获得许可时,必须将 许可还有机会获得 的信息通知后继的节点,这个信息会尽可能地向后"冒泡" 用于 AQS 的共享模式

我们来聊一下什么是 parkunpark ,大家都知道 Thread.sleep() 的作用是让当前线程睡眠,而 LockSupport.park() 则更进一步,按照它的注释说明,这个方法的作用是将当前线程移出 线程调度器

也就是说执行了 park 方法,当前线程会立刻停止,不再执行后续代码,即使你调用 interrupt() 也无法将其"唤醒",因为它已经不在 线程调度器 中了。

如果需要让这个线程回到调度器中,需要使用 LockSupport.unpark(thread) 方法。

现在让我们忘记这些看不懂的东西吧,跟着我从原始的锁实现开始,一步步完善它,最后相信你会明白这一节说的是什么。

Ⅳ. 原始的锁实现——CLH锁

我们先从最简单的锁实现开始,话不多说,一图胜千言

JDK8源码分析——并发库核心AbstractQueuedSynchronizer的实现思路

节点数据结构如下

/* 节点结构 */
    class Node {
        Node prev;
        boolean isAvailable;
    }

每个节点的线程在等待锁和释放锁时分别执行的代码如下

/* 入队操作 */
    public void enque(Node node) {
        for (;;) {
            Node t = tail;
            node.prev = t;
            if (CAS(tail, t, node))
                return;
        }
    }
    
    /* 获取锁操作 */
    public Node lock() {
        //建立节点插入队列尾部
        Node node = new Node();
        enque(node);
        
        // 当prev.isAvailable是false时,表示前驱线程正在占用锁
        while (!node.prev.isAvailable) {
        }
        return node;
    }
    
    /* 释放锁操作 */
    public void unlock(Node node) {
        node.isAvailable = true;
    }

结构上, CLH 节点与前面的 Node 节点最大区别在于 CLH 没有 next 引用,只有一个 prev 前驱引用。

实现上, CLH 中每个节点的线程都在 自旋

这就造成了四个问题

  1. CLH 的线程都在自旋,浪费了很多CPU资源,并且浪费的程度与队列中线程数量成正比
  2. CLH 无法清除无用的节点,不具备 伸缩性 ,在高并发情况下很快就会因为 FIFO 队列太长导致OOM
  3. 如果 CLH 中某个节点的线程不想继续等待,希望退出队列,以 CLH 的数据结构无法满足
  4. CLH 的设计只考虑了独占模式,即一次只能有一个线程获得 许可 ,无法用于共享模式

其实这些问题的本质都在于 CLH 没有提供操作后继节点的途径

如果读者不能理解为什么会有上面这四个问题,还请配合图示仔细考察 CLH 的实现

Ⅴ. 从减少自旋开始

首先要解决的当然是减少浪费CPU的问题了。要做到这个就意味着,线程需要使用 park 操作,而这进一步意味着必须有其他线程对其作 unpark 操作。

根据 CLH 的实现,这个 其他线程 就是前驱线程。所以我们改进节点的数据结构,为每个节点都增加一个 next 引用,为了能对后继线程执行 unpark ,还需要一个 thread 引用

/* 节点结构 */
    class Node {
        Node prev;
        Node next;
        Thread thread;
        boolean isAvailable;
    }

对应的操作当然也需要改进

/* 入队操作 */
    public void enque(Node node) {
        for (;;) {
            Node t = tail;
            node.prev = t;
            if (CAS(tail, t, node)) {
                t.next = node;
                return;
            }
        }
    }
    
    /* 获取锁操作 */
    public Node lock() {
        Node node = new Node(Thread.currentThread());
        enque(node);
        
        while (!node.prev.isAvailable)
            LockSupport.park(this);
        return node;
    }
    
    /* 释放锁操作 */
    public void unlock(Node node) {
        node.isAvailable = true;
        if (node.next != null)
            LockSupport.unpark(node.next.thread);
    }

用图说话就是这样

JDK8源码分析——并发库核心AbstractQueuedSynchronizer的实现思路

Ⅵ. GC的呐喊——清除无用的节点

显而易见,按照上面这张图的做法,如果处于高并发的环境中,这个队列分分钟就要爆炸了。我已经能听到垃圾收集器在骂我傻X了,额,我承认我是的,因为这样的设计根本不能用于真实环境。

所以接下来我们要将已经释放锁的节点移出队列,然后把这些节点狠狠塞到垃圾收集器那张让人憎恶,哦不,可爱的大嘴里,最好再给他一大耳刮子

大家第一个想到的应该就是双向链表的删除操作,不过让人欣慰的是,我们只需要在队列中的第一个节点上施加删除操作就可以了,因为不需要考虑并发,代码简单得令人发指

/* 清除无用的节点 */
    public void setHead(Node node) {
        head = node;
        node.prev.next = null;
        node.prev = null;
        node.thread = null;
    }

当然释放锁的步骤得更新一下

/* 释放锁操作 */
    public void unlock(Node node) {
        node.isAvailable = true;
        setHead(node);
        if (node.next != null)
            LockSupport.unpark(node.next.thread);
    }

配合图解,味道更佳

JDK8源码分析——并发库核心AbstractQueuedSynchronizer的实现思路

Ⅶ. 烦躁!老子不想再等了!

最近有许多线程跑到我的办公室对我破口大骂,就差掀我桌子了。他们一致认为我设计的 CLH 锁太特么烂了,因为他们一旦进队列了,就必须等到前驱释放锁才能继续他们的工作。

“你个损塞,懂不懂用户体验?!” “日他的前驱,拿了锁啥事不干,在那死循环呢?!都一个小时了,海贼王都完结了还没好!哪个傻X设计的锁!”

为了避免长时间等待锁,线程们召开第88届Thread代表大会,会议上代表们一致认为我需要提供一个可以中途退出等待的机制。

说实话,这个可不是个简单的任务,但是面对代表们人手一把的40米大刀,我好像没有第二条路了。

超时等待很好实现, LockSupport.parkNanos() 提供了相应的功能,然而超时之后怎么把自己从队列中移除是个难题,因为这个时候你得考虑其他节点也会并发退出的情况。

我们先来看这段代码

/* 退出等待 */
    public void cancelAcquire(Node node) {
        node.prev.next = node.next;
        node.next.prev = node.prev;
        node.prev = null;
        node.next = null;
        node.thread = null;
    }

这是个标准的链表删除操作,但是这段代码没法用在并发退出的情形下。因为这四个步骤不是原子的,在步骤间隙没法确定其他节点会做什么操作。比如下面这样,假设A和B两个节点都要退出等待,那么就会...

JDK8源码分析——并发库核心AbstractQueuedSynchronizer的实现思路

JDK8源码分析——并发库核心AbstractQueuedSynchronizer的实现思路

JDK8源码分析——并发库核心AbstractQueuedSynchronizer的实现思路

JDK8源码分析——并发库核心AbstractQueuedSynchronizer的实现思路

最后我们发现队列已经被破坏了,上图只是众多情况中的一种,想要凭脑力找出所有的潜在并发场景几乎是不可能的,至少对我来说是这样,毕竟我的脑袋不太灵光,只能以单核模式缓慢运行。

所以,下面的问题就是,怎样保证在并发退出的情况下,还能正确保持队列的完整性。这个时候我的心里已经开始在嘀咕了:真的存在这样的方法么?不行啊怎么想都没法做到唉!40米大刀我能跑得过么?

仔细思考下,想要依靠链表删除操作完成并发退出是不可行的,因为这里没有可以利用CAS进行原子隔离的操作空间。

为了能让并发情况可控,我们需要减少退出操作所涉及的引用,所以我们先尝试在退出时只操作一个方向的引用,因为队列末尾的节点next是空,所以我们为了避免null判断,退出时操作前驱的next引用

/* 退出等待 */
    public void cancelAcquire(Node node) {
        Node pred = node.prev;
        Node prevNext = pred.next;
        Node next = node.next;
        CASNext(prevNode, prevNext, next);
    }

似乎已经没法继续降低退出操作所涉及的引用了,毕竟本质上就是把节点从链表中移除,怎么也得操作到前驱的next引用和当前节点的next引用。

即便是这样,还是会出现并发下有节点不能正确退出,不过这是显而易见的结果 —— 每个节点设置前驱next引用的过程都是相互独立的,根本无法保证先后顺序。

所以现在我们可以做一个推理,如果真的存在一个方法能够实现并发退出,那么这个方法肯定不能仅靠更改引用,还必须有其他措施来使得节点能识别出已退出的节点。

/* 退出等待 */
    public void cancelAcquire(Node ndoe) {
        Node pred = node.prev;
        while (pred.status == cancelled)
            node.prev = pred = pred.prev; /* 假设Head的status不为Cancelled */
        Node predNext = pred.next;
        Node next = node.next;
        
        node.status = Cancelled;
        
        if (pred.status != Cancelled) {
            if (next == null || next.status != Cancelled)
                CASNext(pred, predNext, next); /* 假设这步失败了,说明有后续节点抢先完成了退出,这时候本节点已经被跳过了 */
        }
        node.next = null;
        node.thread = null;
    }

这个方法很有意思,它在 node.status = Cancelled 之后,作了一个双向检查,这样就可以保证,相邻两个节点并发退出时,到 node.status = Cancelled 代码之后,其中一个必然能检测到另一个处于Cancelled状态。再仔细分析,我们惊喜地发现,这个方法保证了连续的一串节点并发退出时,必定能有

1. 全部节点`status = Cancelled`
2. 在下次后续节点退出时,会直接跳过`status = Cancelled`节点

不过这里就有个问题,非Cancelled节点的next引用可能指向了一个处于Cancelled的无用节点, unpark 操作就无法对正确的等待线程施行。

所以需要一个方法从Tail向前寻找离当前节点最近的非Cancelled节点

/* unpark非退出后继线程 */
    public void unparkSuccessor(Node node) {
        Node next = node.next;
        if (next == null || next.status == Cancelled) {
            next = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.status != Cancelled)
                    next = t;
        }
        if (next != null)
            LockSupport.unpark(next);
    }

Ⅷ. 累了,不更了

以上就是 AQS 独占模式的基本实现思路,当然我应该对第四个问题,怎么实现共享模式进行分析,不过基于以下几个原因

1. 高Sir我累了
2. Gao sir is tired
3. 共享模式的基本思路跟独占是一样的

加上这篇文章在草稿箱也快两个礼拜了,我也不想再画图了,就这样发表吧,大家好好阅读源代码,很多细节我都省略了,有问题评论下方交流。

原文  https://segmentfault.com/a/1190000014866999
正文到此结束
Loading...