文章篇幅较短,对于一些 AQS 的顶级方法例如 releaseShared 并没有做过深的讲解,因为这些算是 AQS 的范畴,关于 AQS 可以看下另一篇文章——AQS。
CountDownLatch 一般被称作"计数器",作用大致就是数量达到了某个点之后计数结束,才能继续往下走。可以用作 流程控制 之类的作用,大流程分成多个子流程,然后大流程在子流程全部结束之前不动(子流程最好是相互独立的,除非能很好的控制两个流程的关联关系),子流程全部结束后大流程开始操作。
很抽象,小问题,下方的两节或许能让你理解 CountDownLatch 的用法和内部的实现。
假设现在,我们要起一个 3块钱 的集资项目,并且限定每个人一次只能捐 1 块钱当募集到 3 块钱的时候立马就把这笔钱捐给我自己,如果凑齐之后你还想捐,那么我会跟你说,项目已经完成了,你这一块钱我不受理,自己去买雪糕吃吧;如果没凑齐,那么我这个募集箱就一直挂在这里。这个场景用 CountDownLatch 可以很契合的模拟出来。
字数也不凑了,直接看 demo 例子吧
public static void main(String[] args) throws InterruptedException {
// 集资项目==========>启动,目标3块钱
CountDownLatch countDownLatch = new CountDownLatch(3);
ThreadPoolExecutor executor = ThreadPoolProvider.getInstance();
executor.execute(() -> {
try {
TimeUnit.MILLISECONDS.sleep(100);
System.err.println("张1准备捐一块钱");
countDownLatch.countDown();
System.err.println("张1捐了一块钱");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executor.execute(() -> {
try {
TimeUnit.MILLISECONDS.sleep(100);
System.err.println("张2准备捐一块钱");
countDownLatch.countDown();
System.err.println("张2捐了一块钱");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executor.execute(() -> {
try {
TimeUnit.MILLISECONDS.sleep(100);
System.err.println("张3准备捐一块钱");
countDownLatch.countDown();
System.err.println("张3捐了一块钱");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.err.println("我项目启动后,就在这里等人捐钱,不够3块我不走了");
countDownLatch.await();
System.err.println("3块钱到手,直接跑路");
executor.shutdown();
}
结果图:
这个结果,em,可以看到 countDownLatch 使用的几个注意点:
countDownLatch 的 await() 方法的线程会阻塞,直到凑够3块钱为止 countDown() 方法都会捐一块钱(计数一次),满了之后调用 await() 方法的线程不再阻塞 另外,在上面的代码中,在 countDown 方法之后还打印信息是为了验证 countDown 方法不会阻塞当前线程,执行结果不一定如上图那样有顺序的,例如可能出现下方的结果:
因为最后一个 countDown 之后, await 所在的线程不再阻塞了,又正好赶上 JVM 线程调度,所以就会出现上方的结果。
刚才已经讲了 CountDownLatch 的用法,用起来还是不难的。那来看下内部是怎么实现的,又是怎么做到计数之后不跟CyclicBarrier一样阻塞的呢?
首先来看构造函数吧, CountDownLatch 只有 一个构造函数 ,如下
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
所做的事情也就只有初始化内部对象 sync 一件事情(校验总不能算一件事吧?),那来看下初始化了个啥玩意
// 变量sync,是不是看起来很眼熟?
private final Sync sync;
// 内部类Sync,又是一个AQS的产物
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
// 构造方法,就是设置了AQS的state值
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
/*
* 可以知道countDownLatch使用的是AQS的共享模式
* 获取资源方法,正数表示成功,负数表示失败
*/
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 释放方法
protected boolean tryReleaseShared(int releases) {
for (;;) {
// state的状态,在countDownLatch中表示剩余计数量,如果为0则表示可以被获取,即await方法不再阻塞
int c = getState();
if (c == 0)
return false;
// 本次计数后还剩余的及数量
int nextc = c-1;
// CAS设置剩余计数量
if (compareAndSetState(c, nextc))
// ==0表示锁释放了,之后state的值将一直是0,意思就是之后的await方法都不再阻塞
return nextc == 0;
}
}
既然涉及到了 AQS ,那你应该懂我意思了—— 快去看我写的AQS文章啊 。
开个玩笑,我知道各位都多多少少了解一些,上方代码的作用应该知道是干嘛的,不懂也没关系,等下我在下面再讲。
回到正题,来讲下从上方代码能得到什么信息
1. CountDownLatch 构造函数 count 的参数作用就是 设置其内部的 AQS 的状态 state ,假设 count 为 3 ,那么每次进行 countDown , AQS 的 state 就减 1 ,减到 0 的时候 await 方法就 不再阻塞 ,注意 这时候await方法就不再阻塞了,无论你调多少次。
2. CountDownLatch 里边的 Sync 实现的 AQS 的共享模式(从 tryReleaseShared 方法可以看出)
到这里对其 CountDownLatch 的内部有个差不多印象了,接下来看下其最重要的 await 和 countDown 方法。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
直接调用了 AQS 的顶级方法,再进去就是 AQS 的模块了
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 获取资源,成功直接返回,失败执行下方方法(进入同步队列)
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
简单说明一下,这个方法的意思就是调用 tryAcquireShared 的方法尝试获取资源,方法 返回负数表示失败 , 返回正数则表示成功 ; 失败了则入同步队列 (即阻塞),具体的细节可以看下AQS的详解。
也就是说 关键点是 tryAcquireShared 方法 ,这个方法刚才在上方已经解释过,这里再放一次。方法逻辑很简单,如果 state =0(即计数完毕)则成功 ,否则失败。
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
okay, await 方法的整个流程大致就是:尝试获取资源,如果 失败则阻塞 ,成功了继续当前线程的操作。 什么时候会失败呢 ,在 state !=0 的时候,而 state 这个变量的值我们在 构造函数就已经赋予 了,需要 通过 countDown 方法来减少。
既然这个方法这么重要,那让它开始它的表演吧。
public void countDown() {
sync.releaseShared(1);
}
同样的,直接调用 AQS 的顶级释放资源的方法。
public final boolean releaseShared(int arg) {
// 如果资源释放了,那么唤醒同步队列中等待的线程
if (tryReleaseShared(arg)) {
// 善后操作
doReleaseShared();
return true;
}
return false;
}
关键的方法还是在 资源的控制 上—— tryReleaseShared ,代码如下(上方也有):
protected boolean tryReleaseShared(int releases) {
for (;;) {
/*
* state的状态,在countDownLatch中表示剩余计数量
* 如果为0则表示可以被获取,即await方法不再阻塞
*/
int c = getState();
// 这里的意思是如果资源已经释放的情况下,就不能再次释放了,释放成功的代码在最后一行
if (c == 0)
return false;
// 本次计数后还剩余的及数量
int nextc = c-1;
// CAS设置剩余计数量
if (compareAndSetState(c, nextc))
// ==0表示锁释放了,之后state的值将一直是0,意思就是之后的await方法都不再阻塞
return nextc == 0;
}
}
到这里 countDown 方法的迷雾也看清了,每一次调用 countDown 方法就相当于调用 tryReleaseShared 方法, 如果当前资源还没释放的话,将state-1 ,判断是否为 0 , 如果为0的话表示资源释放 , 唤醒 await 方法的线程 ,否则的话只是更新 state 的值。
整理一下整个 CountDownLatch 的流程。
1.创建一个 CountDownLatch ,并赋予一个数值,这个值表示需要 计数的次数 ,每次 countDown 算一次
2.在 主线程 调用 await 方法,表示需要 计数器完成之前都不能动 。 await 方法的内部实现依赖于内部的 AQS ,调用 await 方法的时候会 尝试去获取资源 ,成功条件是 state =0 ,也就是说除非 countDown 了 count (构造函数赋予) 次之后,才能成功,失败的话 当前线程进行休眠 。
3.在 子线程 调用 countDown 方法,每次调用都会使内部的 state -1 , state 为 0 的时候 资源释放 , await 方法 不再阻塞(即使再次调用也是)
如果理解AQS的话,不止 CountDownLatch ,其他衍生物例如 ReentrantLock 都能轻易的看懂。如果不了解的话也没关系,这篇文章应该能让你对 CountDownLatch 的内部实现有了大概的轮廓。
简单总结一下, CountDownLatch 就三个点 :构造函数的值、 await 、 countDown 。构造函数的值表示计数的次数,每次 countDown 都会使计数减一,减到 0 的时候 await 方法所在的线程就不再阻塞。
这篇文章写得,自己都有点不好意思了...