构建状态依赖的类最简单的办法就是利用已有的状态依赖的类。比如使用 CountDownLatch
,或者自己构建同步器。自己构建同步器可以利用:内置条件队列(intrinsic condition queues)、显式 Condition
对象、 AbstractQueuedSynchronizer
框架。
使用polling(轮询)和sleeping来出来状态依赖是很痛苦的。所以不推荐。
阻塞的状态依赖动作的结构:
acquire lock on object state
while (precondition does not hold) {
release lock
wait until precondition might hold
optionally fail if interrupted or timeout expires
reacquire lock
}
perform action
release lock
下面是一个有届的Buffer的基类,用的是循环数组,后面有些例子会用它来说明:
@ThreadSafe
public abstract class BaseBoundedBuffer <V> {
@GuardedBy("this") private final V[] buf;
@GuardedBy("this") private int tail;
@GuardedBy("this") private int head;
@GuardedBy("this") private int count;
protected BaseBoundedBuffer(int capacity) {
this.buf = (V[]) new Object[capacity];
}
protected synchronized final void doPut(V v) {
buf[tail] = v;
if (++tail == buf.length)
tail = 0;
++count;
}
protected synchronized final V doTake() {
V v = buf[head];
buf[head] = null;
if (++head == buf.length)
head = 0;
--count;
return v;
}
public synchronized final boolean isFull() {
return count == buf.length;
}
public synchronized final boolean isEmpty() {
return count == 0;
}
}
下面这个例子不要的地方在于:
@ThreadSafe
public class GrumpyBoundedBuffer <V> extends BaseBoundedBuffer<V> {
public GrumpyBoundedBuffer(int size) {
super(size);
}
public synchronized void put(V v) throws BufferFullException {
if (isFull())
throw new BufferFullException();
doPut(v);
}
public synchronized V take() throws BufferEmptyException {
if (isEmpty())
throw new BufferEmptyException();
return doTake();
}
}
class ExampleUsage {
private GrumpyBoundedBuffer<String> buffer;
int SLEEP_GRANULARITY = 50;
void useBuffer() throws InterruptedException {
while (true) {
try {
String item = buffer.take();
// use item
break;
} catch (BufferEmptyException e) {
Thread.sleep(SLEEP_GRANULARITY);
}
}
}
}
class BufferFullException extends RuntimeException {
}
class BufferEmptyException extends RuntimeException {
}
不用异常也可以,比如通过返回一个Error结果,但是问题的本质没有变
这个例子稍微好点,自己处理前置条件失败:
@ThreadSafe
public class SleepyBoundedBuffer <V> extends BaseBoundedBuffer<V> {
int SLEEP_GRANULARITY = 60;
public SleepyBoundedBuffer() {
this(100);
}
public SleepyBoundedBuffer(int size) {
super(size);
}
public void put(V v) throws InterruptedException {
while (true) {
synchronized (this) {
if (!isFull()) {
doPut(v);
return;
}
}
Thread.sleep(SLEEP_GRANULARITY);
}
}
public V take() throws InterruptedException {
while (true) {
synchronized (this) {
if (!isEmpty())
return doTake();
}
Thread.sleep(SLEEP_GRANULARITY);
}
}
}
不过问题也差不多:睡眠的粒度很难把握,短了浪费CPU,长了增加响应度,
起名为Condition queue是因为它给一组线程——称为wait set——等待特定的条件变为true。这个队列中的元素是等待条件的线程。
每个对象可以作为condition queue, Object
的 wait
、 notify
、 notifyAll
方法构成了内置条件队列的API。
对象内置锁和内置条件队列是关联的:
下面这个例子更简单,也更高效,响应度更高。
@ThreadSafe
public class BoundedBuffer <V> extends BaseBoundedBuffer<V> {
// CONDITION PREDICATE: not-full (!isFull())
// CONDITION PREDICATE: not-empty (!isEmpty())
public BoundedBuffer() {
this(100);
}
public BoundedBuffer(int size) {
super(size);
}
// BLOCKS-UNTIL: not-full
public synchronized void put(V v) throws InterruptedException {
while (isFull())
wait();
doPut(v);
notifyAll();
}
// BLOCKS-UNTIL: not-empty
public synchronized V take() throws InterruptedException {
while (isEmpty())
wait();
V v = doTake();
notifyAll();
return v;
}
// BLOCKS-UNTIL: not-full
// Alternate form of put() using conditional notification
// 减少notifyAll的次数
public synchronized void alternatePut(V v) throws InterruptedException {
while (isFull())
wait();
boolean wasEmpty = isEmpty();
doPut(v);
if (wasEmpty)
notifyAll();
}
}
take
的condition predicate是buffer不为空、 put
的则是buffer没有满
condition wait牵涉三个:加锁、 wait
方法、被锁保护的状态变量。测试condition predicate之前要得到锁,锁对象和条件队列对象得是同一个。
wait
方法:释放锁,阻塞当前线程,等待——直到或超过规定时间、或线程被中断、或线程被通知唤醒。线程被唤醒后和其他线程再次争抢锁。
从 wait
中醒来不代表condition predicate变成true了,所以醒过来后一定要判断条件。
一个内置条件队列可能被多个condition predicate使用,因此如果有人调用了 notifyAll
不代表你等待的condition predicate变成true了。而且 wait
还会虚假的醒来,即并没有人调用 notify
。
下面是经典用法:
void stateDependentMethod() throws InterruptedException {
synchronized(lock) {
while (!conditionPredicate()) {
lock.wait();
}
// object is no in desired state
}
}
使用 Object.wait
或 Condition.await
要记牢:
wait
之前,和从 wait
唤醒之后要测试condition predicate wait
wait
、 notify
、 notifyAll
之前要持有锁 漏掉信号发生在:一个线程必须等待一个已经是true的条件,但是在等待之前没能检查condition predicate。
如果A 先 notify
,而B在后面 wait
,那么B是不会得到A发出的信号的,所以在 wait
之前一定要检查condition predicate。用前面的代码结构就能解决这个问题。
如果你在等待一个条件,那么确保肯定有其他人会在条件变成true的时候发出通知。
通知也需要得到锁,所以而且通知线程释放锁越快越好。大部分情况下要用 notifyAll
而不是 notify
,因为 notify
只通知一个线程,如果这个线程等待的condition没有变成true,那么这次通知就浪费了,那么其他线程就没有机会了,也就是这个信号被劫持了。
notify
可以作为性能优化的手段,但还是那句老话,先做对再做好。
下面是一个gate例子,gate关闭的时候线程等待,gate打开的时候线程通过:
@ThreadSafe
public class ThreadGate {
// CONDITION-PREDICATE: opened-since(n) (isOpen || generation>n)
@GuardedBy("this") private boolean isOpen;
@GuardedBy("this") private int generation;
public synchronized void close() {
isOpen = false;
}
public synchronized void open() {
++generation;
isOpen = true;
notifyAll();
}
// BLOCKS-UNTIL: opened-since(generation on entry)
public synchronized void await() throws InterruptedException {
int arrivalGeneration = generation;
while (!isOpen && arrivalGeneration == generation)
wait();
}
}
解释一下这段:
while (!isOpen && arrivalGeneration == generation) wait();
如果大门关闭 且 线程进入时的代数和门的代数一样,就要等待。反过来的意思是,如果大门开放,或者线程进入时的代数比门的代数更老,则通过。为什么?
因为在从 wait
唤醒到再次进入while之间门可能会被关闭,如果只看 open
状态,那么有一部分线程就会通不过,这个是有问题的,因为Gate设计的本来意是如果大门打开,那么就同时释放。
一个依赖状态的类应该要么完全暴露(并文档)它的等待和通知协议给子类,要么压根防止子类参与进来。
最好封装条件队列,这样在类层级外部就不会访问到它。
略。
Condition
对象关联一个 Lock
对象,一个 Lock
对象可以有很多 Condition
对象。 Condition
对象继承了 Lock
对象的公平性。
下面是 Condition
接口:
void await() boolean await(long time, TimeUnit unit) long awaitNanos(long nanosTimeout) void awaitUninterruptibly() boolean awaitUntil(Date deadline) void signal() void signalAll()
下面是一个例子:
@ThreadSafe
public class ConditionBoundedBuffer <T> {
protected final Lock lock = new ReentrantLock();
// CONDITION PREDICATE: notFull (count < items.length)
private final Condition notFull = lock.newCondition();
// CONDITION PREDICATE: notEmpty (count > 0)
private final Condition notEmpty = lock.newCondition();
private static final int BUFFER_SIZE = 100;
@GuardedBy("lock") private final T[] items = (T[]) new Object[BUFFER_SIZE];
@GuardedBy("lock") private int tail, head, count;
// BLOCKS-UNTIL: notFull
public void put(T x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[tail] = x;
if (++tail == items.length)
tail = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
// BLOCKS-UNTIL: notEmpty
public T take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
T x = items[head];
items[head] = null;
if (++head == items.length)
head = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
像 ReentrantLock
、 Semaphore
、 CountDownLatch
、 FutureTask
,都是利用了 AbstractQueuedSynchronizer
(AQS)。
基于AQS的同步器执行的操作是 acquire
和 release
的变种:
AQS管理同步器的状态,通过 getState
、 setState
、 compareAndSetState
方法。比如:
ReentrantLock Semaphore FutureTask
同步器也可以保存其他状态,比入 ReentrantLock
保存了lock owner线程,用来确保只有owner可以释放锁。
acquire
可能是独占的(exclusive),比如 ReentrantLock
。也可以是非独占的(non-exclusive),比如 Sempaphore
和 CountDownLatch
。
一次 acquire
的包含两个部分:
第一部分
第二部分:更新同步器状态。
acquire
和 release
的经典形式:
boolean acquire() throws InterruptedException {
while (state does not permit acquire) {
if (blocking acquisition requested) {
enqueue current thread if not already queued
block current thread
}
else
return failure
}
possibly update synchronization state
dequeue thread if it was queued
return success
}
void release() {
update synchronization state
if (new state may permit a blocked thread to acquire)
unblock one or more queued threads
}
实现AQS:
tryAcquire
、 tryRelease
、 isheldExclusively
方法。 tryAcquireShared
、 tryReleaseShared
。 acquire
、 acquireShared
、 release
、 releaseShared
会调用上面的 try*
方法。
tryAcquireShared
方法返回值说明:
tryRelease
和 tryReleaseShared
返回值说明:
@ThreadSafe
public class OneShotLatch {
private final Sync sync = new Sync();
public void signal() {
sync.releaseShared(0);
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(0);
}
private class Sync extends AbstractQueuedSynchronizer {
protected int tryAcquireShared(int ignored) {
// Succeed if latch is open (state == 1), else fail
return (getState() == 1) ? 1 : -1;
}
protected boolean tryReleaseShared(int ignored) {
setState(1); // Latch is now open
return true; // Other threads may now be able to acquire
}
}
}
一般来说不会直接继承AQS,而是弄一个私有内部类来继承,这样可以保证封装。
protected boolean tryAcquire(int ignored) {
find Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, 1)) { // 这里的整段代码不是同步的
owner = current; // 没成功说明被别的线程抢了,会跑到最后的return false
return true;
}
} else if (current == owner) {
setState(c + 1);
return true
}
return false;
}
在Semaphore中的运用:
protected int tryAcquireShared(int acquires) {
while (true) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining))
/* > 0: 我拿别人也能拿,非独占
= 0: 我拿别人拿不了,独占
< 0: 谁都拿不了
*/
return remaining;
}
}
protected boolean tryReleaseShared(int releases) {
while (true) {
int p = getState();
if (compareAndSetState(p, p + releases))
return true;
}
}
在CountDownLatch中的运用:
无
state保存任务状态:running、completed、cancelled,同时保存计算结果或抛出的异常,还维护了一个运行这个任务的线程(为了能够cancel)
同时使用了shared和非shared两种方法。
AQS内部维护了一个等待线程的队列,跟踪每个线程是请求独占还是非独占。在ReentrantReadWriteLock里,当锁可用式,如果队列头的线程请求写锁,它会得到它。如果队列头线程请求读锁,则会释放它和后面的线程,直到碰到一个请求写锁的线程。