转载

从有锁到无锁(五):管程与阻塞同步

Monitors are a structured way of combining synchronization and data. A class encapsulates both data and methods in the same way that a monitor combines data, methods, and synchronization in a single modular package.

管程是一种模块化的同步方式,类似于类混合了数据和操作数据的方法。比如说生产者消费者队列,不应该由调用者来管理同步:

mutex.lock();
try {
    queue.enq(x);
} finally {
    mutex.unlock();
}
复制代码

这样写就会给客户端造成状态管理的麻烦。显然,如果线程试图入队一个已满的队列,应该挂起(阻塞)线程直到可以插入。这里的管实际上是代为管理同步的意思。

维基百科上还有另一个更为人熟知的定义:

In concurrent programming, a monitor is a synchronization construct that allows threads to have both mutual exclusion and the ability to wait (block) for a certain condition to become true. Monitors also have a mechanism for signaling other threads that their condition has been met . A monitor consists of a mutex (lock) object and condition variables. A condition variable is basically a container of threads that are waiting for a certain condition. Monitors provide a mechanism for threads to temporarily give up exclusive access in order to wait for some condition to be met, before regaining exclusive access and resuming their task.

当线程请求锁时,可以自旋也可以阻塞,所以管程需要一种线程间消息传递的机制来唤醒线程,即 Condition 。条件变量通常对应一个线程等待队列,条件变量改变后可以发送一个信号(比如 pthread_cond_signal ),唤醒在条件上等待的线程(一个或多个节点出队)。大家都知道,Java内置了管程,即 synchronized 机制。但这个内置的管程在实现上力求简单,因此只有一个隐式的条件变量。这就是尽管它也叫管程,我们却从来无法从 wait 得到 Condition 的原因。Java的 Lock 接口提供了对 Condition 的抽象。AQS在阻塞时会调用 LockSupport.park ,而它在Linux上的实现还是基于 mutexcondition ,所以也可以说从程序语言到OS只是不同程度的抽象,本质上都是一致的。

所以究竟如何理解管程这个术语呢?我觉得管程可以认为就是对数据结构封装了同步功能。比如,我们可以把 StringBuffer 的实现方法看作是一个管程,因为它的所有方法都被 synchronized 修饰。

Conditions

条件变量的典型使用是:

Condition condition = mutex.newCondition();
...
mutex.lock()
try {
    while (!property) {
        condition.await();
    } catch (InterruptedException e) {
        ... // application-dependent response
    }
    ...
}
复制代码

使用条件变量可以实现一个典型的生产者消费者队列,具体就不说了,可以参考 LinkedBlockingQueue 的源代码。条件变量可能会出现 唤醒丢失(Lost-Wakeup)问题 ,这个我在前面的队列篇已经详细的描述过了。

有两种策略解决唤醒丢失:

signalAll

synchronized

synchronized 可以将每个对象作为一个管程。本质上, synchronized 修饰的方式可以分为两种,实例对象和静态对象。比如,修饰静态方法其实属于后者,因为它锁定了类。我们都知道, synchronized 将锁信息存储在对象头。那么如何实现锁定静态对象呢?答案就是锁定类文件。

这里可以用一个比较方便的工具JOL(Java Object Layout),可以打印对象的内存布局。我们写一个非常简单的demo:

public class Demo {
    public static void main(String[] args) throws InterruptedException {
        Object b = new Object();
        System.out.println(b);
        System.out.println(ClassLayout.parseInstance(b).toPrintable());
    }
}
复制代码

这将得到一个包含对象哈希值的对象头:

java.lang.Object@74a14482
java.lang.Object object internals:
 OFFSET  SIZE   TYPE DESCRIPTION                               VALUE
      0     4        (object header)                           01 82 44 a1 (00000001 10000010 01000100 10100001) (-1589345791)
      4     4        (object header)                           74 00 00 00 (01110100 00000000 00000000 00000000) (116)
      8     4        (object header)                           e5 01 00 f8 (11100101 00000001 00000000 11111000) (-134217243)
     12     4        (loss due to the next object alignment)
Instance size: 16 bytes
Space losses: 0 bytes internal + 4 bytes external = 4 bytes total
复制代码

加入 synchronized 之后:

public class Demo {
    public static void main(String[] args) throws InterruptedException {
        Object b = new Object();
        System.out.println(b);
        ExecutorService service = Executors.newCachedThreadPool();
        service.invokeAll(Collections.nCopies(2,  () ->  {
            synchronized (b) {
                System.out.println(ClassLayout.parseInstance(b).toPrintable());
                return 0;
            }
        }));
        Thread.sleep(1000);
        System.out.println(ClassLayout.parseInstance(b).toPrintable());
        service.shutdown();
    }
}

java.lang.Object@74a14482
java.lang.Object object internals:
 OFFSET  SIZE   TYPE DESCRIPTION                               VALUE
      0     4        (object header)                           da 4f 23 26 (11011010 01001111 00100011 00100110) (639848410)
      4     4        (object header)                           00 00 00 00 (00000000 00000000 00000000 00000000) (0)
      8     4        (object header)                           e5 01 00 f8 (11100101 00000001 00000000 11111000) (-134217243)
     12     4        (loss due to the next object alignment)
Instance size: 16 bytes
...
java.lang.Object object internals:
 OFFSET  SIZE   TYPE DESCRIPTION                               VALUE
      0     4        (object header)                           01 82 44 a1 (00000001 10000010 01000100 10100001) (-1589345791)
      4     4        (object header)                           74 00 00 00 (01110100 00000000 00000000 00000000) (116)
      8     4        (object header)                           e5 01 00 f8 (11100101 00000001 00000000 11111000) (-134217243)
     12     4        (loss due to the next object alignment)
Instance size: 16 bytes
Space losses: 0 bytes internal + 4 bytes external = 4 bytes total

复制代码

这里有一个小细节是我给主线程加了一个延时,否则主线程将会打印和工作线程相同的对象头。可以看到,这里 synchronized 的锁标志是10,即重量级锁,因为存在竞争。

具体的内存字段可以看这个:

|----------------------------------------------------------------------------------------|--------------------|
|                                    Object Header (64 bits)                             |        State       |
|-------------------------------------------------------|--------------------------------|--------------------|
|                  Mark Word (32 bits)                  |      Klass Word (32 bits)      |                    |
|-------------------------------------------------------|--------------------------------|--------------------|
| identity_hashcode:25 | age:4 | biased_lock:1 | lock:2 |      OOP to metadata object    |       Normal       |
|-------------------------------------------------------|--------------------------------|--------------------|
|  thread:23 | epoch:2 | age:4 | biased_lock:1 | lock:2 |      OOP to metadata object    |       Biased       |
|-------------------------------------------------------|--------------------------------|--------------------|
|               ptr_to_lock_record:30          | lock:2 |      OOP to metadata object    | Lightweight Locked |
|-------------------------------------------------------|--------------------------------|--------------------|
|               ptr_to_heavyweight_monitor:30  | lock:2 |      OOP to metadata object    | Heavyweight Locked |
|-------------------------------------------------------|--------------------------------|--------------------|
|                                              | lock:2 |      OOP to metadata object    |    Marked for GC   |
|-------------------------------------------------------|--------------------------------|--------------------|
复制代码

重量级锁对应系统的管程,它的后30位是指向管程的指针。你可以在字节码中看到, synchronized 会产生 moniterentermoniterexit 字节码。

再看一下 synchronized 如何针对类文件加锁:

public class Demo {
    private static Integer v = 1;

    public synchronized static void main(String[] args) throws InterruptedException {
        System.out.println(Integer.toHexString(Demo.class.hashCode()));
        System.out.println(ClassLayout.parseInstance(Demo.class).toPrintable());
    }

}

4554617c
java.lang.Class object internals:
 OFFSET  SIZE                                              TYPE DESCRIPTION                               VALUE
      0     4                                                   (object header)                           58 f7 f1 02 (01011000 11110111 11110001 00000010) (49411928)
      4     4                                                   (object header)                           00 00 00 00 (00000000 00000000 00000000 00000000) (0)
      8     4                                                   (object header)                           df 03 00 f8 (11011111 00000011 00000000 11111000) (-134216737)
...
复制代码

这里注意不要调用 parseClass ,它是解析这个类对应的对象,而不是类文件对象本身。可以看到它生成了轻量级锁,因为只有主线程在调用。

阻塞(block)这个术语不论是在中文还是英文都被重载了太多次了。在JVM里经常看到状态显示的是 Monitor 而不是 Blocked 。Java中 Thread.State 的定义:

  • BLOCKED A thread that is blocked waiting for a monitor lock is in this state.
  • WAITING A thread that is waiting indefinitely for another thread to perform a particular action is in this state.

在TAOMP中,阻塞对应的是操作系统中线程的休眠,不占用CPU资源,而Java把这个状态细分了,即分为阻塞和等待(还有超时等待)。其实它们应该都对应操作系统的一个状态。

阻塞和等待的区别是,阻塞状态可以自动获得监视器锁并转化状态,而等待必须手动的发送信号来打破。在Java中,后进入 synchronized 代码块会产生阻塞;而调用 notify 之后,线程没有立即得到锁,也会被阻塞。

其实这两个状态的细分,还意味着 synchronized 的管程至少实现了等待集和准入集两个结构,分别对应 WAITINGBLOCKED

从有锁到无锁(五):管程与阻塞同步

读写锁

读写锁是一个被数据库、操作系统等诸多领域广泛应用的情景。读写锁可以归纳为这样几个特点:

  • No thread can acquire the write lock while any thread holds either the write lock or the read lock.
  • No thread can acquire the read lock while any thread holds the write lock.
  • Naturally, multiple threads may hold the read lock at the same time.

即读阻塞写,写阻塞读,读不阻塞读。Java中关于读写锁的标准接口是 ReadWriteLock ,默认实现 ReentrantReadWriteLock

可以使用管程构造一个简单的读写锁。如果没有读者不互斥,那么读者写者就是一般互斥问题。读者写者之间必须要进行同步,因此读锁在一个是否存在写者的条件上阻塞,写锁在一个读者是否为0的条件上阻塞,写锁需要一个全局计数器来确认所有读者都已经退出。这两个条件都是在同一个 Condition ,即同一个锁上。因为本质上,我们把写者和读者都看做是同一个临界区。

protected class ReadLock implements Lock {
    @Override
    public void lock() {
        lock.lock();
        try {
            while (writer) {
                condition.await();
            }
            readers++;
        } catch (InterruptedException e) {
            // empty
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void unlock() {
        lock.lock();
        try {
            readers--;
            if (readers == 0) {
                condition.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }
}
复制代码
protected class WriteLock implements Lock {
    @Override
    public void lock() {
        lock.lock();
        try {
            while (readers > 0 || writer) {
                condition.await();
            }
            writer = true;
        } catch (InterruptedException e) {
            // empty
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void unlock() {
        writer = false;
        condition.signalAll();
    }
}
复制代码

一个细节是写锁的解锁并没有使用这个锁,因为它是独占的。

这个简单读写锁的缺点在于公平性(操作系统里应该讲烂了吧)。Condition上阻塞的线程是顺序的,但是当读者进入临界区时,后来的读者可以越过阻塞中的写者。如果读者比写者频繁,那么写者一直被阻塞。所以要应用写者优先原则:写者一旦因为释放锁而阻塞,那么任何读者都不能获得锁。这里有一个简单的FIFO实现:只需要先设置writer为true。这里需要把读写之间的阻塞和写写阻塞分开。

protected class WriteLock implements Lock {
    @Override
    public void lock() {
        lock.lock();
        try {
            while (writer) {
                condition.await();
            }
            writer = true;
            while (readAcquires != readReleases) {
                condition.await();
            }            
        } catch (InterruptedException e) {
            // empty
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void unlock() {
        writer = false;
        condition.signalAll();
    }
}
复制代码

这里书中将原先的读者计数器改为两个计数器,即读者请求读锁和释放读锁的次数,我不是很理解。感觉这里完全和一个计数器是等价的?

ReentrantReadWriteLock实现

AQS实现 ReentrantReadWriteLock 是非常有意思的,因为它使用了互斥模式和共享模式的混合。它给读锁分配共享节点,给写锁分配独占节点。它的公平和非公平同步器,也就是前面的内部锁实现如下:

/**
 * Nonfair version of Sync
 */
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -8159625535654395037L;
    final boolean writerShouldBlock() {
        return false; // writers can always barge
    }
    final boolean readerShouldBlock() {
        /* As a heuristic to avoid indefinite writer starvation,
         * block if the thread that momentarily appears to be head
         * of queue, if one exists, is a waiting writer.  This is
         * only a probabilistic effect since a new reader will not
         * block if there is a waiting writer behind other enabled
         * readers that have not yet drained from the queue.
         */
        return apparentlyFirstQueuedIsExclusive();
    }
}

/**
 * Fair version of Sync
 */
static final class FairSync extends Sync {
    private static final long serialVersionUID = -2274990926593161451L;
    final boolean writerShouldBlock() {
        return hasQueuedPredecessors();
    }
    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }
}
复制代码

在AQS中,我们知道 hasQueuedPredecessors 可以判断是否为队列首节点,这是一个子类实现AQS时公平性的判断。 ReentrantReadWriteLock 认为写者优先反而是不公平的,但它设置了一个启发式的避免饥饿的方法,调用AQS的 apparentlyFirstQueuedIsExclusive 判断是否队列里的首个节点为独占模式。也就是说,如果等待队列里第一个就是写节点,此时读必须排队。

而核心代码都在这两种模式的基类 Sync 里。先看一下 tryAcquire

protected final boolean tryAcquire(int acquires) {
    /*
     * Walkthrough:
     * 1. If read count nonzero or write count nonzero
     *    and owner is a different thread, fail.
     * 2. If count would saturate, fail. (This can only
     *    happen if count is already nonzero.)
     * 3. Otherwise, this thread is eligible for lock if
     *    it is either a reentrant acquire or
     *    queue policy allows it. If so, update state
     *    and set owner.
     */
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires);
        return true;
    }
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}
复制代码

WriteLock 中用到了该方法。这个方法逻辑也很简单,就是判断是否当前线程是独占的线程,如果是就增加重入计数;不是则返回失败;如果没有线程占有资源,就尝试CAS来获取。它和我们之前的简单读写锁实现逻辑顺序不同:它是先CAS尝试获取资源,失败则阻塞,而之前的实现总是先获得锁。

ReentrantReadWriteLock 把写者和读者计数都放在了 state 里,通过位运算分离。写者占据低16位。但读者有多个,因此不光有 state 的状态变化,还有一个 readHolds 的是 ThreadLocal 变量 作为读者重入的计数。另外,读锁还使用 cachedHoldCounter ,这是对 readHolds 的一个缓存,因为取 ThreadLocal 是有开销的(虽然类似于从哈希表查值)。你可以看到每次得到锁的线程都会把这个值设置为自己的 readHolds

protected final int tryAcquireShared(int unused) {
    /*
     * Walkthrough:
     * 1. If write lock held by another thread, fail.
     * 2. Otherwise, this thread is eligible for
     *    lock wrt state, so ask if it should block
     *    because of queue policy. If not, try
     *    to grant by CASing state and updating count.
     *    Note that step does not check for reentrant
     *    acquires, which is postponed to full version
     *    to avoid having to check hold count in
     *    the more typical non-reentrant case.
     * 3. If step 2 fails either because thread
     *    apparently not eligible or CAS fails or count
     *    saturated, chain to version with full retry loop.
     */
    Thread current = Thread.currentThread();
    int c = getState();
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}
复制代码

为什么这里不需要读者为0的判断呢?实际上是有的,可以看到写锁一开始判断 c != 0 ,因为读写者计数都在 state 里。所以这也是位运算的方便之处。

考虑一种需求——既想持有读锁,也想持有写锁。这里分为两种方式:

  • 先持有写锁,后申请读锁
  • 先持有读锁,后申请写锁

第一种情况是可以的,另外如果再释放写锁,我们可以称之为锁降级(Lock downgrading,这不是我取的,源码注释里有)。从前面读锁的实现来看,它允许当前线程是写锁持有者的条件下持有读锁,但写锁的条件是 w == 0 || current != getExclusiveOwnerThread() ,即有读者或者当前不是持有写锁的写者时都无法得到写锁。

信号量

信号量是互斥锁的一般形式,它允许多个线程同时进入临界区。一个标准的信号量实现如下:

public class Semaphore {
	final int capacity;
	int state;
	Lock lock;
	Condition condition;
	public Semaphore(int c) {
		capacity = c;
		state = 0;
		lock = new ReentrantLock();
		condition = lock.newCondition();
	}
	public void acquire() {
		lock.lock();
		try {
			while (state == capacity) {
				condition.await();
			}
			state++;
		} finally {
			lock.unlock();
		}
	}
    public void release() {
    	lock.lock();
    	try {
    		state--;
    		condition.signalAll();
    	} finally {
    		lock.unlock();
    	}
    }

}
复制代码

AQS里的 Semaphore 也是非常简单,基本上就是实现一个共享模式的案例,细节就不说了,有时间看一下。

原文  https://juejin.im/post/5dedeee8518825123b1a8a88
正文到此结束
Loading...