观察这些类的源码,其实可以总结出一个应用 aqs 的 模式 。 下面用自定义实现一个 MutexLock 来演示这个模式。
public class MutexLock{
/* 1.
定义一个继承自aqs的静态内部类Sync
该类去覆盖实现tryAcquire/tryRelease
或者tryAcquireShared/tryReleasedShared
这里是独占式的互斥锁,因此只用实现前者
若是Semaphore或者CountdownLatch则只用实现后者
*/
private static class Sync extends AbstractQueuedSynchronizer{
@Override
protected boolean tryAcquire(int arg) {
if(arg != 1)
throw new IllegalArgumentException;
if(compareAndSetState(0, 1)){
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
if(arg != 1)
throw new IllegalArgumentException;
if(getState() == 0)
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
final ConditionObject newCondition(){
return new ConditionObject();
}
}
/*2. 类内部持有一个Sync对象*/
private final Sync sync;
public MutexLock() { this.sync = new Sync(); }
/*
3.
根据类的每个方法的具体功能分别去调用Sync的acquire/release/try
*/
@Override
public void lock() { sync.acquire(1); }
@Override
public void unlock() { sync.release(1); }
}
复制代码
A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
多线程计算,最后主线程将所有计算结果汇总,再计算。也就是主线程一定要保证上一步的计算中,每个线程都计算完毕,都执行完毕。有点类似join,不过是等待所有线程都结束,而不是某个特定的线程。
count--
CountdownLatch维护的Sync对像有了新的构造函数。state可以为int范围内的任意数。
Sync(int count) {
setState(count);
}
复制代码
CountdownLatch维护的Sync对象重写了aqs的共享式尝试获取同步状态的两个方法: tryAcquireShared tryReleaseShared
//只有当state也就是countDownLatch中的count减少到0时
//才会返回正数
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
复制代码
具体到 await方法 和 countDown 方法:
//选择使用可中断的acquire方法
//与aqs的acquire类似
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//每次只释放一个permit,也就是state只会减少1
public void countDown() {
sync.releaseShared(1);
}
复制代码
A counting semaphore. Conceptually, a semaphore maintains a set of permits. Each { acquire } blocks if necessary until a permit is available, and then takes it. Each { release } adds a permit, potentially releasing a blocking acquirer. However, no actual permit objects are used; the { Semaphore } just keeps a count of the number available and acts accordingly.
控制并发访问的线程个数,相当于操作系统里的信号量.
维护了两个 aqs 子类对象, FairSync , NonFairSyn , 它们都继承自 Sync 。他们共享父类的 tryReleaseShared 方法:
protected final boolean tryReleaseShared(int releases) {
//经典的CAS死循环,类比原子类的getAndIncrement()
//保证多个线程同时释放对象的线程安全
for (;;) {
int current = getState();
//归还releases数量的permits到state中
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
复制代码
公平与非公平主要是判断对锁 新的申请 , 是不是要排队 。所以他们都各自重写了 aqs 的 tryAcquireShared 方法。
protected int tryAcquireShared(int acquires) {
for (;;) {
/*
如果当前aqs维护的CLH队列中有在等待的节点
并且这个节点的下一个节点的线程不是当前线程
那么会直接失败,失败后会怎样呢?
参考acquireShared的源码
尝试失败后会直接加入队列的尾部(addWaiter())
*/
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
复制代码
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
//父类Sync中的方法
final int nonfairTryAcquireShared(int acquires) {
//还是经典的CAS循环
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
复制代码
NonFair版本中调用了父类中的 nonfairTryAcquireShared ,这个方法和 Fair 版本的 唯一区别 就是少了对 hasQueuedPredecessors() 的判断。 也就是说它会不断的通过CAS操作去设置 state ,一旦设置成功, remaining >= 0 时,它会直接忽略CLH队列中等待的节点, 优先 的进入临界区。 设置失败后( remaining < 0 ),那么还是和 Fair 版本的一样,也得 乖乖的去排队 。
那么具体到 Semaphore 的核心方法上:
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void release() {
sync.releaseShared(1);
}
复制代码
该锁对比文章开头自定义的MutexLok, 增加了了公平与非公平锁的功能,还实现了 可重入性 。 关于公平与非公平,上面的 Semaphore 已经提到过,只是 tryAcquire 方法中有没有对 hasQueuedPredecessors 加以判断。 那么我们来重点关注一下它的 可重入性 。
可重入性,也就是同一个线程在已经获取锁的情况下,可以多次的再次获取锁。 不可重入的锁,在递归调用的方法上加锁,就会出现死锁。 因此出现了可重入锁。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//如果当前没有任何线程获取该锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//该锁已经被某个线程获得
//判断这个线程是不是当前线程,如果是,那么可以再次的获取锁
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
复制代码
读写分离的、 可重入 的锁。 使用它可以提高并发的 吞吐量 ,提高了 读 取操作的效率。
将aqs中的 int 型变量 state 划分成了两个十六位的 unsigned short 。
同时写了两个快捷获取读锁/写锁状态的函数: sharedCount(int c) 和 exclusiveCount(int c) 。
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
复制代码
先来看看 读锁的获取 。
//传入的int型参数,没有被这个函数用过
//虽然unused, 但为了维持这个接口还是不改变函数签名
protected final int tryAcquireShared(int unused) {
/*
* 1. 如果其他线程持有写锁,失败(返回小于0的值)
* 2. 否则,这个线程是一个可以去锁定写锁状态的线程
* 那么去看看它由于排队的策略
* 以及避免饥饿的启发式算法,它是否应该阻塞。
* 重入性请求被延迟到full version处理。
* 3. 如果第二步失败,执行full version
*/
Thread current = Thread.currentThread();
int c = getState();
//其他线程持有写锁,直接返回-1
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
/*
去往full version 的三个条件:
1. 根据启发式的避免饥饿算法,当前读者应当阻塞
2. 当前读者数量超过上限
3. CAS设置失败
*/
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
//ThreadLocal线程封闭,让每个线程可以获取当前获得锁的数量
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
复制代码
读锁的释放相对简单一些。
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
//HoldCount相关操作
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
//释放锁的核心操作CAS循环
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
/* 读锁的释放不会对读者产生影响
但它也许会唤醒写者 */
return nextc == 0;
}
}
复制代码
写锁的获取。
protected final boolean tryAcquire(int acquires) {
/*
1. 读锁不为空,或者写锁不为空并且持有线程不是当前线程,fail
2. 写锁饱和,fail.(发生原因:重入次数过多,超过1 << 16 - 1.)
3. 否则,它有机会去获取写锁
*/
Thread current = Thread.currentThread();
int c = getState();
//w -> 写锁数量
int w = exclusiveCount(c);
//state != 0 -> 必定存在写锁/或者读锁被某个线程持有
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
//state == 0 -> 没有任何线程持有读锁/写锁
//如果非公平锁,writerShouldBlock()始终返回false
//公平锁,writerShouldBlock()会调用hasQueuedPredecessors()作为返回值
//如果当前写者不用阻塞那么就尝试去CAS设置state
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
复制代码
protected final boolean tryRelease(int releases) {
//如果当前线程不是独占的获取了锁
//就没有资格释放锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
复制代码
关于 公平性 ,还有 写者饥饿 的问题要具体看NonFairSync和FairSync对读者/写者应否阻塞的控制。 NonFair版本: 写者优先 。
static final class NonfairSync extends Sync {
final boolean writerShouldBlock() {
return false; // 写者总是插队
}
final boolean readerShouldBlock() {
/*
启发式的避免饥饿的方法
*/
return apparentlyFirstQueuedIsExclusive();
}
}
//父类aqs中的方法
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
/*
返回为假的四种情况
1. 队列中首节点为空 -> 就是没有等待的节点,那么不需要阻塞读者线程
2. 首节点的下一个节点为空 -> 只有一个等待的节点,那么也不需要阻塞
3. 首节点的下一个节点是共享模式 -> 队列中还有未出队的读者线程
4. 下一个节点中没有线程或者被中断
*/
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
复制代码
Fair版本: 读者和写者公平排队。
static final class FairSync extends Sync {
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
复制代码
在《Java 并发编程的艺术》一书中说道:
如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称之为锁降级。
并且书中是用数据可见性来解释锁降级的必要性的。 而得到这个结论的依据是 “如果当前线程不获取读锁而是直接释放写锁,假设此刻另一个线程(记作线程 T)获取了写锁并修改了数据,那么当前线程无法感知线程 T 的数据更新”。 那么就有一个问题: 如果另一个线程获取了写锁(并修改了数据),那么这个锁就被独占了,没有任何其他线程可以读到数据,谈什么 “感知数据更新”?
我的理解是,锁降级只是一种 特殊的重入机制 。相比释放写锁后再获取读锁这样的 手动降级 ,有两个最明显的优点。
Condition其实是一个 接口 。
aqs中的 内部类ConditionObject 实现了这个接口。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
//fullyRelease(node)将所有的state释放掉
//相当于隐式的释放掉所有的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//当线程已经转移到Sync队列上以后正常的去尝试获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
复制代码
唤醒线程:其实把线程从 condition queue 转移到了 sync queue 。
/** * 将节点从condition queue 转移到 sync queue中。 * @return true if successfully transferred (else the node was * cancelled before signal) */ final boolean transferForSignal(Node node); 复制代码
并且 Lock 接口定义了一个 newCondition() 方法,规定所有的锁要提供一个获取与这个锁对应的Condition对象。 ReentrantLock的实现中,在同步器Sync中增加了一个 newCondition 方法, 该方法新建一个ConditionObject对象返回。 注意,这个ConditionObject是aqs的一个 内部类 ,通过这个ConditionObject可以访问aqs的 任何成员以及方法 。 这也是使用 内部类 一个最大的 优点 。
A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released.
和CountdownLatch比较类似,但CyclicBarrier更加注重的是集合内的线程同步,线程组内的所有线程都必须等待组内其他线程运行到一个 barrier point ,才能继续执行。能够处理更加 复杂的场景 。并且CyclicBarrier内有一个Generation对象,可以 一代一代的重用 下去。
CyclicBarrier维护了一个ReentrantLock对象 lock ,以及一个由该对象生成的Condition对象 trip 。 通过这两个对象可以保证对 count 操作的线程安全,可以控制 线程之间 的 阻塞与唤醒 。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
复制代码
该方法直接调用了核心代码dowait()方法,并且指定false, 也就是不支持超时返回,时间为0L(unused)。
/**
* 主要的barrier代码,包含了各种策略
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
//lock保证对CyclicBarrier对象的线程安全
lock.lock();
//这个try块内容非常多
//包括count--操作以及循环阻塞的过程
try {
//中断或者barrier损坏,抛出对应异常
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//没有损坏则将count--
//因为加了锁,所以不用担心count数据线程不安全
//index -> 可以推断到达barrier的先后次序
int index = --count;
//index == 0 -> 这是最后到达barrier的线程
// -> tripped
if (index == 0) { // tripped
//根据初始化的参数决定是否执行barrierCommand
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
//command != null
//初始化时已经设置了barrierCommand,需要在此处执行
if (command != null)
command.run();
ranAction = true;
/*
//这一个generation结束,开始新的一轮计数同步
//并且生产一个新的Generation对象
private void nextGeneration() {
trip.signalAll();
count = parties;
generation = new Generation();
}
*/
nextGeneration();
//dowait()返回
//意味着所有线程可以接着执行下面的代码了
return 0;
} finally {
//try块中的ranAction没有得到执行
//就自动将这个barrier中断,可能是因为
//command.run() 被中断或者异常
if (!ranAction)
breakBarrier();
}
}
//循环直到所有线程到达barrier
//或者barrier被中断,或者超时
for (;;) {
try {
//如果没有时间限制
//无限等待
//trip是一个Condition接口的对象
if (!timed)
trip.await();
//有时间限制,超时等待
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
}
//相应中断相关操作
catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
//在执行过nextGeneration后
//这个全局的generation对象被赋予了新的值
//当前线程接收到nextGeneration中的signalAll()方法后
//返回当初记录它来到barrier的次序
if (g != generation)
return index;
//超时,中断barrier,抛出异常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
复制代码
看完这段代码,似乎还是有一个 不解之问 。就是 dowait() 方法的代码都是在 临界区 里执行的,包括 for 循环。 而我们的功能要求我们应该有多个线程并发的执行 dowait 方法,但是,当第一个线程 dowait 后,就 上锁 了,并且直到中断或者tripped才 释放锁 。 那么其他的线程根本就访问不了CyclicBarrier这个对象了,更别提一起到达 barrier ,将 count 减少到0了。
其实在for循环开始时,执行 trip.await() 时,会去执行ConditionObject里的 await() 方法,这个方法会首先执行一个 fullyRelease(Node node) 函数, 将 state 尝试置为0。就相当于是 隐式 的 释放 掉了当前 所有 的锁了。所以后续的线程调用 dowait() 方法,不会被锁在 临界区 外。因为一旦有线程 trip.wait() , 锁的状态就会清零 。