深入浅出Semaphore源码解析

Semaphore
通过permits的值来限制线程访问临界资源的总数,属于有限制次数的共享,不支持重入。

前提条件

在理解 Semaphore
时需要具备一些基本的知识:

理解AQS的实现原理

之前有写过一篇 《深入浅出AQS源码解析
关于AQS的文章,对AQS原理不了解的同学可以先看一下

Semaphore源码解析

Semaphore
中有3个内部类,分别是 Sync
NonfairSync
FairSync
Sync
NonfairSync
FairSync
的抽象类,我们会从解读 Semaphore
实现的功能开始入手逐渐去解析 Sync
NonfairSync
FairSync
的源码

public class Semaphore implements java.io.Serializable {

    private final Sync sync;

    /**
     * 初始化permits个资源
     */
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    /**
     * 初始化permits个资源,根据fair来决定是使用公平锁还是非公平锁的方式
     */
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

    /**
     * 中断方式获取一个资源
     */
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * 非中断方式获取一个资源
     */
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }

    /**
     * 尝试获取一个资源
     */
    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }

    /**
     * 尝试超时获取一个资源
     */
    public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /**
     * 释放一个资源
     */
    public void release() {
        sync.releaseShared(1);
    }

    /**
     * 中断方式获取permits个资源
     */
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

    /**
     * 非中断方式获取permits个资源
     */
    public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
    }

    /**
     * 尝试获取permits个资源
     */
    public boolean tryAcquire(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }

    /**
     * 尝试超时获取permits个资源
     */
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }

    /**
     * 释放permits个资源
     */
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

    /**
     * 获取当前可用资源数量
     */
    public int availablePermits() {
        return sync.getPermits();
    }

    /**
     * 用掉所有的资源,并返回用掉的资源数量
     */
    public int drainPermits() {
        return sync.drainPermits();
    }

    /**
     * 缩减reduction个资源
     */
    protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }
}

虽然 Semaphore
中的方法比较多,但是都比较简单,都是转调用 Sync
中的方法,通过解析 Sync
中的源码来帮助大家理解这些方法是如何实现的

Sync类源码解析

abstract static class Sync extends AbstractQueuedSynchronizer {
    // 获取所有可用的资源数量
    final int getPermits() {
        return getState();
    }
    // 非公平的方式尝试获取acquires个可用的资源
    final int nonfairTryAcquireShared(int acquires) {
        // 无限循环,尝试获取acquires个资源
        // 如果资源数量不够,返回剩余资源数量
        // 如果资源数量足够且获取成功,返回剩余的资源数量
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

    // 尝试获取releases个资源
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            // 当releases不允许为负数
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            // CAS操作尝试修改state的值
            if (compareAndSetState(current, next))
                return true;
        }
    }

    // 缩减releases个资源
    final void reducePermits(int reductions) {
        for (;;) {
            int current = getState();
            int next = current - reductions;
            // 当releases不允许为负数,也就时不能通过该方法增加资源
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            // CAS操作尝试修改state的值
            if (compareAndSetState(current, next))
                return;
        }
    }

    // 清空所有的资源数量
    final int drainPermits() {
        for (;;) {
            int current = getState();
            // CAS操作尝试将资源数量设置为0
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}

FairSync类源码解析

FairSync
中的源码很简单,直接上代码

static final class FairSync extends Sync {

    FairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        /**
         * 具体思路如下:
         * 1、如果AQS的同步队列中有等待的线程,直接获取失败,会加入到AQS的同步队列中
         * 2、如果AQS的同步队列为空,尝试修改state的值来获取acquires个资源
         * 3、一直重复步骤1和2,直到有结果返回才退出无限循环
         */
        for (;;) {
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

NonfairSync类源码解析

NonfairSync
中的源码就更简单,解析如下:

static final class NonfairSync extends Sync {

    NonfairSync(int permits) {
        super(permits);
    }
    
    // 抢占式的获取acquires个资源
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

总结

  • permits
    初始化为 1
    时, Semaphore
    变成了一个互斥的排他锁
  • permits
    初始化为无穷大时, Semaphore
    变成了无锁模式
  • state
    的值为 0
    的时候,无法获取资源,获取资源的线程会进入AQS的同步队列等待有资源释放时被唤醒
  • Semaphore
    初始化成 非公平锁
    时,可能会出现有的线程饿死的情况,一般对于控制资源的使用而言,建议初始化为 公平锁
  • 可以调用 reducePermits
    动态的缩减资源的数量,但是不能增加资源的数量

原文 

http://www.cnblogs.com/pinxiong/p/13332609.html

本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。

PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » 深入浅出Semaphore源码解析

赞 (0)
分享到:更多 ()

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址