// 存在未对账订单
while (existUnreconciledOrders()) {
// 查询未对账订单
pOrder = getPOrder();
// 查询派送订单
dOrder = getDOrder();
// 执行对账操作
Order diff = check(pOrder, dOrder);
// 将差异写入差异库
save(diff);
}
getPOrder()和getDOrder()最为耗时,并且两个操作没有先后顺序的依赖,可以 并行处理
// 存在未对账订单
// 存在未对账订单
while (existUnreconciledOrders()) {
// 查询未对账订单
Thread t1 = new Thread(() -> {
pOrder = getPOrder();
});
t1.start();
// 查询派送订单
Thread t2 = new Thread(() -> {
dOrder = getDOrder();
});
t2.start();
// 等待t1和t2结束
t1.join();
t2.join();
// 执行对账操作
Order diff = check(pOrder, dOrder);
// 将差异写入差异库
save(diff);
}
while循环里每次都会创建新的线程,而创建线程是一个 耗时 的操作,可以考虑 线程池 来优化
Executor executor = Executors.newFixedThreadPool(2);
// 存在未对账订单
while (existUnreconciledOrders()) {
// 查询未对账订单
executor.execute(() -> {
pOrder = getPOrder();
});
// 查询派送订单
executor.execute(() -> {
dOrder = getDOrder();
});
// 采用线程池方案,线程根本就不会退出,join()已经失效
// 如何实现等待??
// 执行对账操作
Order diff = check(pOrder, dOrder);
// 将差异写入差异库
save(diff);
}
Executor executor = Executors.newFixedThreadPool(2);
// 存在未对账订单
while (existUnreconciledOrders()) {
// 计数器初始化为2
CountDownLatch latch = new CountDownLatch(2);
// 查询未对账订单
executor.execute(() -> {
pOrder = getPOrder();
latch.countDown();
});
// 查询派送订单
executor.execute(() -> {
dOrder = getDOrder();
latch.countDown();
});
// 等待两个查询操作结束
latch.await();
// 执行对账操作
Order diff = check(pOrder, dOrder);
// 将差异写入差异库
save(diff);
}
// 订单队列
private Vector<Order> pos;
// 派送单队列
private Vector<Order> dos;
// 执行回调的线程池
private Executor executor = Executors.newFixedThreadPool(1);
// 传入回调函数
private final CyclicBarrier barrier = new CyclicBarrier(2, () -> {
executor.execute(this::check);
});
// 回调函数
private void check() {
Order p = pos.remove(0);
Order d = dos.remove(0);
// 执行对账操作
Order diff = check(p, d);
// 差异写入差异库
save(diff);
}
// 两个查询操作
private void getOrders() {
Thread t1 = new Thread(() -> {
// 循环查询订单库
while (existUnreconciledOrders()) {
pos.add(getDOrder());
try {
// 等待
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
});
t1.start();
Thread t2 = new Thread(() -> {
// 循环查询派单库
while (existUnreconciledOrders()) {
dos.add(getDOrder());
try {
// 等待
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
});
t2.start();
}
转载请注明出处:http://zhongmingmao.me/2019/05/11/java-concurrent-countdown-latch-cyclic-barrier/
访问原文「 Java并发 -- CountDownLatch + CyclicBarrier 」获取最佳阅读体验并参与讨论