转载

Concurrency(十五: Java中的读写锁)

读写锁是一个比前文 Java中的锁 更加复杂的锁.想象一下当你有一个应用需要对资源进行读写,然而对资源的读取次数远大于写入.当有两个线程对同一个资源进行读取时并不会有并发问题,所以多个线程可以在同一个时间点安全的读取资源.但是当一个线程需要对资源进行写入时,则其他线程的读写都不能同时进行.允许多个读线程同时操作但只允许一个写线程写入资源,这种情况我们可以通过读写锁来解决.

Java5中的 java.util.concurrent 包中已有读写锁的实现.但我们还是很有必要知道它的底层远离.

Java中读写锁的实现

首先让我们理清读写资源时所需要的条件:

读操作如果没有线程正在进行写操作并且没有线程请求进行写操作.

写操作如果没有线程正在进行写操作和读操作

只有没有线程正在写入资源或是请求写入资源时,线程就可以进行读取操作.如果我们确认写操作比读操作重要的多,我们需要升级写操作的优先级,如果我们没有这么做,那么在读操作太多频繁时候,可能会出现饥饿现象.线程请求写入操作会一直阻塞到所有的读取线程释放读写锁为止.如果新进行的读线程一直抢占先机,那么写线程可能会无限期的等待下去.结果就是产生饥饿现象.所以一个线程只能在没有线程进行写操作或是请求进行写操作时才能进行读取操作.

写线程只有在没有线程进行读写操作是才能进行.除非你想确保线程写入请求的公平性,不然你可以忽略有多少线程进行写操作请求和它们的顺序.

基于以上给出的限制条件,我们可以实现一个公平锁,如下所示:

public class ReadWriteLock {
    private int readers = 0;
    private int writers = 0;
    private int writeRequests = 0;
    
    public synchronized void lockRead() throws InterruptedException {
        while(writers > 0 || writeRequests > 0){
            wait();
        }
        readers++;
    }
    
    public synchronized void unlockRead(){
        readers--;
        notifyAll();
    }
    
    public synchronized void lockWrite() throws InterruptedException {
        writeRequests++;
        while (readers > 0 || writers> 0){
            wait();
        }
        writeRequests--;
        writers++;
    }
    
    public synchronized void unlockWrite(){
        writers--;
        notifyAll();
    }
}
复制代码

ReadWrite对象中,一共有两个lock()方法和两个unlock方法,一对读操作的lock()和unlock()方法以及一对写操作的lock()和unlock()方法.

对于读操作的限制在lockRead()方法中实现.读操作只有在一个写线程取得读写锁或是有一到多个写请求的情况下才会阻塞等待.

对于写操作的限制在lockWrite()方法中实现.一个线程想要进行写操作首先要进行写请求(writeRequests++).然后才去检查是否已经有读或者写线程取得读写锁.若没有则取得读写所进行写入操作,如有则进入while循环内部调用wait()方法进入等待状态.这个时候当前有多少个写入请求对本次操作没有任何影响.

我们可以注意到,跟以往不同的是,我们在unlockRead()和unlockWrite()方法中用notifyAll()来代替notify().我们可以想象一下下面这种情况:

在读写锁中同时有读线程和写线程在等待中.如果通过notify()唤醒的线程是读线程时,它会马上重新进入等待状态.因为已经有一个写线程在等待中了,即已经存在一个写请求了.然而在没有任何写线程被唤醒情况下,将不会发生任何事情.如果换成调用notifyAll()的话,会唤醒所有等待中的线程,无论读写.而不是一个一个唤醒.

调用notifyAll()还有一个优势,即同时存在多个读线程的情况下,如果unlockWrite()被调用,所有读线程都能够被同时唤醒和同时操作,不用一个个来.

可重入的读写锁

上文中给出的ReadWrite.class示例并不支持可重入.如果一个线程多次发起写入请求,即多次尝试获取读写锁,将会陷入阻塞,因为此前已经有一个写线程获取到读写锁了,那就是它自己.可重入需要考虑以下几种情况:

  1. 线程1获得读权限
  2. 线程2请求进行写操作,但进入阻塞,因为当前有一个读线程正在进行中
  3. 线程1再次发起读请求(尝试再次获取读写锁),这次线程1陷入阻塞,因为当前已有一个写请求存在.

这种情况上文给出的ReadWriteLock.class将会陷入跟死锁类似的境地.无论线程读写都会陷入阻塞.

为了让ReadWriteLock.class支持可重入,需要做出一点更改.读写操作的可重入性需要分开处理.

读操作的可重入性

为了让ReadWiriteLock支持读操作的可重入性,我们需要补充下面限制:

  • 一个读线程能够多次进行读操作,在它已经获得读权限的情况下.

为了确认一个读线程获得多少次读写锁,我们需要一个Map引用来记录每一个读线程获取到读写锁的次数.当决定调用线程允不允许获得读写锁时需要检查Map引用中是否存在该线程.对lockRead()和unlockRead()的改写如下:

public class ReadWriteLock {
    private int writers = 0;
    private int writeRequests = 0;
    private Map<Thread, Integer> readingThreads = new HashMap<>();

    public synchronized void lockRead() throws InterruptedException {
        Thread callingThread = Thread.currentThread();
        while (!canGrantReadAccess(callingThread)) {
            wait();
        }
        readingThreads.put(callingThread, getReadAccessCount(callingThread) + 1);
    }

    public synchronized void unlockRead() {
        Thread callingThread = Thread.currentThread();
        int accessCount = getReadAccessCount(callingThread);
        if (accessCount > 1) {
            readingThreads.put(callingThread, (accessCount - 1));
        } else {
            readingThreads.remove(callingThread);
        }
        notifyAll();
    }

    private boolean canGrantReadAccess(Thread callingThread) {
        if (writers > 0) return false;
        if (isReader(callingThread)) return true;
        return writeRequests == 0;
    }

    private boolean isReader(Thread callingThread) {
        return readingThreads.get(callingThread) != null;
    }

    private int getReadAccessCount(Thread callingThread) {
        int accessCount = 0;
        if (readingThreads.containsKey(callingThread)) {
            accessCount = readingThreads.get(callingThread) + 1;
        }
        return accessCount;
    }
}
复制代码

你可以看到,在没有线程对资源进行写入操作时,读线程可以多次获取到读取权限.更多的,当调用线程已经获得过读操作时将优先其他写入请求获取到读写锁.

写操作的可重入性

写操作只有在已经获得写权限的情况下才能多次获取读写锁.对lockWrite()和unlockWrite()的改写如下:

public class ReadWriteLock {
    private int writerAccess = 0;
    private int writeRequests = 0;
    private Map<Thread, Integer> readingThreads = new HashMap<>();
    private Thread writingThread = null;

    public synchronized void lockWrite() throws InterruptedException {
        writeRequests++;
        Thread callingThread = Thread.currentThread();
        while (!canGrantWriteAccess(callingThread)) {
            wait();
        }
        writeRequests--;
        writerAccess++;
        writingThread = callingThread;
    }

    public synchronized void unlockWrite() {
        writerAccess--;
        if(writerAccess == 0) {
            writingThread = null;
        }
        notifyAll();
    }

    private boolean canGrantWriteAccess(Thread callingThread) {
        if (hasReaders()) return false;
        if (writingThread == null) return true;
        return isWriter(callingThread);
    }

    private boolean hasReaders() {
        return readingThreads.size() > 0;
    }

    private boolean isWriter(Thread callingThread) {
        return writingThread == callingThread;
    }
}
复制代码

我们需要将当前取得读写锁的读线程引用起来,以便决定当前调用线程是否已经获得过写权限,允许多次获取读写锁.

写操作到读操作的可重入性

有时候一个线程写操作完成后需要进行读操作.我们允许一个已经获得写权限的线程获得读权限.同一个线程同时进行读写并不会有并发问题.我们需要对 canGrantReadAccess() 进行如下改写:

private boolean canGrantReadAccess(Thread callingThread) {
        if(isWriter(callingThread)) return true;
        if (writers > 0) return false;
        if (isReader(callingThread)) return true;
        return writeRequests == 0;
    }
复制代码

一个完整的读写锁实现

下面是一个完整的读写锁实例.

public class ReadWriteLock {
    private int writerAccess = 0;
    private int writeRequests = 0;
    private Map<Thread, Integer> readingThreads = new HashMap<>();
    private Thread writingThread = null;

    public synchronized void lockRead() throws InterruptedException {
        Thread callingThread = Thread.currentThread();
        while (!canGrantReadAccess(callingThread)) {
            wait();
        }
        readingThreads.put(callingThread, getReadAccessCount(callingThread) + 1);
    }

    public synchronized void unlockRead() {
        Thread callingThread = Thread.currentThread();
        int accessCount = getReadAccessCount(callingThread);
        if (accessCount > 1) {
            readingThreads.put(callingThread, (accessCount - 1));
        } else {
            readingThreads.remove(callingThread);
        }
        notifyAll();
    }

    private boolean canGrantReadAccess(Thread callingThread) {
        if(isWriter(callingThread)) return true;
        if (writerAccess > 0) return false;
        if (isReader(callingThread)) return true;
        return writeRequests == 0;
    }

    private boolean isReader(Thread callingThread) {
        return readingThreads.get(callingThread) != null;
    }

    private int getReadAccessCount(Thread callingThread) {
        int accessCount = 0;
        if (readingThreads.containsKey(callingThread)) {
            accessCount = readingThreads.get(callingThread) + 1;
        }
        return accessCount;
    }

    public synchronized void lockWrite() throws InterruptedException {
        writeRequests++;
        Thread callingThread = Thread.currentThread();
        while (!canGrantWriteAccess(callingThread)) {
            wait();
        }
        writeRequests--;
        writerAccess++;
        writingThread = callingThread;
    }

    public synchronized void unlockWrite() {
        writerAccess--;
        if(writerAccess == 0) {
            writingThread = null;
            notifyAll();
        }
    }

    private boolean canGrantWriteAccess(Thread callingThread) {
        if (hasReaders()) return false;
        if (writingThread == null) return true;
        return isWriter(callingThread);
    }

    private boolean hasReaders() {
        return readingThreads.size() > 0;
    }

    private boolean isWriter(Thread callingThread) {
        return writingThread == callingThread;
    }
}
复制代码

在finally语句中调用unlock()

当使用ReadWriteLock来保证临界区代码的同步时,临界区中的代码可能会抛出异常。所以很有必要在finally语句中来调用unlockRead()和unlockWrite()方法。无论线程执行的代码异常与否,始终能够释放它所持有的读写锁,以让其他线程能正常执行。如下所示:

lock.lockWrite();
        try {
            //do critical section code, which may throw exception
        } finally {
            lock.unlockWrite();
        }
复制代码

我们使用这个小结构来保证即使临界区代码抛出异常,线程也能正常释放掉所持有的读写锁。如果我们没有这么做,一旦临界区代码出现异常,线程将无法释放读写锁,以至于其他调用了该读写锁的lockRead()和lockWrite()方法的线程将会永远等待下去。

该系列博文为笔者复习基础所著译文或理解后的产物,复习原文来自Jakob Jenkov所著 Java Concurrency and Multithreading Tutorial

原文  https://juejin.im/post/5cae22755188251b274ac2fc
正文到此结束
Loading...