// Worker pool used by handle() to process accepted connections: a fixed pool of 500 threads.
// NOTE(review): per the JDK documentation, Executors.newFixedThreadPool backs the pool with an
// unbounded task queue, so pending tasks can accumulate without limit when all threads are busy.
private ExecutorService pool = Executors.newFixedThreadPool(500);
/**
 * Runs a blocking echo server on port 8080.
 *
 * <p>Connections are accepted in an endless loop and each one is handed to {@code pool}.
 * The worker reads at most 1024 bytes, sleeps for one second, writes the bytes back and
 * closes the connection. The pool is shut down when the accept loop terminates, which only
 * happens when an exception escapes it.
 *
 * @throws IOException if the server channel cannot be opened or bound, or if accepting a
 *         connection fails
 */
public void handle() throws IOException {
    try (ServerSocketChannel ssc = ServerSocketChannel.open().bind(new InetSocketAddress(8080))) {
        while (true) {
            // Accept the next incoming connection.
            SocketChannel sc = ssc.accept();
            // Submit the request-handling task to the thread pool.
            pool.execute(() -> {
                // try-with-resources closes the socket on every path, including when
                // read/sleep/write throws; previously it was closed only on success.
                try (SocketChannel client = sc) {
                    // Read from the socket.
                    ByteBuffer buf = ByteBuffer.allocateDirect(1024);
                    client.read(buf);
                    TimeUnit.SECONDS.sleep(1);
                    // Write the received bytes back. flip() is called as a statement so no
                    // cast is needed regardless of its declared return type.
                    buf.flip();
                    while (buf.hasRemaining()) {
                        client.write(buf);
                    }
                } catch (IOException ignored) {
                    // Best-effort echo: a failure on one connection must not affect the others.
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the pool can observe the interruption.
                    Thread.currentThread().interrupt();
                }
            });
        }
    } finally {
        pool.shutdown();
    }
}
// Explicitly configured pool: 50 core threads, at most 500 threads, and a 60-second keep-alive
// for idle threads above the core size.
// NOTE(review): this redeclares `pool`, which is already declared earlier in this file —
// presumably an alternative to that declaration; the two cannot coexist in one class.
private ExecutorService pool = new ThreadPoolExecutor(50, 500, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(2000), // bounded queue: at most 2000 tasks may wait
runnable -> new Thread(runnable, "echo-" + runnable.hashCode()), // ThreadFactory: names each thread "echo-" plus the hash code of the runnable it is given
new ThreadPoolExecutor.CallerRunsPolicy()); // rejection policy: a rejected task is run by the thread that submitted it
// The L1 and L2 stages share the same thread pool, which has only 2 threads.
// Each L1 task submits two L2 tasks to that same pool and then blocks until both have run,
// which is what allows the pool to starve itself (see the await comment below).
ExecutorService pool = Executors.newFixedThreadPool(2);
// Released once both L1 tasks have finished.
CountDownLatch l1 = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
// Printed by the submitting thread, before the L1 task is handed to the pool.
System.out.println("L1");
pool.execute(() -> {
// Per-L1-task latch: released once both L2 tasks submitted below have run.
CountDownLatch l2 = new CountDownLatch(2);
for (int j = 0; j < 2; j++) {
pool.execute(() -> {
System.out.println("L2");
l2.countDown();
});
}
try {
// When both pool threads are occupied by L1 tasks, both block in l2.await() and no thread is left to run the L2 tasks, which stay waiting in the pool's task queue.
l2.await(); // line 28 (the DeadLock.java:28 frame in the thread dump)
} catch (InterruptedException ignored) {
// Interruption is deliberately ignored here; the interrupt flag is not restored.
}
l1.countDown();
});
}
// Blocks until both L1 tasks count down; never returns if the pool threads are stuck above.
l1.await();
// Output
// L1
// L1
//
// Blocked in l2.await()
// "pool-1-thread-2" #11 prio=5 os_prio=31 tid=0x00007f934e8f5000 nid=0x4303 waiting on condition [0x000070000792d000]
//    java.lang.Thread.State: WAITING (parking)
//     at sun.misc.Unsafe.park(Native Method)
//     - parking to wait for <0x000000079609bd58> (a java.util.concurrent.CountDownLatch$Sync)
//     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
//     at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
//     at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
//     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
//     at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
//     at time.geek.worker.thread.DeadLock.lambda$deadLockTest$1(DeadLock.java:28)
//     at time.geek.worker.thread.DeadLock$$Lambda$1/1221555852.run(Unknown Source)
//     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
//     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
//     at java.lang.Thread.run(Thread.java:748)
//
// Blocked in l2.await()
// "pool-1-thread-1" #10 prio=5 os_prio=31 tid=0x00007f9350142800 nid=0x3c03 waiting on condition [0x000070000782a000]
//    java.lang.Thread.State: WAITING (parking)
//     at sun.misc.Unsafe.park(Native Method)
//     - parking to wait for <0x0000000795ff56a8> (a java.util.concurrent.CountDownLatch$Sync)
//     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
//     at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
//     at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
//     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
//     at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
//     at time.geek.worker.thread.DeadLock.lambda$deadLockTest$1(DeadLock.java:28)
//     at time.geek.worker.thread.DeadLock$$Lambda$1/1221555852.run(Unknown Source)
//     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
//     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
//     at java.lang.Thread.run(Thread.java:748)
转载请注明出处:http://zhongmingmao.me/2019/05/24/java-concurrent-worker-thread/
访问原文「 Java并发 -- Worker Thread模式 」获取最佳阅读体验并参与讨论