转载

ReentrantLock详解

ReentrantLock详解
ReentrantLock详解

斜体为抽象类,下横线为接口

聚合关系总结:

  1. ReentrantLock实现了Lock,Serializable接口
  2. ReentrantLock.Sync(内部类)继承了AQS
  3. ReentrantLock.NonfairSync和ReentrantLock.FairSync继承了ReentrantLock.Sync
  4. ReentrantLock持有ReentrantLock.Sync对象(实现锁功能)

锁实现总结:

  1. 由Node节点组成一条同步队列(有head,tail两个指针,并且 head初始化时指向空节点 )
  2. int state标记锁使用数量(独占锁时,通常为1,发生重入时>1)
  3. lock()时加到队列尾部
  4. unlock()时,释放head节点,并指向下一个节点head=head.next,然后唤醒当前head节点

性质:

  1. 独占锁(排它锁):只能有一个线程获取锁
  2. 重入锁:一个线程可以多次lock()
  3. 公平/非公平锁:只针对上锁过程
    1. 非公平锁:尝试获取锁,若成功立刻返回,失败则加入同步队列
    2. 公平锁:直接加入同步队列

Lock

Lock接口定义了锁的行为

public interface Lock {
	//上锁(不响应Thread.interrupt()直到获取锁)
    void lock();
	//上锁(响应Thread.interrupt())
    void lockInterruptibly() throws InterruptedException;
	//尝试获取锁(以nonFair方式获取锁)
    boolean tryLock();
  	//在指定时间内尝试获取锁(响应Thread.interrupt(),支持公平/二阶段非公平)
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
	//解锁
    void unlock();
	//获取Condition
    Condition newCondition();
}

lock()过程

//锁具体实现
private final Sync sync;
//根据传入参数选择FairSync或NonfairSync实现
public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
}
public void lock() {
	sync.lock();
}
#java.util.concurrent.locks.ReentrantLock.Sync
abstract void lock();

公平锁

加入同步队列(当同步队列为空时会直接获得锁),等待锁

#java.util.concurrent.locks.ReentrantLock.FairSync
final void lock() {
	acquire(1);
}
#java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquire(int arg) {
	if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
		selfInterrupt();
}

acquire()流程:

  1. tryAcquire():模板方法,获取锁

    #java.util.concurrent.locks.ReentrantLock.FairSync
     protected final boolean tryAcquire(int acquires) {
     	//获取当前线程
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {//当前锁没被占用
     	   if (!hasQueuedPredecessors() &&//1.判断同步队列中是否有节点在等待
     		   compareAndSetState(0, acquires)) {//2.如果上面!1成立,修改state值(表明当前锁已被占用)
     		   setExclusiveOwnerThread(current);//3.如果2成立,修改当前占用锁的线程为当前线程
     		   return true;
     	   }
        }
        else if (current == getExclusiveOwnerThread()) {//占用锁线程==当前线程(重入)
     	   int nextc = c + acquires;//
     	   if (nextc < 0)
     		   throw new Error("Maximum lock count exceeded");
     	   setState(nextc);//修改status
     	   return true;
        }
        return false;//直接获取锁失败
    }
  2. acquireQueued(addWaiter(Node.EXCLUSIVE), arg):加入同步队列

    #java.util.concurrent.locks.AbstractQueuedSynchronizer
    //1
    private Node addWaiter(Node mode) {
     //生成node
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) {
     	//将node加到队列尾部
     	   node.prev = pred;
     	   if (compareAndSetTail(pred, node)) {
     		   pred.next = node;
     		   return node;
     	   }
        }
        //如果加入失败(多线程竞争或者tail指针为null)
        enq(node);
        return node;
    }
    //1.1  
    private Node enq(final Node node) {
     //死循环加入节点(cas会失败)
        for (;;) {
     	   Node t = tail;
     	   if (t == null) { //tail为null,同步队列初始化
     		//设置head指针
     		   if (compareAndSetHead(new Node()))//注意这里是个空节点!!
     			   tail = head;//将tail也指向head
     	   } else {
     		   node.prev = t;//将当前node加到队尾
     		   if (compareAndSetTail(t, node)) {
     			   t.next = node;
     			   return t;//注意这里才返回
     		   }
     	   }
        }
    }
    //2
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
     	//表示是否被打断
     	   boolean interrupted = false;
     	   for (;;) {
     		//获取node.pre节点
     		   final Node p = node.predecessor();
     		   if (p == head //当前节点是否是同步队列中的第二个节点
     		   && tryAcquire(arg)) {//获取锁,head指向当前节点
     			   setHead(node);//head=head.next
     			   p.next = null;//置空 
     			   failed = false;
     			   return interrupted;
     		   }
    
     		   if (shouldParkAfterFailedAcquire(p, node) && //是否空转(因为空转唤醒是个耗时操作,进入空转前判断pre节点状态.如果pre节点即将释放锁,则不进入空转)
     			   parkAndCheckInterrupt())//利用unsafe.park()进行空转(阻塞)
     			   interrupted = true;//如果Thread.interrupt()被调用,(不会真的被打断,会继续循环空转直到获取到锁)
     	   }
        } finally {
     	   if (failed)//tryAcquire()过程出现异常导致获取锁失败,则移除当前节点
     		   cancelAcquire(node);
        }
    }

    过程总结:

    1. 空转(如果当前节点是同步队列中的第二个节点,则直接获得锁返回)
    2. 获得锁

    注意:这里有两次tryAcquire()过程.第一次,为了避免同步队列为空时还插入队列产生的性能耗费(cas空转).第二次,就是正常的流程.先插入队尾,然后等待唤醒,再获取锁

  3. selfInterrupt(): 唤醒当前线程

    static void selfInterrupt() {//在获取锁之后 响应intterpt()请求
    	Thread.currentThread().interrupt();
    }

非公平锁

一阶段

#java.util.concurrent.locks.ReentrantLock.NonfairSync
final void lock() {
	//在acquire()之前先尝试获取锁
	if (compareAndSetState(0, 1))
		setExclusiveOwnerThread(Thread.currentThread());
	else
		acquire(1);
}

二阶段 acquire()流程与公平锁一模一样,唯一区别在于tryAcquire()实现中

#java.util.concurrent.locks.ReentrantLock.NonfairSync
protected final boolean tryAcquire(int acquires) {
 	return nonfairTryAcquire(acquires);
 }
 
#java.util.concurrent.locks.ReentrantLock.Sync
 final boolean nonfairTryAcquire(int acquires) {//这个过程其实和FairSync.tryAcquire()基本一致
	final Thread current = Thread.currentThread();
	int c = getState();
	if (c == 0) {
		//唯一区别: 这里不会去判断队列中是否为空
		if (compareAndSetState(0, acquires)) {
			setExclusiveOwnerThread(current);
			return true;
		}
	}
	else if (current == getExclusiveOwnerThread()) {
		int nextc = c + acquires;
		if (nextc < 0) // overflow
			throw new Error("Maximum lock count exceeded");
		setState(nextc);
		return true;
	}
	return false;
}
区别点 lock()过程(一阶段) tryAcquire()过程(二阶段)
FairSync 直接acquire() 当前若无线程持有锁,如果同步队列为空,获取锁
NonFairSync 先尝试获取锁,再acquire() 当前若无线程持有锁,获取锁

unlock()过程

#java.util.concurrent.locks.ReentrantLock
public void unlock() {
	sync.release(1);
}
#java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean release(int arg) {
if (tryRelease(arg)) {//释放锁
	Node h = head;
	if (h != null &&//head节点为空(非公平锁直接获取锁)
	h.waitStatus != 0)
		unparkSuccessor(h);//唤醒同步队列中离head最近的一个waitStatus<=0的节点
	return true;
}
return false;
}
#java.util.concurrent.locks.ReentrantLock
protected final boolean tryRelease(int releases) {
	int c = getState() - releases;
	//持有锁的线程==当前线程
	if (Thread.currentThread() != getExclusiveOwnerThread())
		throw new IllegalMonitorStateException();
	boolean free = false;
	if (c == 0) {//重入锁全部释放
		free = true;
		//置空持有锁线程
		setExclusiveOwnerThread(null);
	}
	//state==0(此时持有锁,不用cas)
	setState(c);
	return free;
}

lockInterruptibly()过程

lockInterruptibly()与lock()过程基本相同,区别在于Thread.intterpt()的应对措施不同

//lock()
final boolean acquireQueued(final Node node, int arg) {
	boolean failed = true;
	try {
		//表示是否被打断
		boolean interrupted = false;
		for (;;) {
			//获取node.pre节点
			final Node p = node.predecessor();
			if (p == head //当前节点是否是同步队列中的第二个节点
			&& tryAcquire(arg)) {//获取锁,当前head指向当前节点
				setHead(node);//head=head.next
				p.next = null;//置空 
				failed = false;
				return interrupted;
			}

			if (shouldParkAfterFailedAcquire(p, node) && //是否空转(因为空转唤醒是个耗时操作,进入空转前判断pre节点状态.如果pre节点即将释放锁,则不进入空转)
				parkAndCheckInterrupt())//利用unsafe.park()进行空转(阻塞)
				interrupted = true;//如果Thread.interrupt()被调用,(不会真的被打断,会继续循环空转直到获取到锁)
		}
	} finally {
		if (failed)//tryAcquire()过程出现异常导致获取锁失败,则移除当前节点
			cancelAcquire(node);
	}
}
// lockInterruptibly()
private void doAcquireInterruptibly(int arg)
	throws InterruptedException {
	final Node node = addWaiter(Node.EXCLUSIVE);
	boolean failed = true;
	try {
		for (;;) {
			final Node p = node.predecessor();
			if (p == head && tryAcquire(arg)) {
				setHead(node);
				p.next = null; 
				failed = false;
				return;
			}
			if (shouldParkAfterFailedAcquire(p, node) &&
				parkAndCheckInterrupt())//唯一区别当Thread.intterpt()打断时,直接抛出异常
				throw new InterruptedException();
		}
	} finally {
		if (failed)//然后移除当前节点
			cancelAcquire(node);
	}
}

tryLock()

#java.util.concurrent.locks.ReentrantLock
public boolean tryLock() {
	//尝试获取非公平锁
	return sync.nonfairTryAcquire(1);
}

tryLock(long timeout, TimeUnit unit)

#java.util.concurrent.locks.ReentrantLock
public boolean tryLock(long timeout, TimeUnit unit)
		throws InterruptedException {
	return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
#java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
		throws InterruptedException {
	if (Thread.interrupted())
		throw new InterruptedException();
	return tryAcquire(arg) ||//获取锁(公平/非公平)
		doAcquireNanos(arg, nanosTimeout);//在指定时间内等待锁(空转)
}

private boolean doAcquireNanos(int arg, long nanosTimeout)
		throws InterruptedException {
	...
	final long deadline = System.nanoTime() + nanosTimeout;
	//加入队尾
	final Node node = addWaiter(Node.EXCLUSIVE);
	boolean failed = true;
	try {
		for (;;) {
			final Node p = node.predecessor();
			if (p == head && tryAcquire(arg)) {
				setHead(node);
				p.next = null; 
				failed = false;
				return true;
			}
		  //上面与acquireQueued()相同,重点看这里
		  //计算剩余时间
			nanosTimeout = deadline - System.nanoTime();
			if (nanosTimeout <= 0L)
				return false;
			if (shouldParkAfterFailedAcquire(p, node) &&
				nanosTimeout > spinForTimeoutThreshold)
				//利用parkNanos()指定空转时间
				LockSupport.parkNanos(this, nanosTimeout);
			if (Thread.interrupted())//如果被Thread.interrupt(),则抛异常
				throw new InterruptedException();
		}
	} finally {
		if (failed)//移除节点
			cancelAcquire(node);
	}
}

newCondition()

public Condition newCondition() {
	return sync.newCondition();
}
#java.util.concurrent.locks.ReentrantLock.Sync
final ConditionObject newCondition() {
	return new ConditionObject();
}
原文  https://juejin.im/post/5ae1b4f0f265da0b7b359d7a
正文到此结束
Loading...