转载

CountDownLatch源码解析

CountDownLatch是Java并发包下的一个工具类,latch是门闩的意思,顾名思义,CountDownLatch就是有一个门闩挡住了里面的人(线程)出来,当count减到0的时候,门闩就打开了,人(线程)就可以出来了。下面从源码的角度看看CountDownLatch究竟是如何实现的。

内部类

CountDownLatch类中有一个静态内部类 Sync ,它继承自 AbstractQueuedSynchronizer ,所以可以看出,CountDownLatch的功能还是通过AQS来实现的。

构造函数

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
复制代码

可以看出要使用CountDownLatch就需要指定count值,且必须大于0,而这个count值最终是赋值给了AQS的state,可以看下面 new Sync(count)的源码,它实际上是set的state值,而这个state是AQS中的属性

Sync(int count) {
	setState(count);
}
复制代码

方法

countDown方法

它是调用内部类Sync的releaseShared方法,这个方法会先去通过cas的方式修改state值,如果state修改之前就是0或者修改之后不等于0,那就什么都不需要操作了;如果修改之后state=0,那么就去执行doReleaseShared方法。

public void countDown() {
	sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

protected boolean tryReleaseShared(int releases) {
    // state=0或者修改之后state<>0,返回false
    for (;;) {
        int c = getState();
        if (c == 0)
          	return false;
        int nextc = c-1;
        //修改之后state值=0,返回true
        if (compareAndSetState(c, nextc))
        		return nextc == 0;
    }
}
复制代码
//这个方法countDown会调用,await方法在被唤醒后也会调用doReleaseShared
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 如果状态是signal(-1),cas的方式把它改为0
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                //cas成功修改的话,则去唤醒h的下一个节点
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
            				!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
            		continue;                // loop on failed CAS
        }
        //如果h和head相同了,则跳出循环
        if (h == head)                   // loop if head changed
        		break;
    }
}
复制代码

await方法

await方法是调用该方法的线程处于等待状态(state>0),下面从源码分心一下await方法是如何实现的。

await方法最终调用到了AQS的acquireSharedInterruptibly方法

public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
      	throw new InterruptedException();
    //如果state=0,则tryAcquireShared方法返回1,则不用等待(也就验证了countdown减到0,才释放线程)
    if (tryAcquireShared(arg) < 0)
      	doAcquireSharedInterruptibly(arg);
}
复制代码
private void doAcquireSharedInterruptibly(int arg)
  throws InterruptedException {
    //把新node加到node的尾部
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
      for (;;) {
        final Node p = node.predecessor();
        //如果node的prev是head的话
        if (p == head) {
          //看state是否=0,如果等于0,则r=1,否则r=-1
          int r = tryAcquireShared(arg);
          if (r >= 0) {
            //state=0的时候,把node置为head,然后去唤醒head的下一个节点
            //setHeadAndPropagate方法参照下面的解析
            setHeadAndPropagate(node, r);
            p.next = null; // help GC
            failed = false;
            return;
          }
        }
        //如果p的status是signal(-1)的话,则执行parkAndCheckInterrupt方法,将该线程挂起
        if (shouldParkAfterFailedAcquire(p, node) &&
            parkAndCheckInterrupt())
          throw new InterruptedException();
      }
    } finally {
      if (failed)
        cancelAcquire(node);
    }
}
复制代码
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; 
    setHead(node);

    if (propagate > 0 || h == null || h.waitStatus < 0 ||
          (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            //调用doReleaseShared,去唤醒新head的下一个节点
          	doReleaseShared();
    }
}
复制代码

例子

下面举个例子说明CountDownLatch的用法,而且如果大家是想通过debug的方式跟踪CountDownLatch是如何实现的,那么在断点处的suspend一定要改为Thread,因为在await的时候,线程挂起,而在countDown的时候,首先把head的next节点(暂时称作A节点)唤醒,而此时A节点在await挂起的线程就被唤醒了,继续往下执行,由于await方法会调用setHeadAndPropagate方法,setHeadAndPropagate方法会调用doReleaseShared(countDown也是调用这个方法唤醒线程),所以除了调用countDown的线程,被唤醒的线程也会去唤醒它的下一个节点,所以doReleaseShared方法是被多线程调用的,因此在debug的时候一定要把suspend改为Thread才能看到效果。

public void test() throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(1);

    for (int i = 0; i < 5; i++) {
        new Thread(() -> {
            try {
                System.out.println("子线程" + Thread.currentThread().getName() + "await 前");
                countDownLatch.await();
                System.out.println("子线程" + Thread.currentThread().getName() + "await 后");
            } catch (InterruptedException e) {
              	e.printStackTrace();
            }
        }).start();
    }

    Thread.sleep(30000);
    countDownLatch.countDown();
    System.out.println("完成");
}
复制代码
原文  https://juejin.im/post/5e8af0286fb9a03c7204cb10
正文到此结束
Loading...