// 下列三对操作的语义是相同的
// Condition.await() Object.wait()
// Condition.signal() Object.notify()
// Condition.signalAll() Object.notifyAll()
public class BlockedQueue<T> {
private static final int MAX_SIZE = 10;
// 可重入锁
private final Lock lock = new ReentrantLock();
// 条件变量:队列不满
private final Condition notFull = lock.newCondition();
// 条件变量:队列不空
private final Condition notEmpty = lock.newCondition();
// 队列实际存储:栈
private final Stack<T> stack = new Stack<>();
// 入队
public void enq(T t) {
// 先获得互斥锁,类似于管程中的入口
lock.lock();
try {
while (stack.size() >= MAX_SIZE) {
// 队列已满,等待队列不满,才可入队
notFull.await();
}
// 入队后,通知队列不空,可出队
stack.push(t);
notEmpty.signalAll();
} catch (InterruptedException ignored) {
} finally {
lock.unlock();
}
}
// 出队
public T deq() {
// 先获得互斥锁,类似于管程中的入口
lock.lock();
try {
while (stack.isEmpty()) {
// 队列已空,等待队列不空,才可出队
notEmpty.await();
}
// 出队后,通知队列不满,可入队
T pop = stack.pop();
notFull.signalAll();
return pop;
} catch (InterruptedException ignored) {
} finally {
lock.unlock();
}
return null;
}
}
在TCP协议层面,发送完RPC请求后,系统线程是不会等待RPC的响应结果的,需要RPC框架完成 异步转同步 的操作
protected Result doInvoke(final Invocation invocation) throws Throwable {
...
return (Result) currentClient
.request(inv, timeout) // 发送RPC请求,默认返回DefaultFuture
.get(); // 等待RPC返回结果
}
当RPC返回结果之前,阻塞调用线程,让调用线程等待;当RPC返回结果后,唤醒调用线程,让调用线程重新执行
// 锁和条件变量 private final Lock lock = new ReentrantLock(); private final Condition done = lock.newCondition(); // RPC结果 private volatile Response response; // 回调 private volatile ResponseCallback callback;
// RPC结果是否已经返回
public boolean isDone() {
return response != null;
}
// 调用方通过该方法等待RPC结果
public Object get(int timeout) throws RemotingException {
...
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
...
}
return returnFromResponse();
}
// RPC结果返回时调用该方法
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signalAll();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
转载请注明出处:http://zhongmingmao.me/2019/05/07/java-concurrent-condition/
访问原文「 Java并发 -- Condition 」获取最佳阅读体验并参与讨论