public class FileBrowser {
// 发送消息
private void send(Message message) {
}
// MQ消息返回后调用该方法
public void onMessage(Message message) {
}
public Response handleWebReq() {
Message message = new Message(1L, "123");
// 发送消息
send(message);
// 如何等待MQ返回消息?
return new Response();
}
}
@AllArgsConstructor
class Message {
private Long id;
private String content;
}
class Response {
}
public class GuardedObject<T> {
private static final int TIMEOUT = 1;
// 受保护对象
private T obj;
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
// 获取受保护对象
public T get(Predicate<T> p) {
lock.lock();
try {
// MESA管程推荐写法
while (!p.test(obj)) {
done.await(TIMEOUT, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
return obj;
}
// 事件通知方法
public void onChange(T obj) {
lock.lock();
try {
this.obj = obj;
done.signalAll();
} finally {
lock.unlock();
}
}
}
public class GuardedObject<T> {
private static final int TIMEOUT = 1;
// 保存所有的GuardedObject
private static final Map<Object, GuardedObject> goMap = new ConcurrentHashMap<>();
// 受保护对象
private T obj;
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
public static <K> GuardedObject create(K key) {
GuardedObject go = new GuardedObject();
goMap.put(key, go);
return go;
}
public static <K, T> void fireEvent(K key, T obj) {
GuardedObject go = goMap.remove(key);
if (go != null) {
go.onChange(obj);
}
}
// 获取受保护对象
public T get(Predicate<T> p) {
lock.lock();
try {
// MESA管程推荐写法
while (!p.test(obj)) {
done.await(TIMEOUT, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
return obj;
}
// 事件通知方法
public void onChange(T obj) {
lock.lock();
try {
this.obj = obj;
done.signalAll();
} finally {
lock.unlock();
}
}
}
public class FileBrowser {
// 发送消息
private void send(Message message) {
}
// MQ消息返回后调用该方法
public void onMessage(Message message) {
// 唤醒等待的线程
GuardedObject.fireEvent(message.getId(), message);
}
public Response handleWebReq() {
Long id = 1L;
Message message = new Message(id, "123");
GuardedObject go = GuardedObject.create(id);
// 发送消息
send(message);
// 等待MQ消息
go.get(Objects::nonNull);
return new Response();
}
}
@Data
@AllArgsConstructor
class Message {
private Long id;
private String content;
}
class Response {
}