Semaphore(信号量)用来限制访问同一资源的线程数量。它通过初始化一个固定数量的配额,当线程要执行时,必须先获取配额才能继续执行,当获取不到时,就需要挂起等待;持有配额的线程执行完后需释放配额,并唤醒等待的线程。
使用方法
public class SemaphoreDemo { public static void main(String[] args) { Semaphore spd = new Semaphore(3); for (int i = 0; i < 9; i++) { new Thread(() -> { try { // 申请一个配额 spd.acquire(); System.out.println(Thread.currentThread().getName() + "获取到配额:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); // 相关业务逻辑 Thread.sleep(3000); }catch (Exception e){ e.printStackTrace(); }finally { //释放一个配额 spd.release(); } }, "线程-" + (i+1)).start(); } } } 复制代码
执行结果如下:
线程-1获取到配额:2020-03-12 14:48:56 线程-3获取到配额:2020-03-12 14:48:56 线程-2获取到配额:2020-03-12 14:48:56 线程-5获取到配额:2020-03-12 14:48:59 线程-4获取到配额:2020-03-12 14:48:59 线程-6获取到配额:2020-03-12 14:48:59 线程-8获取到配额:2020-03-12 14:49:02 线程-9获取到配额:2020-03-12 14:49:02 线程-7获取到配额:2020-03-12 14:49:02 复制代码
我们默认初始化了3个配额,然后启动9个线程,从执行结果可以看出,由于配额的限制,执行的过程中始终只能有3个线程同时执行。
内部原理
我们通过追踪源码来研究一下Semaphore的内部原理。首先我们先是new了一个Semaphore对象,进入它的构造方法:
public class Semaphore{
...
public Semaphore(int permits) {
// 调用了内部类NonfairSync
sync = new NonfairSync(permits);
}
static final class NonfairSync extends Sync {
NonfairSync(int permits) {
// 调用了父类Sync的构造方法
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
...
Sync(int permits) {
// 父类AbstractQueuedSynchronizer实现
setState(permits);
}
}
}
复制代码
可以看出,最终是调用了内部类Sync的构造方法,setState方法如下:
public abstract class AbstractQueuedSynchronizer
protected final void setState(int newState) {
// 初始值赋给同步状态state
state = newState;
}
}
复制代码
从上面的逻辑可以看出,Semaphore内部是基于AQS( AQS实现原理
)实现的,初始化时只是给AQS的同步状态赋值。
下面我们看下申请配额acquire方法:
// Semaphore类
public void acquire() throws InterruptedException {
// 这边调用AbstractQueuedSynchronizer中方法
sync.acquireSharedInterruptibly(1);
}
// AbstractQueuedSynchronizer类
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// tryAcquireShared获取共享锁,由子类实现
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
复制代码
tryAcquireShared方法由子类实现,我们进入NonfairSync:
static final class NonfairSync extends Sync { ... protected int tryAcquireShared(int acquires) { // 获取非公平锁,父类Sync中实现 return nonfairTryAcquireShared(acquires); } } // 父类Sync final int nonfairTryAcquireShared(int acquires) { for (;;) { // 读取可用的配额 int available = getState(); // 可用配额减去申请的数量 int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) // 返回剩余的配额数 return remaining; } } 复制代码
这边的逻辑是,先查看剩余配额,若不满足申请的配额时,返回一个负值;满足时,则会返回大于等于0的整数。我们接着回到上面申请配额的代码:
// AbstractQueuedSynchronizer类 public final void acquireSharedInterruptibly(int arg)throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) // 没申请到配额 doAcquireSharedInterruptibly(arg); } 复制代码
从这边可以看出,如果申请到了配额,线程就继续执行,如果没申请到,就进入doAcquireSharedInterruptibly方法:
// AbstractQueuedSynchronizer类
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null;
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private final boolean parkAndCheckInterrupt() {
// 线程挂起
LockSupport.park(this);
return Thread.interrupted();
}
复制代码
可以看出没申请到配额的线程,会被挂起。
到目前为止,acquire方法就分析完了,最后来我们看下release方法的实现:
// Semaphore类 public void release() { // 父类AbstractQueuedSynchronizer实现 sync.releaseShared(1); } // AbstractQueuedSynchronizer类 public final boolean releaseShared(int arg) { // tryReleaseShared:释放配额,由子类实现 if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } // Sync类 protected final boolean tryReleaseShared(int releases) { for (;;) { // 获取剩余配额 int current = getState(); // 剩余配额加上释放的配额 int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); // CAS修改配额数 if (compareAndSetState(current, next)) return true; } } 复制代码
上面方法可以看出,如果成功释放了配额,tryReleaseShared方法返回true,接着会进入doReleaseShared方法:
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; //唤醒线程 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } } // 唤醒线程 private void unparkSuccessor(Node node) { ... if (s != null) LockSupport.unpark(s.thread); } 复制代码
这段代码就是唤醒阻塞队列中的线程,从而那些挂起的线程可以重新申请配额进行执行。
原文
https://juejin.im/post/5e69e8d1e51d4526fc74b0c4
本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » Java并发工具类-Semaphore解析