目的
假设我们有一个共享内存区域,上面详细介绍了基本约束。可以保护互斥锁后面的共享数据,在这种情况下,没有两个线程可以同时访问数据。但是,此解决方案不是最理想的,因为读取器R1可能具有锁定,然后另一个读取器R2请求访问。R2在开始自己的读操作之前等到R1完成是愚蠢的。相反,R2应立即开始。这是Reader Writer Lock模式的动机。
说明
维基百科说
在计算机科学中,读写器(rw)或共享独占锁(也称为多读写器/单写器锁或多读写器锁或推送锁)是解决读写器问题的同步原语。rw锁允许对只读操作进行并发访问,而写操作则需要独占访问。这意味着多个线程可以并行读取数据,但写入或修改数据需要一个排它锁。当写入程序正在写入数据时,所有其他写入程序或读取程序都将被阻止,直到写入程序完成写入。常见的用法可能是控制对内存中数据结构的访问,这些数据结构不能自动更新,并且在更新完成之前无效(不应被其他线程读取)。
源代码
此示例使用两个互斥锁来演示多个读取器和写入器的并发访问。
类图
第1步: 创建Reader类,在获取读锁定时读取。
<b>public</b> <b>class</b> Reader implements Runnable {
<b>private</b> <b>static</b> <b>final</b> Logger LOGGER = LoggerFactory.getLogger(Reader.<b>class</b>);
<b>private</b> Lock readLock;
<b>private</b> String name;
<b>private</b> <b>long</b> readingTime;
<font><i>/**
* Create new Reader
*
* @param name - Name of the thread owning the reader
* @param readLock - Lock for this reader
* @param readingTime - amount of time (in milliseconds) for this reader to engage reading
*/</i></font><font>
<b>public</b> Reader(String name, Lock readLock, <b>long</b> readingTime) {
<b>this</b>.name = name;
<b>this</b>.readLock = readLock;
<b>this</b>.readingTime = readingTime;
}
</font><font><i>/**
* Create new Reader who reads for 250ms
*
* @param name - Name of the thread owning the reader
* @param readLock - Lock for this reader
*/</i></font><font>
<b>public</b> Reader(String name, Lock readLock) {
<b>this</b>(name, readLock, 250L);
}
@Override
<b>public</b> <b>void</b> run() {
readLock.lock();
<b>try</b> {
read();
} <b>catch</b> (InterruptedException e) {
LOGGER.info(</font><font>"InterruptedException when reading"</font><font>, e);
Thread.currentThread().interrupt();
} <b>finally</b> {
readLock.unlock();
}
}
</font><font><i>/**
* Simulate the read operation
*
*/</i></font><font>
<b>public</b> <b>void</b> read() throws InterruptedException {
LOGGER.info(</font><font>"{} begin"</font><font>, name);
Thread.sleep(readingTime);
LOGGER.info(</font><font>"{} finish after reading {}ms"</font><font>, name, readingTime);
}
}
</font>
第2步: Writer类,在获取写锁定时写入。
<b>public</b> <b>class</b> Writer implements Runnable {
<b>private</b> <b>static</b> <b>final</b> Logger LOGGER = LoggerFactory.getLogger(Writer.<b>class</b>);
<b>private</b> Lock writeLock;
<b>private</b> String name;
<b>private</b> <b>long</b> writingTime;
<font><i>/**
* Create new Writer who writes for 250ms
*
* @param name - Name of the thread owning the writer
* @param writeLock - Lock for this writer
*/</i></font><font>
<b>public</b> Writer(String name, Lock writeLock) {
<b>this</b>(name, writeLock, 250L);
}
</font><font><i>/**
* Create new Writer
*
* @param name - Name of the thread owning the writer
* @param writeLock - Lock for this writer
* @param writingTime - amount of time (in milliseconds) for this reader to engage writing
*/</i></font><font>
<b>public</b> Writer(String name, Lock writeLock, <b>long</b> writingTime) {
<b>this</b>.name = name;
<b>this</b>.writeLock = writeLock;
<b>this</b>.writingTime = writingTime;
}
@Override
<b>public</b> <b>void</b> run() {
writeLock.lock();
<b>try</b> {
write();
} <b>catch</b> (InterruptedException e) {
LOGGER.info(</font><font>"InterruptedException when writing"</font><font>, e);
Thread.currentThread().interrupt();
} <b>finally</b> {
writeLock.unlock();
}
}
</font><font><i>/**
* Simulate the write operation
*/</i></font><font>
<b>public</b> <b>void</b> write() throws InterruptedException {
LOGGER.info(</font><font>"{} begin"</font><font>, name);
Thread.sleep(writingTime);
LOGGER.info(</font><font>"{} finished after writing {}ms"</font><font>, name, writingTime);
}
}
</font>
第3步: 现在是时候创建ReaderWriterLock类来控制读写器的访问了。
允许多个读取器同时保持锁定,但如果任何写入程序持有锁,则读取器等待。如果读取器持有锁,则编写器等待。这种锁是不公平的。
<b>public</b> <b>class</b> ReaderWriterLock implements ReadWriteLock {
<b>private</b> <b>static</b> <b>final</b> Logger LOGGER = LoggerFactory.getLogger(ReaderWriterLock.<b>class</b>);
<b>private</b> Object readerMutex = <b>new</b> Object();
<b>private</b> <b>int</b> currentReaderCount;
<font><i>/**
* Global mutex is used to indicate that whether reader or writer
* gets the lock in the moment.
* <p>
* 1. When it contains the reference of {@link #readerLock}, it means that the lock
* is acquired by the reader, another
* reader can also do the read operation concurrently. <br>
* 2. When it contains the reference of reference of {@link #writerLock}, it means that
* the lock is acquired by the
* writer exclusively, no more reader or writer can get the lock.
* <p>
* This is the most important field in this class to control the access for reader/writer.
*/</i></font><font>
<b>private</b> Set<Object> globalMutex = <b>new</b> HashSet<>();
<b>private</b> ReadLock readerLock = <b>new</b> ReadLock();
<b>private</b> WriteLock writerLock = <b>new</b> WriteLock();
@Override
<b>public</b> Lock readLock() {
<b>return</b> readerLock;
}
@Override
<b>public</b> Lock writeLock() {
<b>return</b> writerLock;
}
</font><font><i>/**
* return true when globalMutex hold the reference of writerLock
*/</i></font><font>
<b>private</b> <b>boolean</b> doesWriterOwnThisLock() {
<b>return</b> globalMutex.contains(writerLock);
}
</font><font><i>/**
* Nobody get the lock when globalMutex contains nothing
*
*/</i></font><font>
<b>private</b> <b>boolean</b> isLockFree() {
<b>return</b> globalMutex.isEmpty();
}
</font><font><i>/**
* Reader Lock, can be access for more than one reader concurrently if no writer get the lock
*/</i></font><font>
<b>private</b> <b>class</b> ReadLock implements Lock {
@Override
<b>public</b> <b>void</b> lock() {
<b>synchronized</b> (readerMutex) {
currentReaderCount++;
<b>if</b> (currentReaderCount == 1) {
acquireForReaders();
}
}
}
</font><font><i>/**
* Acquire the globalMutex lock on behalf of current and future concurrent readers. Make sure no writers currently
* owns the lock.
*/</i></font><font>
<b>private</b> <b>void</b> acquireForReaders() {
</font><font><i>// Try to get the globalMutex lock for the first reader</i></font><font>
<b>synchronized</b> (globalMutex) {
</font><font><i>// If the no one get the lock or the lock is locked by reader, just set the reference</i></font><font>
</font><font><i>// to the globalMutex to indicate that the lock is locked by Reader.</i></font><font>
<b>while</b> (doesWriterOwnThisLock()) {
<b>try</b> {
globalMutex.wait();
} <b>catch</b> (InterruptedException e) {
LOGGER.info(</font><font>"InterruptedException while waiting for globalMutex in acquireForReaders"</font><font>, e);
Thread.currentThread().interrupt();
}
}
globalMutex.add(<b>this</b>);
}
}
@Override
<b>public</b> <b>void</b> unlock() {
<b>synchronized</b> (readerMutex) {
currentReaderCount--;
</font><font><i>// Release the lock only when it is the last reader, it is ensure that the lock is released</i></font><font>
</font><font><i>// when all reader is completely.</i></font><font>
<b>if</b> (currentReaderCount == 0) {
<b>synchronized</b> (globalMutex) {
</font><font><i>// Notify the waiter, mostly the writer</i></font><font>
globalMutex.remove(<b>this</b>);
globalMutex.notifyAll();
}
}
}
}
@Override
<b>public</b> <b>void</b> lockInterruptibly() throws InterruptedException {
<b>throw</b> <b>new</b> UnsupportedOperationException();
}
@Override
<b>public</b> <b>boolean</b> tryLock() {
<b>throw</b> <b>new</b> UnsupportedOperationException();
}
@Override
<b>public</b> <b>boolean</b> tryLock(<b>long</b> time, TimeUnit unit) throws InterruptedException {
<b>throw</b> <b>new</b> UnsupportedOperationException();
}
@Override
<b>public</b> Condition newCondition() {
<b>throw</b> <b>new</b> UnsupportedOperationException();
}
}
</font><font><i>/**
* Writer Lock, can only be accessed by one writer concurrently
*/</i></font><font>
<b>private</b> <b>class</b> WriteLock implements Lock {
@Override
<b>public</b> <b>void</b> lock() {
<b>synchronized</b> (globalMutex) {
</font><font><i>// Wait until the lock is free.</i></font><font>
<b>while</b> (!isLockFree()) {
<b>try</b> {
globalMutex.wait();
} <b>catch</b> (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
</font><font><i>// When the lock is free, acquire it by placing an entry in globalMutex</i></font><font>
globalMutex.add(<b>this</b>);
}
}
@Override
<b>public</b> <b>void</b> unlock() {
<b>synchronized</b> (globalMutex) {
globalMutex.remove(<b>this</b>);
</font><font><i>// Notify the waiter, other writer or reader</i></font><font>
globalMutex.notifyAll();
}
}
@Override
<b>public</b> <b>void</b> lockInterruptibly() throws InterruptedException {
<b>throw</b> <b>new</b> UnsupportedOperationException();
}
@Override
<b>public</b> <b>boolean</b> tryLock() {
<b>throw</b> <b>new</b> UnsupportedOperationException();
}
@Override
<b>public</b> <b>boolean</b> tryLock(<b>long</b> time, TimeUnit unit) throws InterruptedException {
<b>throw</b> <b>new</b> UnsupportedOperationException();
}
@Override
<b>public</b> Condition newCondition() {
<b>throw</b> <b>new</b> UnsupportedOperationException();
}
}
}
</font>
第4步: 让我们测试一下这个设计模式。
<b>public</b> <b>class</b> ReaderWriterLockDemo {
<b>private</b> <b>static</b> <b>final</b> Logger LOGGER = LoggerFactory.getLogger(App.<b>class</b>);
<font><i>/**
* Program entry point
*
* @param args command line args
*/</i></font><font>
<b>public</b> <b>static</b> <b>void</b> main(String[] args) {
ExecutorService executeService = Executors.newFixedThreadPool(10);
ReaderWriterLock lock = <b>new</b> ReaderWriterLock();
</font><font><i>// Start writers</i></font><font>
IntStream.range(0, 5)
.forEach(i -> executeService.submit(<b>new</b> Writer(</font><font>"Writer "</font><font> + i, lock.writeLock(),
ThreadLocalRandom.current().nextLong(5000))));
LOGGER.info(</font><font>"Writers added..."</font><font>);
</font><font><i>// Start readers</i></font><font>
IntStream.range(0, 5)
.forEach(i -> executeService.submit(<b>new</b> Reader(</font><font>"Reader "</font><font> + i, lock.readLock(),
ThreadLocalRandom.current().nextLong(10))));
LOGGER.info(</font><font>"Readers added..."</font><font>);
<b>try</b> {
Thread.sleep(5000L);
} <b>catch</b> (InterruptedException e) {
LOGGER.error(</font><font>"Error sleeping before adding more readers"</font><font>, e);
Thread.currentThread().interrupt();
}
</font><font><i>// Start readers</i></font><font>
IntStream.range(6, 10)
.forEach(i -> executeService.submit(<b>new</b> Reader(</font><font>"Reader "</font><font> + i, lock.readLock(),
ThreadLocalRandom.current().nextLong(10))));
LOGGER.info(</font><font>"More readers added..."</font><font>);
</font><font><i>// write operations are exclusive.</i></font><font>
executeService.shutdown();
<b>try</b> {
executeService.awaitTermination(5, TimeUnit.SECONDS);
} <b>catch</b> (InterruptedException e) {
LOGGER.error(</font><font>"Error waiting for ExecutorService shutdown"</font><font>, e);
Thread.currentThread().interrupt();
}
}
}
</font>
适用性
应用程序需要为多个线程增加资源同步性能,特别是有混合读/写操作。