转载

Java并发研究 自己写ReentrantLock和ReentrantReadWriteLock(3)

本系列是基于经验设计原型,然后不断优化最终达到和AQS(AbstractQueuedSynchronizer)类似的设计。最终结果不一定和AQS完全一致,基于个人理解会有修改,可以作为理解AQS的不完全参考。

接上篇。本篇主要介绍Condition即条件变量的实现,ReentrantLock中最后一块需要实现的内容。

在实现条件变量之前,考虑一下条件变量的一些特性。

  1. 条件变量依赖锁,而且是独占锁
  2. 执行await方法后释放锁,当前线程进入睡眠状态,等待满足条件后被其他线程signal/signalAll唤醒,被唤醒后会尝试重新获取锁
  3. 唤醒可以选择一个也可以全部

注意第一条特性,条件变量依赖的是独占锁,所以类似读锁这种共享锁是不支持条件变量的,ReentrantReadWriteLock中ReadLock#newCondition的实现是直接抛错。

按照第二条特性,可以得到如下的执行过程

Java并发研究 自己写ReentrantLock和ReentrantReadWriteLock(3)

注意ThreadA中被加粗的unlock和lock,调用条件变量的await方法时,虽然没有显式调用unlock和lock,但是内部其实是做了类似的事情的。

从第三条来看,条件变量应该是对应了一个专门的队列的,那些调用了await方法的线程会进入条件变量的队列中等待被signal。这个专门的队列和锁自身的lock/unlock等待队列不同,语义上唤醒是某个线程的signal/signalAll,而不是lock/unlock方法。如果有点难理解的话,可以考虑在上图中增加一个不调用条件变量任何方法的ThreadC,C的lock/unlock不会唤醒A,因为A等待的是B的唤醒,所以条件变量的队列和锁的lock/unlock队列是分离的,类似下图。

Java并发研究 自己写ReentrantLock和ReentrantReadWriteLock(3)

那么条件变量的等待队列需要设计得和锁的等待队列一样么?答案是不用。原因是假如你在await的unlock语义执行之前加入队列的话,因为条件队列依赖的是独占锁,只有一个线程可以操作condition queue。调用signal方法时,锁还没有被释放,同样只有一个线程可以操作condition queue。所以condition queue可以使用普通的同步队列设计。

基于以上初步分析之后得到的条件变量原型

import javax.annotation.Nonnull;
import java.util.Date;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
 
public class LockWithCondition1 implements Lock {
 
    @Override
    public void lock() {
 
    }
 
    @Override
    public void lockInterruptibly() throws InterruptedException {
 
    }
 
    @Override
    public boolean tryLock() {
        return false;
    }
 
    @Override
    public boolean tryLock(long time, @Nonnull TimeUnit unit) throws InterruptedException {
        return false;
    }
 
    @Override
    public void unlock() {
 
    }
 
    @Override
    @Nonnull
    public Condition newCondition() {
        return new ConditionImpl();
    }
 
    @SuppressWarnings("Duplicates")
    private class ConditionImpl implements Condition {
        final LinkedList<WaitingThread> waitingThreads = new LinkedList<>();
 
        @Override
        public void await() throws InterruptedException {
            waitingThreads.addLast(new WaitingThread(Thread.currentThread()));
            unlock();
            LockSupport.park(this);
            lockInterruptibly();
        }
 
        @Override
        public void awaitUninterruptibly() {
            waitingThreads.addLast(new WaitingThread(Thread.currentThread()));
            unlock();
            LockSupport.park(this);
            lock();
        }
 
        @Override
        public long awaitNanos(long nanosTimeout) throws InterruptedException {
            if (nanosTimeout <= 0L) {
                return 0L;
            }
            long startAt = System.nanoTime();
            waitingThreads.addLast(new WaitingThread(Thread.currentThread()));
            unlock();
            LockSupport.parkNanos(this, nanosTimeout);
            lock();
            return System.nanoTime() - startAt;
        }
 
        @Override
        public boolean await(long time, @Nonnull TimeUnit unit) throws InterruptedException {
            WaitingThread t = new WaitingThread(Thread.currentThread());
            waitingThreads.addLast(t);
            unlock();
            LockSupport.parkNanos(this, unit.toNanos(time));
            boolean signaled = (t.status.get() == WaitingThread.STATUS_SIGNALED || !t.abort());
            lock();
            return signaled;
        }
 
        @Override
        public boolean awaitUntil(@Nonnull Date deadline) throws InterruptedException {
            return await(deadline.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
 
        @Override
        public void signal() {
            WaitingThread t = waitingThreads.removeFirst();
            if (t != null) {
                t.wakeUp();
            }
        }
 
        @Override
        public void signalAll() {
            for (WaitingThread t : waitingThreads) {
                t.wakeUp();
            }
            waitingThreads.clear();
        }
    }
 
    private static class WaitingThread {
        static final int STATUS_NORMAL = 0;
        static final int STATUS_SIGNALED = 1;
        static final int STATUS_ABORTED = -1;
 
        final AtomicReference<Thread> thread;
        final AtomicInteger status = new AtomicInteger(STATUS_NORMAL);
 
        WaitingThread(@Nonnull Thread thread) {
            this.thread = new AtomicReference<>(thread);
        }
 
        void wakeUp() {
            if (status.compareAndSet(STATUS_NORMAL, STATUS_SIGNALED)) {
                LockSupport.unpark(thread.get());
            }
        }
 
        boolean abort() {
            thread.set(null);
            return status.compareAndSet(STATUS_NORMAL, STATUS_ABORTED);
        }
    }
}

这里为了只关注条件变量省去了lock的所有代码。可以看到await方法中,执行了如下操作

  1. enqueue(condition queue)
  2. unlock
  3. park
  4. lock

对应地,signal执行了如下操作

  1. dequeue(condition queue)
  2. unpark

提问,这里是否会发生signal无法唤醒await线程的情况?

考虑到enqueue和dequeue都是在持有独占锁的前提下操作的,实际可能有两种情况:

  1. 先dequeue
  2. 先enqueue

先dequeue意味着signal先执行(因为此时持有独占锁),以及队列为空,unpark不会唤醒任何线程。signal先执行等价于之后await调用不会发生,因为进入await之前的条件是满足的。执行结果正确。

先enqueue意味signal还无法执行(因为此时await的线程持有锁)。unlock之后,signal线程获取锁,发现await线程的节点,尝试unpark。这里有可能是unpark/park,也有可能是park/unpark执行序列。不管哪种,await线程都会被唤醒,尝试lock,进入lock的等待队列。signal线程释放锁之后,await线程获取锁,执行await之后的代码。执行结果正确。

可以看到,条件变量的分析比独占锁本身要容易很多,一方面这是条件变量本身的特性所致,另一方面合理利用条件变量的特性(特性一)简化了设计。

顺便说一句,设计上多个条件变量的队列互不影响,如果你看到某个锁的条件变量要求只能有一个,或者互相有影响的话,多少说明设计是有问题的。

刚才关于“先enqueue”的分析中,你是否注意到await线程被唤醒之后,由于signal线程还没有释放锁,所以await线程暂时无法获取锁,导致了一次多余的unpark/park。是否可以直接把await线程的节点从conditon queue转移到lock的等待队列?当然这样做的前提是条件变量了解lock的内部实现,由于条件变量依赖锁,所以这不是问题。

其次LockWithCondition1严格上来说在一些边界条件上不满足条件变量的语义。比如说await方法在抛出InterruptedException时必须持有锁(await返回时必须持有锁),现在的实现中lockInterruptibly在抛出InterruptedException时不持有锁。这是条件变量实现中最容易忽视的问题。

还有一些条件变量实现的细节问题,比如await的什么时候可以抛出InterruptedException。这块可以参考Condition接口本身的注释。

Java并发研究 自己写ReentrantLock和ReentrantReadWriteLock(3)

以下是修改版的条件变量实现。由于代码比较多,这里只展示了ConditionImpl的实现,其他代码可以参考

https://github.com/xnnyygn/concurrent-learning/tree/master/src/main/java/in/xnnyygn/concurrent/mylock

import javax.annotation.Nonnull;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;
 
@SuppressWarnings("Duplicates")
class ConditionImpl implements Condition {
    private final MyLock lock;
    private final ConditionQueue conditionQueue = new ConditionQueue();
 
    ConditionImpl(MyLock lock) {
        this.lock = lock;
    }
 
    @Override
    public void await() throws InterruptedException {
        Node node = Node.createConditionForCurrent();
        conditionQueue.enqueue(node);
        lock.unlock();
        boolean interrupted = false;
        while (nodeNotLockEnqueued(node)) {
            LockSupport.park(this);
            /*
             * 1. interrupted, unknown
             * 2. predecessor is aborted, lock enqueued
             * 3. signaled by predecessor, lock enqueued
             */
            if (Thread.interrupted()) {
                interrupted = lockEnqueueByAwaitingThread(node);
                break;
            }
        }
        if (lock.acquireUninterruptibly(node) && !interrupted) {
            Thread.currentThread().interrupt();
        }
        if (interrupted) {
            conditionQueue.removeNonConditionNodes();
            throw new InterruptedException();
        }
    }
 
    @Override
    public void awaitUninterruptibly() {
        Node node = Node.createConditionForCurrent();
        conditionQueue.enqueue(node);
        lock.unlock();
        boolean interrupted = false;
        while (nodeNotLockEnqueued(node)) {
            LockSupport.park(this);
            if (Thread.interrupted()) {
                interrupted = true;
            }
        }
        if (lock.acquireUninterruptibly(node) || interrupted) {
            Thread.currentThread().interrupt();
        }
    }
 
    @Override
    public long awaitNanos(long nanosTimeout) throws InterruptedException {
        if (nanosTimeout <= 0L) {
            return 0L;
        }
        Node node = Node.createConditionForCurrent();
        conditionQueue.enqueue(node);
        lock.unlock();
        long deadline = System.nanoTime() + nanosTimeout;
        long nanos = nanosTimeout;
        boolean interrupted = false;
        while (nodeNotLockEnqueued(node)) {
            LockSupport.parkNanos(this, nanos);
            nanos = deadline - System.nanoTime();
            if (Thread.interrupted()) {
                interrupted = lockEnqueueByAwaitingThread(node);
                break;
            }
            if (nanos <= 0L) {
                lockEnqueueByAwaitingThread(node);
                break;
            }
        }
        if (lock.acquireUninterruptibly(node) && !interrupted) {
            Thread.currentThread().interrupt();
        }
        if (interrupted || nanos <= 0L) {
            conditionQueue.removeNonConditionNodes();
        }
        if (interrupted) {
            throw new InterruptedException();
        }
        return nanos;
    }
 
    @Override
    public boolean await(long time, TimeUnit unit) throws InterruptedException {
        return awaitNanos(unit.toNanos(time)) > 0;
    }
 
    @Override
    public boolean awaitUntil(@Nonnull Date deadline) throws InterruptedException {
        return awaitNanos(TimeUnit.MILLISECONDS.toNanos(deadline.getTime() - System.currentTimeMillis())) > 0;
    }
 
    @Override
    public void signal() {
        Node node;
        do {
            node = conditionQueue.dequeue();
            if (node == null) {
                return;
            }
        } while (!lockEnqueueBySignalingThread(node));
    }
 
    @Override
    public void signalAll() {
        Node node;
        while ((node = conditionQueue.dequeue()) != null) {
            lockEnqueueBySignalingThread(node);
        }
    }
 
    private boolean nodeNotLockEnqueued(@Nonnull Node node) {
        if (node.status.get() == Node.STATUS_CONDITION || node.predecessor.get() == null) {
            return true;
        }
        if (node.successor.get() != null) {
            return false;
        }
        // status == NORMAL
        return !lock.queue.contains(node);
    }
 
    private boolean lockEnqueueByAwaitingThread(@Nonnull Node node) {
        if (node.status.compareAndSet(Node.STATUS_CONDITION, Node.STATUS_NORMAL)) {
            lock.queue.enqueue(node);
            return true;
        }
        // enqueuing by signal thread
        while (!lock.queue.contains(node)) {
            Thread.yield();
        }
        return false;
    }
 
    private boolean lockEnqueueBySignalingThread(@Nonnull Node node) {
        if (!node.status.compareAndSet(Node.STATUS_CONDITION, Node.STATUS_NORMAL)) {
            return false;
        }
        Node predecessor = lock.queue.enqueue(node);
        if (predecessor.isAborted() || !predecessor.status.compareAndSet(Node.STATUS_NORMAL, Node.STATUS_SIGNAL)) {
            // predecessor is aborted
            LockSupport.unpark(node.thread.get());
        }
        return true;
    }
}

在分析修改版的await之前,解释一下这里的Node。MyLock使用的Node和AQS一样,可同时用于condition queue和lock queue。这样做只是为了复用Node,你也可以选择和LockWithCondition1一样,分开使用condition queu的节点和lock queue的节点。condition queue中主要使用Node的thread,status和next,lock queue使用除next以外所有字段。为了区分condition queue和lock queue的节点,condition queue的节点一开始的状态是STATUS_CONDITION,相对的,lock queue中节点一开始为STATUS_NORMAL。前篇提到过Node中status的迁移图,不包括STATUS_CONDITION。事实上加入lock queue时节点的status不可能为STATUS_CONDITION。STATUS_CONDITION只是condition queue中的状态,这点请注意。

condition queue中节点的status只有一种迁移

STATUS_CONDITION -> STATUS_NORMAL

可能的情况有

  1. signal
  2. signalAll
  3. await超时
  4. 中断

为什么不区分异常情况?原因是异常状态是也需要加入lock queue,此时status必须是STATUS_NORMAL。你不能把一个STATUS_ABORTED的节点加入lock queue,这样你就没办法获取锁了。重申一遍,条件变量的语义中不管是正常返回,还是await超时,中断,返回时必须持有锁。

由于以上原因,status迁移只有一种。考虑到signal/signalAll时signal线程执行状态迁移,await超时和中断时await线程执行状态迁移,status迁移还必须是CAS。

condition queue的具体实现在类ConditionQueue中,这个类现阶段有3个方法

  1. enqueue
  2. removeNonConditionNodes
  3. dequeue

enqueue很好理解,removeNonConditionNodes主要用于在await超时和中断移除await线程的节点的。注意,只是从condition queue中移除,之后会放到lock queue中去。dequeue就是正常情况从condition queue中移除下一个节点,之后放到lock queue中去。

在上述前提下理解await和signal方法。

首先看最简单的signal方法。从condition queue中取出一个节点,如果没有节点就直接返回。有的话尝试状态迁移,失败的话,说明await线程自己修改了状态(状态迁移的情况3和4),signal会尝试下一个节点。如果状态迁移成功,此时按照前篇的要求,设置前置节点status为STATUS_SIGNAL。如果前置节点放弃了,或者CAS过程中失败了(意味着前置节点放弃了),此时signal线程能做的事情就是唤醒await线程,让await线程自己跳过放弃的前置节点。

signal线程是否可以帮await线程的节点跳过放弃的节点而不是unpark await线程?这样可以帮忙省去一次unpark/park。

回答是请分析哪些是signal线程可以做的,哪些是await线程可以做的。

从MyLock是基于CLHLock实现这点来说,设置前置节点的status是后继节点对应的线程,即await线程该做的事情。所以signal线程这里设置前置节点的status其实不是它的职责,这里个人觉得是为了减少一般情况下的unpark/park。如果碰到前置节点是放弃了的节点还考虑帮忙跳过的话,有点舍本逐末了。

signalAll和signal类似,这里不再赘述。

接下来是相对简单的awaitUninterruptibly。为什么不是await?因为await对于中断的处理有点微妙。awaitUninterruptibly不处理中断,准确来说是,碰到中断,只会重新设置中断状态。这样关注点就可以集中到中断处理以外的部分。

awaitUninterruptibly的基本流程和LockWithCondition1有所不同。考虑到park会响应中断,需要把park放在一个循环中,记录中断状态。跳过循环的唯一条件是节点已经被加入lock queue。

判断是否已加入lock queue的最直接的方法(nodeNotLockEnqueued最后)是从tail检查节点是否存在。其次是检查进入lock queue之后才会被使用的字段,比如successor。假如successor被设置,那么节点肯定被加入了lock queue。还有一种提前判断的方法,节点自身的状态。如果节点status是STATUS_CONDITION或者进入lock queue后才会被使用的predecessor字段没有被设置的话,可以断言节点还没有加入lock queue。

awaitUninterruptibly中跳出循环之后,执行acquireUninterruptibly。在AQS中这个方法被叫做acquireQueued。从行为上这个acquireXXX方法在节点被加入lock queue后执行。其次,会记录acquire过程中的中断,但是不会因为中断而停止尝试获取锁。await系列方法其实需要的就是这种获取模式,不会因为中断而放弃获取的方式。

不管是acquireUninterruptibly还是循环过程中被中断,awaitUninterruptibly最终只会重新设置中断标志,除此以外什么都不做。

在理解了acquireUninterruptibly之后,看一下await方法。

相比于await方法,循环有两个出口。一个是由signal线程放到lock queue的条件,另外一个是中断。中断时,await线程会自己尝试把节点加入lock queue。从两个lockEnqueueByXXXThread方法可以看出,存在两个线程同时把节点放入lock queue的可能性。假如signal线程先放进lock queue,那么await在CAS这一步(lockEnqueueByAwaitingThread第一行)会失败,这时必须等待signal线程enqueue操作的完成(通过yield)。假如await线程先放进lock queue,那么signal线程会失败,现有逻辑中signal线程会尝试唤醒下一个节点。

注意这里的await和AQS的await方法有些许不同。AWS的await在await线程先放入lock queue时最后会抛出InterruptedException,但是在signal线程先放入的话,会设置中断状态。这里的await,在await线程先放入时同样会抛出异常,但是在signal线程先放入的话,不会设置中断状态。这里其实有一个边界问题,在中断时恰好被signal时算不算中断?个人觉得可以不算,要算也是以留下证据为目的。

跳出循环之后,同样调用acquireUninterruptibly,按情况设置中断标志。最后在确认是中断了的情况下,从condition queue中移除自己的节点,并抛出异常。注意这里,移除节点时持有独占锁,其次,中断不代表节点仍旧在condition queue里面。一种情况是await线程比同时执行的signal先CAS成功。

考虑到这里的await和AWS的await相比还是有点差异的,如果你对于改造不太放心的话,在理解的前面内容的基础上,相信你能修改成AQS的语义。

最后是几个限时await方法,这里全部引导到同一个限时方法awaitNanos上。主体流程和await很像,有一点不同是剩余时间的处理。个人理解awaitNanos的返回值代表是是否超时,所以不能简单把timeout减去执行时间返回去调用方。具体代码这里不再分析。

小结

总体来说,条件变量比起独占锁本身要简单。因为可以借助条件变量的特性使用同步队列。另一方面,条件变量实现中也有比较细节的问题需要考虑。

至此,ReentrantLock的实现算是真正完成了。接下来是ReentrantReadWriteLock的实现。

原文  http://blog.gssxgss.me/java-concurrent-reentrant-lock-3/
正文到此结束
Loading...