之前提到的ReentrantLock是排他锁,在同一时刻只允许一个线程进行访问,而读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升。
执行非公平(默认)和公平锁的获取方式,吞吐量非公平优先于公平。
支持重进入:读线程在获取了读锁之后,能够再次获取读锁。而写线程在获取了写锁之后能够再次获取写锁,同时也可以获取读锁。
遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁。
可以看到,ReentrantReadWriteLock实现了ReadWriteLock接口。
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
复制代码
该接口只定义了两个方法,返回读锁和写锁。
可以看到,Sync同步类实现了AQS抽象类。也就是说ReentrantReadWriteLock也是基于AQS来实现的。
和ReentrantLock类似,Sync也有两个子类分为FairSync(公平锁)和NonfairSync(非公平锁)。
ReadLock以及WriteLock实现了Lock接口,同时保持了一个Sync的引用。
HoldCounter主要配合读锁使用。
static final class HoldCounter {
int count = 0;
final long tid = getThreadId(Thread.currentThread());
}
复制代码
HoldCounter主要有两个属性,count和tid,其中count表示某个读线程重入的次数,tid表示该线程的tid字段的值,该字段可以用来唯一标识一个线程。
ThreadLocalHoldCounter重写了ThreadLocal的initialValue方法,ThreadLocal类可以将线程与对象相关联。在没有进行set的情况下,get到的均是initialValue方法里面生成的那个HolderCounter对象。
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
复制代码
我们先从最基础也是最重要的Sync开始。
// 版本序列号
private static final long serialVersionUID = 6317671515068378041L;
// 高16位为读锁,低16位为写锁
static final int SHARED_SHIFT = 16;
// 读锁单位
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 读锁最大数量
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 写锁最大数量
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 本地线程计数器
private transient ThreadLocalHoldCounter readHolds;
// 缓存的计数器
private transient HoldCounter cachedHoldCounter;
// 第一个读线程
private transient Thread firstReader = null;
// 第一个读线程的计数
private transient int firstReaderHoldCount;
复制代码
主要定义了读写状态的设计。
Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}
复制代码
同步状态在重入锁的实现中是表示被同一个线程重复获取的次数,即一个整形变量来维护,但是之前的那个表示仅仅表示是否锁定,而不用区分是读锁还是写锁。而读写锁需要在同步状态(一个整形变量)上维护多个读线程和一个写线程的状态。
读写锁对于同步状态的实现是在一个整形变量上通过“按位切割使用”:将变量切割成两部分,高16位表示读,低16位表示写。
假设当前同步状态值为S,get和set的操作如下:
获取写状态:S&0x0000FFFF:将高16位全部抹去
获取读状态:S>>>16:无符号补0,右移16位
写状态加1: S+1
读状态加1: S+(1<<16)即S + 0x00010000
在代码层的判断中,如果S不等于0,当写状态(S&0x0000FFFF),而读状态(S>>>16)大于0,则表示该读写锁的读锁已被获取。
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
Thread current = Thread.currentThread();
// 获取资源数
int c = getState();
// 获取独占线程的重入数
int w = exclusiveCount(c);
// 如果资源已经被获取过(此时不管是读锁获取过还是写锁获取过都会进入该判断)
if (c != 0) {
// 如果写锁重入数为0或者当前线程不为独占线程直接返回尝试获取资源失败
// 写锁获取资源数不为0则代表了读锁没有获取该资源
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 如果重入数加上需要获取的资源大于最大重入数则直接抛出异常
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 如果资源获取数大于0且是当前线程获取的资源,则设置资源数
setState(c + acquires);
return true;
}
// 到这里则表示c == 0,写锁和读锁都没有被获取过。
// writerShouldBlock判断是否需要阻塞(公平锁和非公平锁实现方式不同)
// 如果不需要阻塞会CAS尝试获取资源
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
// 获取资源失败则返回false表示尝试获取资源失败,进入AQS队列等待获取锁
return false;
// 走到这里证明尝试获取资源已经成功了,设置当前线程为独占线程
setExclusiveOwnerThread(current);
return true;
}
复制代码
流程图如下:
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
复制代码
其实就是低16位代表写锁。
protected final boolean tryRelease(int releases) {
// 如果当前线程不是独占线程,直接抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 获取释放资源后的总资源
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
// 如果释放后的总资源等于0
if (free)
// 则将独占线程设置为null
setExclusiveOwnerThread(null);
// 设置释放后的总资源
setState(nextc);
// 返回是否释放锁
return free;
}
复制代码
tryRelease比较简单,就不再赘述。
protected final int tryAcquireShared(int unused) {
// 获取当前线程
Thread current = Thread.currentThread();
// 获取被占用资源数
int c = getState();
// 如果独占(写锁)获取资源不为0
if (exclusiveCount(c) != 0 &&
// 且独占(写锁)获取资源线程不为当前线程
getExclusiveOwnerThread() != current)
// 直接返回-1
return -1;
// 共享资源的获取数r(写锁被获取过多少次)
int r = sharedCount(c);
// 判断是否读是否需要阻塞(公平锁和非公平锁)
// 如果不需要阻塞
if (!readerShouldBlock() &&
// 写锁被获取次数小于最大次数
r < MAX_COUNT &&
// 且CAS方式设置资源数成功
compareAndSetState(c, c + SHARED_UNIT)) {
// 如果r==0则表示当前线程是第一个获取读锁的线程
if (r == 0) {
// 则第一个获取读锁的线程设置为当前线程
firstReader = current;
// 第一个读线程占用的资源数为1
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 如果当前线程是第一个获取读锁的线程,则第一个读线程占用资源数++
firstReaderHoldCount++;
} else {
// 读锁数量不为0且第一个获取读锁的不是当前线程
// 获取计数器
HoldCounter rh = cachedHoldCounter;
// 如果计数器为null或者计数器的tid不为当前正在运行的线程的tid
if (rh == null || rh.tid != getThreadId(current))
// 获取当前线程对应的计数器
cachedHoldCounter = rh = readHolds.get();
// 如果计数为0
else if (rh.count == 0)
// 将计数器设置到ThreadLocal中
readHolds.set(rh);
// 计数+1
rh.count++;
}
// 返回1
return 1;
}
return fullTryAcquireShared(current);
}
复制代码
更新成功后会在firstReaderHoldCount中或readHolds(ThreadLocal类型的)的本线程副本中记录当前线程重入数,这是为了实现jdk1.6中加入的getReadHoldCount()方法的,这个方法能获取当前线程重入共享锁的次数(state中记录的是多个线程的总重入次数),加入了这个方法让代码复杂了不少,但是其原理还是很简单的:如果当前只有一个线程的话,还不需要动用ThreadLocal,直接往firstReaderHoldCount这个成员变量里存重入数,当有第二个线程来的时候,就要动用ThreadLocal变量readHolds了,每个线程拥有自己的副本,用来保存自己的重入数。
在tryAcquireShared函数中,如果下列三个条件不满足(读线程是否应该被阻塞、小于最大值、比较设置成功)则会进行fullTryAcquireShared函数中,它用来保证相关操作可以成功。
final int fullTryAcquireShared(Thread current) {
// 计数器
HoldCounter rh = null;
for (;;) {
// 获取被占用资源数
int c = getState();
// 如果写锁占用资源数不为0
if (exclusiveCount(c) != 0) {
// 如果不是当前线程获取的写锁
if (getExclusiveOwnerThread() != current)
return -1;
// 如果需要阻塞
} else if (readerShouldBlock()) {
// 如果第一个获取读锁线程是当前线程
if (firstReader == current) {
} else {
// 如果计数器为null
if (rh == null) {
// 获取缓存计数器
rh = cachedHoldCounter;
// 如果计数器为null或者计数器不是当前线程计数器
if (rh == null || rh.tid != getThreadId(current)) {
// 获取当前线程计数器
rh = readHolds.get();
// 如果当前线程读锁计数为0
if (rh.count == 0)
// 删除当前线程计数器
readHolds.remove();
}
}
// 如果当前线程计数为0 返回-1
if (rh.count == 0)
return -1;
}
}
// 如果读锁占有资源数等于最大资源数
if (sharedCount(c) == MAX_COUNT)
// 抛异常
throw new Error("Maximum lock count exceeded");
// Cas方式获取读锁资源
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 如果是第一个获取读锁的线程
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh;
}
return 1;
}
}
}
复制代码
在tryAcquireShared方法上加入了threadlocal的清理流程,实质上还是循环获取读锁。
protected final boolean tryReleaseShared(int unused) {
// 获取当前线程
Thread current = Thread.currentThread();
// 如果第一个获取读锁的线程是当前线程
if (firstReader == current) {
// 如果第一个获取读锁的线程获取读锁的重入数为1
if (firstReaderHoldCount == 1)
// 将第一个获取读锁的线程设置为null
firstReader = null;
else
// 将第一个获取读锁的线程的资源占用数--
firstReaderHoldCount--;
} else {
// 拿到缓存计数器
HoldCounter rh = cachedHoldCounter;
// 如果缓存计数器指向的不是当前线程
if (rh == null || rh.tid != getThreadId(current))
// 从threadLocal里拿缓存计数器
rh = readHolds.get();
// 拿到当前线程的获取读锁重入锁
int count = rh.count;
// 如果重入数小于等于1
if (count <= 1) {
// 清理threadlocal
readHolds.remove();
// 如果小于等于0,抛出异常
if (count <= 0)
throw unmatchedUnlockException();
}
// 重入数-1
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
// cas方式释放资源
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
复制代码
读锁的释放只有两个步骤:
final int getReadHoldCount() {
if (getReadLockCount() == 0)
return 0;
Thread current = Thread.currentThread();
if (firstReader == current)
return firstReaderHoldCount;
HoldCounter rh = cachedHoldCounter;
if (rh != null && rh.tid == getThreadId(current))
return rh.count;
int count = readHolds.get().count;
if (count == 0) readHolds.remove();
return count;
}
复制代码
比较简单,就不一一注释代码了。
为什么不全放在threadlocal中?
为什么HoldCounter中不是直接指向当前线程,而是记录线程id?
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
复制代码
可以看到,ReentrantReadWriteLock构造分不同情况构造了公平锁和非公平锁。
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
复制代码
公平锁的writerShouldBlock、readerShouldBlock方法调用了AQS的hasQueuedPredecessors判断是否有线程先于当前线程获取锁。
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
}
复制代码
可以看到,非公平锁的写锁writerShouldBlock方法是直接返回false的,也就是说在tryAcquire方法中是直接Cas尝试获取一次资源的,readerShouldBlock则调用了AQS的apparentlyFirstQueuedIsExclusive方法。
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
复制代码
返回为true需要以下条件:
这个方法判断队列的head.next是否正在等待独占锁(写锁)。
官方的解释是读锁不应该让写锁始终等待,造成写锁线程饥饿的情况。
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
复制代码
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
复制代码
比较简单,没啥好说的。
在线程持有读锁的情况下,该线程不能取得写锁(因为获取写锁的时候,如果发现当前的读锁被占用,就马上获取失败,不管读锁是不是被当前线程持有)。
在线程持有写锁的情况下,该线程可以继续获取读锁(获取读锁时如果发现写锁被占用,只有写锁没有被当前线程占用的情况才会获取失败)。