转载

Java多线程-并发工具类Semaphore详解

Semaphore是一种同步辅助工具,翻译过来就是信号量,用来实现流量控制,它可以控制同一时间内对资源的访问次数.

无论是Synchroniezd还是ReentrantLock,一次都只允许一个线程访问一个资源,但是Semaphore可以指定多个线程同时访问某一个资源.

Semaphore有一个构造函数,可以传入一个int型整数n,表示某段代码最多只有n个线程可以访问,如果超出了n,那么请等待,等到某个线程执行完毕这段代码块,下一个线程再进入。

信号量上定义两种操作:

  • acquire(获取):当一个线程调用acquire操作时,它要么成功获取到信号量(信号量减1),要么一直等下去,直到有线程释放信号量,或超时,Semaphore内部会维护一个等待队列用于存储这些被暂停的线程.
  • release(释放)实际上会将信号量的值+1,然后唤醒相应Sepmaphore实例的等待队列中的一个任意等待线程.

应用场景

信号量主要用于两个目的:

  • 用于多个共享资源的互斥使用.
  • 用于并发线程数的控制.

例子

以下的例子:5个线程抢3个车位,同时最多只有3个线程能抢到车位,等其他线程释放信号量后,才能抢到车位.

public static void main(String[] args) {
		Semaphore semaphore = new Semaphore(3);

		for (int i = 0; i < 5; i++) {
			new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						semaphore.acquire();//申请资源
						System.out.println(Thread.currentThread().getName()+"抢到车位");
						ThreadUtil.sleep(RandomUtil.randomInt(1000,5000));
						System.out.println(Thread.currentThread().getName()+"归还车位");
					} catch (InterruptedException e) {
						e.printStackTrace();
					}finally {
					    //释放资源
						semaphore.release();
					}

				}
			},"线程"+i).start();
		}
	}
复制代码

注意事项

  • Semaphore.acquire()和Semaphore.release()总是配对使用的,这点需要由应用代码自身保证.
  • Semaphore.release()调用应该放在finally块中,已避免应用代码出现异常的情况下,当前线程所获得的信号量无法返还.
  • 如果Semaphore构造器中的参数permits值设置为1,所创建的Semaphore相当于一个互斥锁.与其他互斥锁不同的是,这种互斥锁允许一个线程释放另外一个线程所持有的锁.因为一个线程可以在未执行过Semaphore.acquire()的情况下执行相应的Semaphore.release().
  • 默认情况下,Semaphore采用的是非公平性调度策略.

原理

abstract static class Sync extends AbstractQueuedSynchronizer {
    //省略
 }
复制代码

Semaphore内部使用Sync类,Sync又是继承AbstractQueuedSynchronizer,所以Sync底层还是使用AQS实现的.Sync有两个实现类NonfairSync和FairSync,用来指定获取信号量时是否采用公平策略.

初始化方法

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}


public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

Sync(int permits) {
    setState(permits);
}
复制代码

如上所示,Semaphore默认采用非公平策略,如果需要使用公平策略则可以使用带两个参数的构造函数来构造Semaphore对象。

参数permits被传递给AQS的state值,用来表示当前持有的信号量个数.

void acquire()方法

当前线程调用该方法的目的是希望获取一个信号量资源。

如果当前信号量个数大于0,则当前信号量的计数会减1,然后该方法直接返回。否则如果当前信号量个数等0,则当前线程会被放入AQS的阻塞队列。当其他线程调用了当前线程的interrupt()方法中断了当前线程时,则当前线程会抛出InterruptedException异常返回。

//Semaphore方法
public void acquire() throws InterruptedException {
    //传递参数为1,说明要获取1个信号量资源
    sync.acquireSharedInterruptibly(1);
}

//AQS的方法
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    //(1)如果线程被中断,则抛出中断异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //(2)否则调用Sync子类方法尝试获取,这里根据构造函数确定使用公平策略
    if (tryAcquireShared(arg) < 0)
        //如果获取失败则放入阻塞队列.然后再次尝试,如果使用则调用park方法挂起当前线程
        doAcquireSharedInterruptibly(arg);
}
复制代码

由如上代码可知,acquire()在内部调用了Sync的acquireSharedlnterruptibly方法,后者会对中断进行响应(如果当前线程被中断,则抛出中断异常)。尝试获取信号量资源的AQS的方法 tryAcquireShared是由Sync的子类实现的,所以这里分别从两 方面来讨论。

先讨论非公平策略NonfairSync类的tryAcquireShared方法,代码如下:

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        //获取当前信号量值
        int available = getState();
        //计算当前剩余值
        int remaining = available - acquires;
        //如果当前剩余值小于0或则CAS设置成功则返回
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
复制代码

如上代码先获取当前信号量值(available),然后减去需要获取的值(acquires),得到剩余的信号量个数(remaining),如果剩余值小于0则说明当前信号量个数满足不了需求,那么直接返回负数,这时当前线程会被放入AQS的阻塞队列而被挂起。如果剩余值大于0,则使用CAS操作设置当前信号量值为剩余值,然后返回剩余值。

另外,由于NonFairSync是非公平获取的,也就是说先调用aquire方法获取信号量的线程不一定比后来者先获取到信号量。

考虑下面场景,如果线程A先调用了aquire()方法获取信号量,但是当前信号量个数为0,那么线程A会被放入AQS的阻塞队列 。过一段时间后线程C调用了release()方法释放了一个信号量,如果当前没有其他线程获取信号量,那么线程A就会被激活,然后获取该信号量,但是假如线程C释放信号量后,线程C调用了aquire方法,那么线程C就会和线程A去竞争这个信号量资源 。 如果采用非公平策略,由nonfairTryAcquireShared的代码可知,线程C完全可以在线程A被激活前,或者激活后先于线程 A获取到该信号量,也就是在这种模式下阻塞线程和当前请求的线程是竞争关系,而不遵循先来先得的策略。

下面看公平性的FairSync类是如何保证公平性的。

protected int tryAcquireShared(int acquires) {
    for (;;) {
        //查询是否当前线程节点的前驱节点也在等待获取该资源,有的话直接返回
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
复制代码

可见公平性还是靠hasQueuedPredecessors这个函数来保证的。所以Semaphore的公平策略是看当前线程节点的前驱节点是否也在等待获取该资源,如果是则自己放弃获取的权限,然后当前线程会被放入AQS阻塞队列,否则就去获取。

void acquire(int permits)方法

该方法与acquire()方法不同,后者只需要获取一个信号量值, 而前者则获取permits个。

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}
复制代码

void acquireUninterruptibly()方法

该方法与acquire()类似,不同之处在于该方法对中断不响应,也就是当当前线程调用了 acquireUninterruptibly获取资源时(包含被阻塞后),其他线程调用了当前线程的interrupt() 方法设置了当前线程的中断标志,此时当前线程并不会抛出IllegalArgumentException异常而返回。

public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}
复制代码

void release()方法

该方法的作用是把当前Semaphore对象的信号量值增加1,如果当前有线程因为调用aquire方法被阻塞而被放入了AQS的阻塞 队列,则会根据公平策略选择一个信号量个数能被满足的线程进行激活, 激活的线程会尝试获取刚增加的信号量.

public void release() {
    //(1)arg=1
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    //(2)尝试释放资源
    if (tryReleaseShared(arg)) {
        //(3)资源释放成功则调用park方法唤醒AQS队列里面最先挂起的线程
        doReleaseShared();
        return true;
    }
    return false;
}

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        //获取当前信号量值
        int current = getState();
        //将当前信号量值增加releases,这里为增加1
        int next = current + releases;
        //移除处理
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        //使用CAS保证更新信号量值的原子性
        if (compareAndSetState(current, next))
            return true;
    }
}
复制代码

由代码release()->sync.releaseShared(1),可知,release方法每次只会对信号量值增加1,tryReleaseShared方法是无限循环,使用CAS保证了release方法对信号量递增1的原子性操作.tryReleaseShared方法增加信号量值成功后会执行代码(3),即调用AQS的方法来激活因为调用acquire方法而被阻塞的线程。

void release(int permits)方法

该方法与不带参数的release方法的不同之处在于,前者每次调用会在信号量值原来的基础上增加 permits,而后者每次增加l。

public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}
复制代码

另外可以看到,这里的sync.releaseShared是共享方法,这说明该信号量是线程共享的,信号量没有和固定线程绑定,多个线程可以同时使用CAS去更新信号量的值而不会被阻塞。

原文  https://juejin.im/post/5e204a2a518825367841d40d
正文到此结束
Loading...