Reposted

Reader Writer Lock

Intent

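Suppose several threads share a memory area under the usual readers-writers constraints: any number of threads may read it at once, but a write must not overlap any other access. We could protect the shared data with a mutual-exclusion lock (mutex), so that no two threads ever access the data at the same time. That solution is suboptimal, though: a reader R1 may hold the lock when another reader R2 requests access, and it would be foolish for R2 to wait until R1 finishes before starting its own read. R2 should start right away instead. This is the motivation for the Reader Writer Lock pattern.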
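The R1/R2 scenario can be reproduced with the JDK's own locks. The following is a minimal sketch, not part of the original example: `ReaderOverlapSketch` and `readersOverlap` are hypothetical names, and the 500 ms timeout is an arbitrary choice. Each of two reader threads takes the lock and then waits briefly for the other reader to get inside as well.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ReaderOverlapSketch {

  /** Returns true only if readers R1 and R2 were inside the locked region at the same time. */
  static boolean readersOverlap(Lock lock) {
    CountDownLatch bothInside = new CountDownLatch(2);
    boolean[] sawOther = new boolean[2];
    Thread[] readers = new Thread[2];
    for (int i = 0; i < 2; i++) {
      final int id = i;
      readers[i] = new Thread(() -> {
        lock.lock();
        try {
          bothInside.countDown();
          // While still holding the lock, wait briefly for the other reader to enter.
          sawOther[id] = bothInside.await(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          lock.unlock();
        }
      }, "R" + (i + 1));
      readers[i].start();
    }
    for (Thread reader : readers) {
      try {
        reader.join(); // join() also makes the readers' writes to sawOther visible here
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return sawOther[0] && sawOther[1];
  }

  public static void main(String[] args) {
    System.out.println("plain mutex: " + readersOverlap(new ReentrantLock()));
    System.out.println("read lock:   " + readersOverlap(new ReentrantReadWriteLock().readLock()));
  }
}
```

With the plain mutex, whichever reader enters first times out waiting for the other, so the method should report false; with the read lock both readers are inside together and it should report true.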

Explanation

Wikipedia says

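In computer science, a readers-writer (RW) lock, or shared-exclusive lock (also known as a multiple readers/single-writer lock, multi-reader lock, or push lock), is a synchronization primitive that solves one of the readers-writers problems. An RW lock allows concurrent access for read-only operations, while write operations require exclusive access. This means that multiple threads can read the data in parallel, but writing or modifying the data requires an exclusive lock. While a writer is writing, all other writers and readers are blocked until it finishes. A common use is to control access to an in-memory data structure that cannot be updated atomically and is invalid (and should not be read by another thread) until the update is complete.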
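As an illustration of that last use case, here is a minimal sketch of a data structure whose update takes two steps and is inconsistent in between. It relies on the JDK's `ReentrantReadWriteLock`; the `GuardedTable` class and its methods are hypothetical and do not appear in the original example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical in-memory table guarded by a read/write lock. */
class GuardedTable {

  private final Map<String, Integer> data = new HashMap<>();
  private final ReadWriteLock lock = new ReentrantReadWriteLock();

  /** Write: requires exclusive access. */
  void put(String key, int value) {
    lock.writeLock().lock();
    try {
      data.put(key, value);
    } finally {
      lock.writeLock().unlock();
    }
  }

  /** Two-step write: between the two merges the total is wrong, so readers must be kept out. */
  void transfer(String from, String to, int amount) {
    lock.writeLock().lock();
    try {
      data.merge(from, -amount, Integer::sum);
      data.merge(to, amount, Integer::sum);
    } finally {
      lock.writeLock().unlock();
    }
  }

  /** Read-only: any number of threads may run this concurrently. */
  Integer get(String key) {
    lock.readLock().lock();
    try {
      return data.get(key);
    } finally {
      lock.readLock().unlock();
    }
  }

  /** Read-only: never observes a half-finished transfer. */
  int total() {
    lock.readLock().lock();
    try {
      int sum = 0;
      for (int value : data.values()) {
        sum += value;
      }
      return sum;
    } finally {
      lock.readLock().unlock();
    }
  }
}
```

Calls to `get` and `total` can overlap one another freely, while `put` and `transfer` each wait until no other thread holds either lock.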

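Source code

This example uses two mutexes (readerMutex and globalMutex in the ReaderWriterLock class) to demonstrate concurrent access by multiple readers and writers.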

Class diagram

[Figure: class diagram of the Reader Writer Lock]

Step 1: Create the Reader class, which reads while it holds the read lock.

import java.util.concurrent.locks.Lock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Reader implements Runnable {

  private static final Logger LOGGER = LoggerFactory.getLogger(Reader.class);

  private final Lock readLock;

  private final String name;

  private final long readingTime;

  /**
   * Create new Reader
   *
   * @param name - Name of the thread owning the reader
   * @param readLock - Lock for this reader
   * @param readingTime - amount of time (in milliseconds) for this reader to engage reading
   */
  public Reader(String name, Lock readLock, long readingTime) {
    this.name = name;
    this.readLock = readLock;
    this.readingTime = readingTime;
  }

  /**
   * Create new Reader who reads for 250ms
   *
   * @param name - Name of the thread owning the reader
   * @param readLock - Lock for this reader
   */
  public Reader(String name, Lock readLock) {
    this(name, readLock, 250L);
  }

  @Override
  public void run() {
    // Acquire before the try block so that unlock() only runs if lock() succeeded
    readLock.lock();
    try {
      read();
    } catch (InterruptedException e) {
      LOGGER.info("InterruptedException when reading", e);
      Thread.currentThread().interrupt();
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Simulate the read operation by sleeping for readingTime milliseconds
   */
  public void read() throws InterruptedException {
    LOGGER.info("{} begin", name);
    Thread.sleep(readingTime);
    LOGGER.info("{} finish after reading {}ms", name, readingTime);
  }
}

Step 2: Create the Writer class, which writes while it holds the write lock.

import java.util.concurrent.locks.Lock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Writer implements Runnable {

  private static final Logger LOGGER = LoggerFactory.getLogger(Writer.class);

  private final Lock writeLock;

  private final String name;

  private final long writingTime;

  /**
   * Create new Writer who writes for 250ms
   *
   * @param name - Name of the thread owning the writer
   * @param writeLock - Lock for this writer
   */
  public Writer(String name, Lock writeLock) {
    this(name, writeLock, 250L);
  }

  /**
   * Create new Writer
   *
   * @param name - Name of the thread owning the writer
   * @param writeLock - Lock for this writer
   * @param writingTime - amount of time (in milliseconds) for this writer to engage writing
   */
  public Writer(String name, Lock writeLock, long writingTime) {
    this.name = name;
    this.writeLock = writeLock;
    this.writingTime = writingTime;
  }

  @Override
  public void run() {
    // Acquire before the try block so that unlock() only runs if lock() succeeded
    writeLock.lock();
    try {
      write();
    } catch (InterruptedException e) {
      LOGGER.info("InterruptedException when writing", e);
      Thread.currentThread().interrupt();
    } finally {
      writeLock.unlock();
    }
  }

  /**
   * Simulate the write operation by sleeping for writingTime milliseconds
   */
  public void write() throws InterruptedException {
    LOGGER.info("{} begin", name);
    Thread.sleep(writingTime);
    LOGGER.info("{} finished after writing {}ms", name, writingTime);
  }
}

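Step 3: Now it is time to create the ReaderWriterLock class, which controls access for readers and writers.

Multiple readers may hold the lock at the same time, but readers wait if any writer holds the lock. If readers hold the lock, a writer waits. This lock is not fair.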

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReaderWriterLock implements ReadWriteLock {

  private static final Logger LOGGER = LoggerFactory.getLogger(ReaderWriterLock.class);

  private final Object readerMutex = new Object();

  private int currentReaderCount;

  /**
   * Global mutex is used to indicate whether a reader or a writer
   * holds the lock at the moment.
   * <p>
   * 1. When it contains the reference of {@link #readerLock}, the lock
   * is held by readers, and another
   * reader can also do the read operation concurrently. <br>
   * 2. When it contains the reference of {@link #writerLock}, the lock
   * is held exclusively by one
   * writer, and no other reader or writer can get the lock.
   * <p>
   * This is the most important field in this class to control the access for reader/writer.
   */
  private final Set<Object> globalMutex = new HashSet<>();

  private final ReadLock readerLock = new ReadLock();
  private final WriteLock writerLock = new WriteLock();

  @Override
  public Lock readLock() {
    return readerLock;
  }

  @Override
  public Lock writeLock() {
    return writerLock;
  }

  /**
   * Return true when globalMutex holds the reference of writerLock
   */
  private boolean doesWriterOwnThisLock() {
    return globalMutex.contains(writerLock);
  }

  /**
   * Nobody holds the lock when globalMutex contains nothing
   */
  private boolean isLockFree() {
    return globalMutex.isEmpty();
  }

  /**
   * Reader lock, can be held by more than one reader concurrently if no writer holds the lock
   */
  private class ReadLock implements Lock {

    @Override
    public void lock() {
      synchronized (readerMutex) {
        currentReaderCount++;
        // Only the first reader has to acquire globalMutex; later readers share it
        if (currentReaderCount == 1) {
          acquireForReaders();
        }
      }
    }

    /**
     * Acquire the globalMutex lock on behalf of current and future concurrent readers.
     * Make sure no writer currently owns the lock.
     */
    private void acquireForReaders() {
      // Try to get the globalMutex lock for the first reader
      synchronized (globalMutex) {
        // Wait while a writer owns the lock. Once the lock is free or held by readers,
        // add this ReadLock to globalMutex to indicate that the lock is held by readers.
        boolean interrupted = false;
        while (doesWriterOwnThisLock()) {
          try {
            globalMutex.wait();
          } catch (InterruptedException e) {
            LOGGER.info("InterruptedException while waiting for globalMutex in acquireForReaders", e);
            // Only remember the interrupt: restoring the flag here would make the next wait()
            // throw immediately and turn this loop into a busy spin
            interrupted = true;
          }
        }
        globalMutex.add(this);
        if (interrupted) {
          Thread.currentThread().interrupt();
        }
      }
    }

    @Override
    public void unlock() {
      synchronized (readerMutex) {
        currentReaderCount--;
        // Release the lock only when the last reader leaves, which ensures that the lock
        // is released once all readers have finished.
        if (currentReaderCount == 0) {
          synchronized (globalMutex) {
            // Notify the waiters, mostly writers
            globalMutex.remove(this);
            globalMutex.notifyAll();
          }
        }
      }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
      throw new UnsupportedOperationException();
    }

    @Override
    public boolean tryLock() {
      throw new UnsupportedOperationException();
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
      throw new UnsupportedOperationException();
    }

    @Override
    public Condition newCondition() {
      throw new UnsupportedOperationException();
    }
  }

  /**
   * Writer lock, can be held by only one writer at a time
   */
  private class WriteLock implements Lock {

    @Override
    public void lock() {
      synchronized (globalMutex) {
        // Wait until the lock is free.
        boolean interrupted = false;
        while (!isLockFree()) {
          try {
            globalMutex.wait();
          } catch (InterruptedException e) {
            // Remember the interrupt and restore it after the lock is acquired (see ReadLock)
            interrupted = true;
          }
        }
        // When the lock is free, acquire it by placing an entry in globalMutex
        globalMutex.add(this);
        if (interrupted) {
          Thread.currentThread().interrupt();
        }
      }
    }

    @Override
    public void unlock() {
      synchronized (globalMutex) {
        globalMutex.remove(this);
        // Notify the waiters, other writers or readers
        globalMutex.notifyAll();
      }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
      throw new UnsupportedOperationException();
    }

    @Override
    public boolean tryLock() {
      throw new UnsupportedOperationException();
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
      throw new UnsupportedOperationException();
    }

    @Override
    public Condition newCondition() {
      throw new UnsupportedOperationException();
    }
  }

}
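The blocking rules stated for Step 3 (readers share the lock, a writer waits while any reader holds it) can also be observed without blocking a thread. The sketch below is an illustration only: it uses the JDK's `ReentrantReadWriteLock` rather than the ReaderWriterLock class of Step 3, because that class does not support `tryLock()`. The names `BlockingRulesSketch`, `writerCanEnter` and `probe` are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class BlockingRulesSketch {

  /** Asks a separate thread whether it could take the write lock right now, without waiting. */
  static boolean writerCanEnter(ReadWriteLock lock, ExecutorService otherThread) {
    try {
      return otherThread.submit(() -> {
        boolean acquired = lock.writeLock().tryLock();
        if (acquired) {
          lock.writeLock().unlock();
        }
        return acquired;
      }).get();
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    }
  }

  /** Returns {writer admitted while a reader holds the lock, writer admitted afterwards}. */
  static boolean[] probe() {
    ReadWriteLock lock = new ReentrantReadWriteLock();
    ExecutorService otherThread = Executors.newSingleThreadExecutor();
    try {
      boolean whileReading;
      lock.readLock().lock();
      try {
        whileReading = writerCanEnter(lock, otherThread);
      } finally {
        lock.readLock().unlock();
      }
      boolean afterReading = writerCanEnter(lock, otherThread);
      return new boolean[] {whileReading, afterReading};
    } finally {
      otherThread.shutdown();
    }
  }

  public static void main(String[] args) {
    boolean[] result = probe();
    System.out.println("writer admitted while a reader holds the lock: " + result[0]);
    System.out.println("writer admitted after the reader released it:  " + result[1]);
  }
}
```

The first probe should be refused and the second admitted, which mirrors the rule that a writer waits until the last reader has left.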

Step 4: Let's test this design pattern.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReaderWriterLockDemo {

  private static final Logger LOGGER = LoggerFactory.getLogger(ReaderWriterLockDemo.class);

  /**
   * Program entry point
   *
   * @param args command line args
   */
  public static void main(String[] args) {

    ExecutorService executeService = Executors.newFixedThreadPool(10);
    ReaderWriterLock lock = new ReaderWriterLock();

    // Start writers, each writing for a random time below 5000ms
    IntStream.range(0, 5)
        .forEach(i -> executeService.submit(new Writer("Writer " + i, lock.writeLock(),
            ThreadLocalRandom.current().nextLong(5000))));
    LOGGER.info("Writers added...");

    // Start readers, each reading for a random time below 10ms
    IntStream.range(0, 5)
        .forEach(i -> executeService.submit(new Reader("Reader " + i, lock.readLock(),
            ThreadLocalRandom.current().nextLong(10))));
    LOGGER.info("Readers added...");

    try {
      Thread.sleep(5000L);
    } catch (InterruptedException e) {
      LOGGER.error("Error sleeping before adding more readers", e);
      Thread.currentThread().interrupt();
    }

    // Start more readers (named Reader 6 to Reader 9)
    IntStream.range(6, 10)
        .forEach(i -> executeService.submit(new Reader("Reader " + i, lock.readLock(),
            ThreadLocalRandom.current().nextLong(10))));
    LOGGER.info("More readers added...");

    // In the log output, read operations overlap one another while
    // write operations are exclusive.
    executeService.shutdown();
    try {
      executeService.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      LOGGER.error("Error waiting for ExecutorService shutdown", e);
      Thread.currentThread().interrupt();
    }

  }

}

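Applicability

Use the Reader Writer Lock when an application needs better performance from resource synchronization across multiple threads, particularly when read and write operations are mixed.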

Original article: https://www.jdon.com/52111