转载

Java并发系列 — LockSupport

LockSupport 是用来创建锁和其他同步类的基本线程阻塞原语。

此类以及每个使用它的线程与一个许可关联(从 Semaphore 类的意义上说)。如果该许可可用,并且可在进程中使用,则调用 park 将立即返回;否则可能阻塞。如果许可尚不可用,则可以调用 unpark 使其可用。(但与Semaphore 不同的是,许可不能累积,并且最多只能有一个许可。)

parkunpark 方法提供了阻塞和解除阻塞线程的有效方法,并且不会遇到导致过时方法Thread.suspend和Thread.resume 因为以下目的变得不可用的问题:由于许可的存在,调用park 的线程和另一个试图将其unpark的线程之间的竞争将保持活性。此外,如果调用者线程被中断,并且支持超时,则park将返回。park方法还可以在其他任何时间“毫无理由”地返回,因此通常必须在重新检查返回条件的循环里调用此方法。从这个意义上说,park是“忙碌等待”的一种优化,它不会浪费这么多的时间进行自旋,但是必须将它与unpark配对使用才更高效。

LockSupport 定义了如下静态方法:

Java并发系列 — LockSupport

三种形式的 park 还各自支持一个 blocker 对象参数。此对象在线程受阻塞时被记录,以允许监视工具和诊断工具确定线程受阻塞的原因。(这样的工具可以使用方法 getBlocker(java.lang.Thread) 访问blocker。)建议最好使用这些形式,而不是不带此参数的原始形式。在锁实现中提供的作为 blocker 的普通参数是 this

这些方法被设计用来作为创建高级同步实用工具的工具,对于大多数并发控制应用程序而言,它们本身并不是很有用。park 方法仅设计用于以下形式的构造:

while (!canProceed()) { ... LockSupport.park(this); }

在这里,在调用park之前,canProceed和其他任何动作都不会锁定或阻塞。因为每个线程只与一个许可关联,park的任何中间使用都可能干扰其预期效果。

LockSupport成员变量分析

private static final sun.misc.Unsafe UNSAFE;
private static final long parkBlockerOffset;
private static final long SEED;
private static final long PROBE;
private static final long SECONDARY;
static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> tk = Thread.class;
        parkBlockerOffset = UNSAFE.objectFieldOffset
            (tk.getDeclaredField("parkBlocker"));
        SEED = UNSAFE.objectFieldOffset
            (tk.getDeclaredField("threadLocalRandomSeed"));
        PROBE = UNSAFE.objectFieldOffset
            (tk.getDeclaredField("threadLocalRandomProbe"));
        SECONDARY = UNSAFE.objectFieldOffset
            (tk.getDeclaredField("threadLocalRandomSecondarySeed"));
    } catch (Exception ex) { throw new Error(ex); }
}
sun.misc.Unsafe
parkBlockerOffset
/**
 * The argument supplied to the current call to
 * java.util.concurrent.locks.LockSupport.park.
 * Set by (private) java.util.concurrent.locks.LockSupport.setBlocker
 * Accessed using java.util.concurrent.locks.LockSupport.getBlocker
 */
volatile Object parkBlocker;

从注释上看,就是给LockSupport的 setBlockergetBlocker 调用。另外在LockSupport的java doc中也写到:

This object is recorded while the thread is blocked to permit monitoring and diagnostic tools to identify the reasons that threads are blocked. (Such tools may access blockers using method [getBlocker(Thread).) The use of these forms rather than the original forms without this parameter is strongly encouraged. The normal argument to supply as a blockerwithin a lock implementation is this.

大致是说, parkBlocker 是当线程被阻塞的时候被记录,以便监视和诊断工具来识别线程被阻塞的原因。

Unsafe类提供了获取某个字段相对Java对象的“起始地址”的偏移量的方法objectFieldOffset,从而能够获取该字段的值。那么为什么记录该blocker在对象中的偏移量,而不是直接调用 Thread.getBlocker() ,这样不是更好。

parkBlocker 就是在线程处于阻塞的情况下才会被赋值。线程都已经阻塞了,如果不通过这种内存的方法,而是直接调用线程内的方法,线程是不会回应调用的。

LockSupport的重要方法

park()

park方法阻塞的是当前的线程,也就是说在哪个线程中调用,那么哪个线程就被阻塞(在没有获得许可的情况下)。

public static void park() {
    UNSAFE.park(false, 0L);
}

UNSAFE.park 的两个参数,第一个参数为true的时候表示传入的是绝对时间,false表示相对时间,即从当前时间开始算。第二个参数就是等待的时间,0L表示永久等待。

根据java doc中的描述,调用park后有三种情况,能使线程继续执行下去:

  1. 有某个线程调用了当前线程的unpark。
  2. 其他线程中断(interrupt)了当前线程
  3. 该调用不合逻辑地(即毫无理由地)返回。

验证一:

public class UnparkTest {
    public static void main(String[] args) throws InterruptedException {
        Thread ut = new Thread(new UnparkThread(Thread.currentThread()));
        ut.start();
        System.out.println("I'm going to call park");
        // Thread.sleep(1000L);
        LockSupport.park();
        System.out.println("oh, I'm running again");

    }
}

class UnparkThread implements Runnable {
    private final Thread t;
    UnparkThread(Thread t) {
        this.t = t;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("I'm in unpark");
        LockSupport.unpark(t);
        System.out.println("I called unpark");
    }
}

运行结果:

Java并发系列 — LockSupport

LockSupport对park和unpark的调用顺序并没有要求,将两个Thread.sleep(1000L);注释切换一下就可以发现,先调用unpark,再调用park,依旧可以获得许可,让线程继续运行。这一点与Object的wait和notify 要求固定的顺序不同。

验证二:

public class LockSupportInterrupt {
    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(new InterruptThread());
        t.start();
        Thread.sleep(1000L);
        System.out.println("I'm going to interrupt");
        t.interrupt();
    }
}

class InterruptThread implements Runnable {
    @Override
    public void run() {
        System.out.println("I'm going to park");
        LockSupport.park();
        System.out.println("I'm going to again");
    }
}

运行结果:

Java并发系列 — LockSupport

LockSupport的park能够能响应interrupt事件,且不会抛出InterruptedException异常。

park(Object blocker)

public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(false, 0L);
    setBlocker(t, null);
}
  1. 在调用park阻塞当前线程之前,先记录当前线程的blocker。
  2. 调用park阻塞当前线程
  3. 当前面提到的三个让线程继续执行下去的情况时,再将parkBlocker设置为null,因为当前线程已经没有被blocker住了,如果不设置为null,那诊断工具获取被阻塞的原因就是错误的,这也是为什么要有两个setBlocker的原因。

再看一下setBlocker的代码:

private static void setBlocker(Thread t, Object arg) {
    // Even though volatile, hotspot doesn't need a write barrier here.
    UNSAFE.putObject(t, parkBlockerOffset, arg);
}

方法是私有的,嗯,为了保证正确性,肯定不能被其他类调用。另外就是利用了之前提到的偏移量以及unsafe对象将blocker值设置进了线程t当中。

unpark(Thread thread)

public static void unpark(Thread thread) {
    if (thread != null)
        UNSAFE.unpark(thread);
}

判断是否为空,然后调用unsafe的unpark方法。由此更可见unsafe这个类的重要性。

park和unpark实现原理

可能有些朋友还是不理解“许可”这个概念,我们深入HotSpot的源码来看看。

每个java线程都有一个Parker实例,Parker类是这样定义的:

Java并发系列 — LockSupport

可以看到Parker类实际上用Posix的mutex,condition来实现的。在Parker类里的_counter字段,就是用来记录所谓的“许可”的。

当调用park时,先尝试直接能否直接拿到“许可”,即 _counter>0时,如果成功,则把 _counter设置为0,并返回:

Java并发系列 — LockSupport

如果不成功,则构造一个ThreadBlockInVM,然后检查 _counter是不是>0,如果是,则把 _counter设置为0,unlock mutex并返回:

Java并发系列 — LockSupport

否则,再判断等待的时间,然后再调用pthreadcondwait函数等待,如果等待返回,则把_counter设置为0,unlock mutex并返回:

Java并发系列 — LockSupport

当unpark时,则简单多了,直接设置 _counter为1,再unlock mutext返回。如果 _counter之前的值是0,则还要调用pthreadcondsignal唤醒在park中等待的线程:

Java并发系列 — LockSupport

简而言之,是用mutex和condition保护了一个_counter的变量,当park时,这个变量置为了0,当unpark时,这个变量置为1。

值得注意的是在park函数里,调用pthreadcondwait时,并没有用while来判断,所以posix condition里的"Spurious wakeup"一样会传递到上层Java的代码里。关于"Spurious wakeup",可以参考:并行编程之条件变量(posix condition variables)

各种例子

jstack查看parkBlocker

public class JstackTest {

    public static void main(String[] args) {
        // 给main线程设置名字,好查找一点
        Thread.currentThread().setName("jstacktest");
        LockSupport.park("block");
    }
}

利用park(blocker)来阻塞main线程,传入string作为parkBlocker。

运行之后,运行jps命令:

Java并发系列 — LockSupport

然后再利用jstack来查看:

Java并发系列 — LockSupport

利用LockSupport实现先进先出锁

public class FIFOMutex {

    private final AtomicBoolean locked = new AtomicBoolean(false);
    private final Queue<Thread> waiters
            = new ConcurrentLinkedQueue<Thread>();

    public void lock() {
        boolean wasInterrupted = false;
        Thread current = Thread.currentThread();
        waiters.add(current);

        // Block while not first in queue or cannot acquire lock
        while (waiters.peek() != current ||
                !locked.compareAndSet(false, true)) {
            LockSupport.park(this);
            if (Thread.interrupted()) // ignore interrupts while waiting
                wasInterrupted = true;
        }

        waiters.remove();
        if (wasInterrupted)          // reassert interrupt status on exit
            current.interrupt();
    }

    public void unlock() {
        locked.set(false);
        LockSupport.unpark(waiters.peek());
    }
}

先进先出锁就是先申请锁的线程最先获得锁的资源,实现上采用了队列再加上LockSupport.park。

  1. 将当前调用lock的线程加入队列
  2. 如果等待队列的队首元素不是当前线程或者locked为true,则说明有线程已经持有了锁,那么调用park阻塞其余的线程。
  3. 如果队首元素是当前线程且locked为false,则说明前面已经没有人持有锁,删除队首元素也就是当前的线程,然后当前线程继续正常执行。
  4. 执行完后调用unlock方法将锁变量修改为false,并解除队首线程的阻塞状态。此时的队首元素继续之前的判断。

如果读完觉得有收获的话,欢迎点赞、关注、加公众号【牛觅技术】,查阅更多精彩历史!!!:

Java并发系列 — LockSupport
原文  https://juejin.im/post/5b018f70f265da0b7d0ba86a
正文到此结束
Loading...