转载

追踪解析 ReentrantLock 源码

零 前期准备

0 FBI WARNING

文章异常啰嗦且绕弯。

1 版本

JDK 版本 : OpenJDK 11.0.1

IDE : idea 2018.3

2 ReentrantLock 简介

ReentrantLock 是 jdk 中经典的高性能重用锁,作为基础组件经常能在 jdk 的其它并发框架中看到。

笔者希望能够通过这次代码阅读弄懂 AbstractQueueSynchronizer 和 ReentrantLock 的基本构造。

本文特指非公平锁的代码实现,对于公平锁暂不做详解。

注意,ReentrantLock 在 jdk11 中相比 jdk8 (这样做比较是因为 jdk8 是目前工作中最常用的版本) 有了一些代码上的改变,笔者认为代码更加精简了,但是具体性能没有做过实战测试和系统比较。

3 Demo

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class LockDemo {
    //创建一个 lock
    private ReentrantLock lock = new ReentrantLock();

    public void demo(){
        //上锁
        lock.lock();
        //打印当前线程的线程名
        System.out.println(Thread.currentThread().getName());
        //让线程休眠十秒,在此期间线程不会交出锁,所以其它调用该方法的线程都会阻塞
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) { }
        //释放锁
        lock.unlock();
    }

    //main 方法
    public static void main(String[] args) {
        
        LockDemo demo = new LockDemo();
        //创建两个线程来调用同一个 demo 实例对象,就能看出锁的作用了
        new Thread(new LockDemoRunner(demo)).start();
        new Thread(new LockDemoRunner(demo)).start();

    }
}

//Runnable 实现类,用于创建线程对象
class LockDemoRunner implements Runnable{
    //多个线程对象公用一个 demo 的实例
    private LockDemo lockDemo;
    //构造器
    LockDemoRunner(LockDemo demo){ lockDemo = demo;}
    @Override
    public void run() {
        lockDemo.demo();
    }
}

一 Sync

先来看一下 ReentrantLock 的默认构造器:

//ReentrantLock.class
public ReentrantLock() {
    sync = new NonfairSync();
}

再来看一下 ReentrantLock 的加锁和解锁方法:

//ReentrantLock.class
public void lock() {
    sync.acquire(1);
}

//ReentrantLock.class
public void unlock() {
    sync.release(1);
}

可以发现,ReentrantLock 的逻辑主要由 sync 对象实现。

而 sync 则是一个 NonfairSync 类型的对象。NonfaireSync 是 ReentrantLock 的静态内部类:

//ReentrantLock.class
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;
    
    //此方法用于尝试去获取锁
    //tryAcquire(...) 本来是 AbstractQueuedSynchronizer 中的方法,此处为重写
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

其实 NonfairSync 的主要逻辑在其父类 Sync 中实现。Sync 同样也是 ReentrantLock 的静态内部类。

Sync 中比较重要的是两个分别用于加解锁的方法:

//Sync.class
//此方法用于加锁
@ReservedStackAccess
final boolean nonfairTryAcquire(int acquires) {
    //获取当前线程的线程实例对象
    final Thread current = Thread.currentThread();
    //获取锁的状态
    //c 的初始状态值为 0,意思是还未上锁
    int c = getState();
    if (c == 0) {
        //compareAndSetState(...) 方法是 AbstractQueuedSynchronizer 中非常重要的方法
        //用以更新锁状态
        //此处的 acquires = 1
        if (compareAndSetState(0, acquires)) {
            //保存线程对象
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) {
        //如果当前线程就是正在执行的线程,则将目前的状态值 status 与 acquires 相加,保存为新的 status
        int nextc = c + acquires;
        //逻辑上此处是不会为负数的,只用于严谨逻辑
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

另一个方法:

//Sync.class
//此方法用于尝试解锁
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    //如果当前线程并不是正在执行的线程,则没有权限去解锁,会直接报错
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        //锁解开之后将保存的当前线程对象置空
        setExclusiveOwnerThread(null);
    }
    //此处更新状态值
    setState(c);
    return free;
}

Sync 中的其它方法主要用于状态判断,都比较简单,不再累述。

二 AbstractQueuedSynchronizer

AbstractQueuedSynchronizer 是 jdk 中用来构建同步组件的框架类,是 Sync 的父类。

之前解析 ThreadPoolExecutor 的内部类 Worker 的时候也初步了解过。

在上述探索 Sync 的过程中用到了很多此类中的方法。所以对用到的方法做一个追踪。

1 status

AbstractQueuedSynchronizer 中最重要的一个状态控制变量是 status,它代表锁目前是被使用还是空闲:

//AbstractQueuedSynchronizer.class
private volatile int state;

获取和存入 status 的值:

//AbstractQueuedSynchronizer.class
protected final int getState() {
    return state;
}
//AbstractQueuedSynchronizer.class
protected final void setState(int newState) {
    state = newState;
}

以上的 set/get 方法都很简单和常规,但是实际上 status 的存值主要使用该类中的另一个方法:

//AbstractQueuedSynchronizer.class
protected final boolean compareAndSetState(int expect, int update) {
    //compareAndSet(...) 这个方法在使用 AtomicInteger 的时候接触过,但是此处的 STATE 并不是 AtomicInteger
    //STATE 是一个定义在 AbstractQueuedSynchronizer 中的 VarHandle 类型的变量
    //VarHandle 是 jdk9 中新增的一个并发工具,目前网上对此工具的分析比较少
    //基本能判断的是,此工具的作用与 AtomicXXXX 工具类很类似,能提供原子化的操作,并在一定程度上替代 Unsafe

    //此方法用于先比较该实例对象中的 status 是否与第二个参数的值相等,如果是,则将 status 替换成第三个参数的值,返回 true
    //如果不相等,就不进行替换,并返回 false
    return STATE.compareAndSet(this, expect, update);
}

VarHandler 的实例化也比较神奇,可以做一下了解:

//定义的是 static 对象,可以被反复利用,而不是 AtomicXXXX 的模式了
private static final VarHandle STATE;
private static final VarHandle HEAD;
private static final VarHandle TAIL;

static {
    try {
        //VarHandler 的实例化工厂
        MethodHandles.Lookup l = MethodHandles.lookup();
        //实例化的时候将类 class、参数的名称、参数的类型 传入,就可以创建一个指向此参数的 VarHandler 实例对象了
        STATE = l.findVarHandle(AbstractQueuedSynchronizer.class, "state", int.class);
        HEAD = l.findVarHandle(AbstractQueuedSynchronizer.class, "head", Node.class);
        TAIL = l.findVarHandle(AbstractQueuedSynchronizer.class, "tail", Node.class);
    } catch (ReflectiveOperationException e) {
        throw new ExceptionInInitializerError(e);
    }
    //LockSupport 是一个服务于锁的静态工具类
    Class<?> ensureLoaded = LockSupport.class;
}

2 Node

AbstractQueuedSynchronizer 有一个静态内部类 Node,本质上是一个双向链表的节点对象。

同时 AbstractQueuedSynchronizer 还有两个 Node 节点对象:

//双向链表的头结点
private transient volatile Node head;
//双向链表的尾节点
private transient volatile Node tail;

每个 Node 节点对象的内部储存有一个 Thread 对象,即为等待执行的线程的实例化对象:

//Node.class
//此为 Node 的构造方法,传入的 nextWaiter 即为该节点的后一个节点
Node(Node nextWaiter) {
    this.nextWaiter = nextWaiter;
    //THREAD 是一个 VarHandler 对象,用于将线程对象存入当前节点中
    //注意,当前线程被存入了当前的 Node,而不是下一个 Node
    THREAD.set(this, Thread.currentThread());
}

再来看一个增加 Node 的方法:

//AbstractQueuedSynchronizer.class
private Node addWaiter(Node mode) {
    //此时当前线程对象已经被存入 node 对象中
    Node node = new Node(mode);
    
    //比较见名知意,所以不展开细讲了,只稍微提及
    //这个 for 循环用于将 node 对象添加到链表的尾部,代替掉之前的 tail 对象
    //有一种特殊情况,即 oldTail 是 null,则代表着该链表其实是空的,没有任何节点
    //这种情况下调用 initializeSyncQueue() 方法初始化链表,即此时 node 对象既是头节点也是尾节点
    for (;;) {
        Node oldTail = tail;
        if (oldTail != null) {
            node.setPrevRelaxed(oldTail);
            if (compareAndSetTail(oldTail, node)) {
                oldTail.next = node;
                return node;
            }
        } else {
            initializeSyncQueue();
        }
    }
}

3 lock

来看 ReentrantLock 的加锁方法:

//ReentrantLock.class
public void lock() {
    sync.acquire(1);
}

acquire(...) 方法在 AbstractQueuedSynchronizer 中实现:

//AbstractQueuedSynchronizer.class
public final void acquire(int arg) {
    //此处的判断条件处,会先调用 tryAcquire(...) 方法去尝试获取锁的使用权
    //如果获取成功,此处 tryAcquire(...) 方法会返回 true,那么 !tryAcquire(...) = false,此方法会直接结束
    //如果获取失败,此时 !tryAcquire(...) = true,进入 acquireQueued(...) 方法
    //在执行 acquireQueued(...) 方法之前,会先执行 addWaiter(...) 方法,此时当前线程已经被存入尾部节点中
    //Node.EXCLUSIVE 是一个 null 对象
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        //selfInterrupt() 会中断当前线程,使得线程处于等待被唤醒的状态
        selfInterrupt();
}

再来追踪一下 acquireQueued(...) 方法:

//AbstractQueuedSynchronizer.class
final boolean acquireQueued(final Node node, int arg) {
    //在本例中此时传入的 node 是链表的尾节点,且是存储了当前线程的节点对象
    //arg = 1

    boolean interrupted = false;
    try {
        for (;;) {
            //获取当前节点的前一个节点
            final Node p = node.predecessor();
            //如果 p 节点是头节点,且当前线程尝试获取锁的使用权成功
            //则让当前节点成为头节点,并删去原先的头节点(即 p 节点)
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null;
                return interrupted;
            }

            //shouldParkAfterFailedAcquire(...) 方法会根据 node 的前一节点的状态来判断该节点是否要被挂起或者唤醒
            //parkAndCheckInterrupt(...) 内部会调用 unsafe 的相关方法挂起节点
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}

由此可见,当前节点 node 的最终处理方式是由其的前一节点的状态来确定的。Node 内置了几种状态值:

//CANCELLED 代表该节点处于取消状态,该节点不会执行了
static final int CANCELLED =  1;
//SIGNAL 代表该节点的下一节点处于阻塞状态,会之后被执行
static final int SIGNAL    = -1;
//CONDITION 代表该节点处于阻塞状态
static final int CONDITION = -2;
//PROPAGATE 代表共享状态
static final int PROPAGATE = -3;
//还有一种状态 0,即为节点的初始状态

了解了状态之后再来看 shouldParkAfterFailedAcquire(...) 方法:

//AbstractQueuedSynchronizer.class
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //获取 node 节点的前节点的状态
    int ws = pred.waitStatus;
    //如果是 SIGNAL,直接返回 true,然后 node 节点会进入被挂起
    if (ws == Node.SIGNAL)
        return true;

    //状态值大于 0,只可能是 CANCELLED 状态,即此节点已经被废弃了
    if (ws > 0) {
        //不断往前遍历,将中间被废弃的节点全部剔除出链表中
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //如果状态值并非 CANCELLED 或者 SIGNAL,在这里会将状态值修改成 SIGNAL
        pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
    }
    //只要 pred 的状态值不为 SIGNAL,都会返回 false
    return false;
}

shouldParkAfterFailedAcquire(...) 方法和 parkAndCheckInterrupt(...) 方法共同作用,对不符合的节点进行剔除,对符合要求的节点进行挂起操作。

这样一来节点所封装的线程也就进入了阻塞队列中,等待被锁唤醒。

4 unlock

回到 ReentrantLock 的解锁代码:

//ReentrantLock.class
public void unlock() {
    sync.release(1);
}

release(...) 在 AbstractQueuedSynchronizer 中实现:

//AbstractQueuedSynchronizer.class
public final boolean release(int arg) {
    //此处的 tryRelease(...) 是 Sync 中重写之后的方法,具体看上述 Sync 的实现
    //此方法修改了当前锁的状态值
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

代码和上一 part 的比较类似,重点来看一下 unparkSuccessor(...) 方法:

//AbstractQueuedSynchronizer.class
private void unparkSuccessor(Node node) {

    //获取 node 的状态值
    int ws = node.waitStatus;
    //如果状态值为 SIGNAL、CONDITION、PROPAGATE 的话就会转成 0
    if (ws < 0)
        node.compareAndSetWaitStatus(ws, 0);

    //获取 node 的下一节点 s
    Node s = node.next;
    //如果 s 节点不存在,或者已经被废弃了,就会一直轮询,找到一个符合条件的
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node p = tail; p != node && p != null; p = p.prev)
            if (p.waitStatus <= 0)
                s = p;
    }
    //将 s 节点激活
    //注意此处的 s 节点为 node 的下一个能够被使用的节点
    if (s != null)
        LockSupport.unpark(s.thread);
}

unparkSuccessor(...) 方法的核心是选择符合要求的下一节点,并将其所代表的线程对象从挂起状态唤醒。

注意,此处展示的是非公平锁的逻辑。非公平锁讲求先到先得,会依次唤醒线程并执行代码块。公平锁则一次性唤醒所以线程并进行一次公平争夺。

三 LockSupport

1 park

LockSupport 的 park(...) 方法用于挂起线程:

//LockSupport.class
public static void park(Object blocker) {
    //获取当前线程对象
    Thread t = Thread.currentThread();
    //blocker 在本例中即为锁对象本身,setBlocker(...) 方法主要是做一下记录,此线程是被谁阻塞了
    setBlocker(t, blocker);
    //挂起线程
    U.park(false, 0L);
    //当线程执行到这句代码的时候,说明线程已经从挂起状态被唤醒了
    //所以这里可以清空掉记录
    setBlocker(t, null);
}

继续追踪 setBlocker(...) 方法:

//LockSupport.class
private static void setBlocker(Thread t, Object arg) {
    //putObject(...) 方法会把一个对象存到指定的地址处
    //此例的 t 是当前的线程对象,PARKBLOCKER 是一个 long 类型的内存地址,arg 是锁对象本身
    U.putObject(t, PARKBLOCKER, arg);
}

可以看到,park(...) 方法和 setBlocker(...) 方法都是调用了 U 对象的相关方法。

U 对象是一个 Unsafe 实例:

//LockSupport.class
private static final Unsafe U = Unsafe.getUnsafe();

Unsafe 的 park(...) 方法用于挂起线程,putObject(...) 方法用于存入对象。

2 unpark

//LockSupport.class
public static void unpark(Thread thread) {
    //唤醒线程
    if (thread != null)
        U.unpark(thread);
}

四 一点唠叨

总结一下重入锁的业务逻辑:

1 当第一个线程进入到锁区域(即调用 lock() 方法)的时候,会被锁记录为当前线程,并且修改锁的状态值

2 当有其它线程进入到该代码块,但是锁的状态值并不是初始值(即之前的线程未释放锁资源)的时候,线程会被封装成节点并存入到链表的末尾,然后被挂起

3 之前的线程释放锁资源(即调用 unlock() 方法)的时候,锁会去遍历内部的链表,唤醒下一个符合要求的线程(特指非公平锁)

本文仅为个人的学习笔记,可能存在错误或者表述不清的地方,有缘补充。

原文  https://segmentfault.com/a/1190000018078938
正文到此结束
Loading...