转载

JDK 源码分析:DelayQueue

概述

DelayQueue 也是一种队列,它内部的元素有“延迟”,也就是当从队列中获取元素时,如果它的延迟时间未到,则无法取出。

DelayQueue 的类签名和 承结构如下:

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>

implements BlockingQueue<E> {}

JDK 源码分析:DelayQueue

下面分析其代码实现。

代码分析

相关接口

DelayQueue 中的元素要实现 Delayed 接口,该接口 定义如下:

public interface Delayed extends Comparable<Delayed> {

/**

* 以给定的时间单位,返回该对象的剩余延迟

* 若为零或者负数表示延时已经过去

*/

long getDelay(TimeUnit unit);

}

Delayed 接口继承自 Comparable 接口,而它本身只定义了一个 getDelay 方法,该方法的作用是获取对象的剩余延迟时间。

Comparable 接口也只有一个 compareTo 方法:

public interface Comparable<T> {

public int compareTo(T o);

}

这里不再详述。

构造器

DelayQueue 有两个构造器,如下:

// 无参构造器

public DelayQueue() {}


// 指定集合的构造器

public DelayQueue(Collection<? extends E> c) {

// 该方法最后是通过 add 方法实现的,后文进行分析

this.addAll(c);

}

成员变量

// 锁,用于保证线程安全

private final transient ReentrantLock lock = new ReentrantLock();


// 优先队列,实际存储元素的地方

private final PriorityQueue<E> q = new PriorityQueue<E>();


// 线程等待的标识

private Thread leader = null;


// 触发条件,表示是否可以从队列中读取元素

private final Condition available = lock.newCondition();

关于优先队列可参考前文「 JDK源码分析-PriorityQueue 」的分析。

入队方法

DelayQueue 也是一个队列,它的入队方法有:add(E), offer(E), put(E) 等,它们的定义如下:

public boolean add(E e) {

return offer(e);

}


public void put(E e) {

offer(e);

}


public boolean offer(E e, long timeout, TimeUnit unit) {

return offer(e);

}

这几个方法都是通过 offer(E) 方法实现的,它的代码如下:

public boolean offer(E e) {

final ReentrantLock lock = this.lock;

lock.lock();

try {

// 入队

q.offer(e);

// 若该元素为队列头部元素,唤醒等待的线程

// (表示可以从队列中读取数据了)

if (q.peek() == e) {

leader = null;

available.signal();

}

return true;

} finally {

lock.unlock();

}

}

出队方法

有入队自然也有出队,主要方法有:poll(), take(), poll(timeout, unit), 如下:

public E poll() {

final ReentrantLock lock = this.lock;

lock.lock();

try {

// 获取队列头部元素

E first = q.peek();

// 头部元素为空,或者延时未到,则返回空

if (first == null || first.getDelay(NANOSECONDS) > 0)

return null;

// 否则返回头部元素

else

return q.poll();

} finally {

lock.unlock();

}

}

poll 方法是非阻塞的,即调用之后无论元素是否存在都会立即返回。下面看下阻塞的 take 方法:

public E take() throws InterruptedException {

final ReentrantLock lock = this.lock;

// 以可中断方式获取锁

lock.lockInterruptibly();

try {

// 无限循环

for (;;) {

// 获取队列头部元素

E first = q.peek();

// 若为空,则等待

if (first == null)

available.await();

// 若不为空

else {

// 获取延迟的纳秒数,若小于等于零(即过期),则获取并删除头部元素

long delay = first.getDelay(NANOSECONDS);

if (delay <= 0)

return q.poll();

// 执行到这里,表示 delay>0,也就是延时未过期

first = null; // don't retain ref while waiting

// leader 不为空表示有其他线程在读取数据,当前线程等待

if (leader != null)

available.await();

else {

// 将当前线程设置为 leader

Thread thisThread = Thread.currentThread();

leader = thisThread;

try {

// 等待延迟时间过期

available.awaitNanos(delay);

} finally {

if (leader == thisThread)

leader = null;

}

}

}

}

} finally {

// 唤醒该条件下的其他线程

if (leader == null && q.peek() != null)

available.signal();

lock.unlock();

}

}

该方法看起来稍复杂,主要逻辑如下:

1. 获取队列头部元素;

1.1 若该元素为空(队列为空),则当前线程等待;

1.2 若该元素不为空,且已经过期,则取出该元素(并移除);

1.2.1 若未过期,且有其他线程在操作(leader 不为空),当前线程等待;

1.2.2 若未过期,且没有其他线程操作,则占有“操作权”(将 leader 设置为当前线程),并等待延迟过期。

以上操作循环执行。

take 方法是阻塞操作,当条件不满足时会一直等待。另一个  poll(timeout, unit) 方法和它有些类似,只不过带有延时,如下:

public E poll(long timeout, TimeUnit unit) throws InterruptedException {

long nanos = unit.toNanos(timeout);

final ReentrantLock lock = this.lock;

// 以可中断方式获取锁

lock.lockInterruptibly();

try {

// 无限循环

for (;;) {

// 获取队列的头部元素

E first = q.peek();

// 若头部元素为空(即队列为空),当超时时间大于零则等待相应的时间;

// 否则(即超时时间小于等于零)返回空

if (first == null) {

if (nanos <= 0)

return null;

else

nanos = available.awaitNanos(nanos);

} else {

// 执行到这里表示队列头部元素不为空

// 获取剩余延时

long delay = first.getDelay(NANOSECONDS);

// 延时已过期,返回队列头部元素

if (delay <= 0)

return q.poll();

// 延时未过期且等待超时,返回空

if (nanos <= 0)

return null;

first = null; // don't retain ref while waiting

// 延时未过期且等待未超时,且等待超时<延迟时间

// 表示有其他线程在取数据,则当前线程进入等待

if (nanos < delay || leader != null)

nanos = available.awaitNanos(nanos);

else {

// 没有其他线程等待,将当前线程设置为 leader,类似于“独占”操作

Thread thisThread = Thread.currentThread();

leader = thisThread;

try {

long timeLeft = available.awaitNanos(delay);

// 计算剩余延迟时间

nanos -= delay - timeLeft;

} finally {

// 该线程操作完毕,把 leader 置空

if (leader == thisThread)

leader = null;

}

}

}

}

} finally {

// 唤醒 available 条件下的一个其他线程

if (leader == null && q.peek() != null)

available.signal();

lock.unlock();

}

}

take 和 poll 方法还有一个区别: 当延迟未过期时,take 方法会一直等待,而 poll 方法则会返回空。

此外还有一个 peek 方法,该方法虽然也能获取队列头部的元素,但与以上出队方法不同的是,peek 方法只是读取队列头部元素,并不会将其删除:

public E peek() {

final ReentrantLock lock = this.lock;

lock.lock();

try {

// 返回队列的头部元素(不删除)

return q.peek();

} finally {

lock.unlock();

}

}

以上就是 DelayQueue 的主要方法的代码分析,为便于理解,下面简要举例分析。

用法举例

示例代码:

自定义一个实现了 Delayed 接口的 Task 类,并将它的几个对象添加到 一个延迟队列中,代码如下:

public class TestDelayedQueue {

public static void main(String[] args) throws Exception {

BlockingQueue<Task> delayQueue = new DelayQueue<>();

long now = System.currentTimeMillis();

delayQueue.put(new Task("c", now + 6000));

delayQueue.put(new Task("d", now + 10000));

delayQueue.put(new Task("a", now + 3000));

delayQueue.put(new Task("b", now + 4000));

while (true) {

System.out.println(delayQueue.take());

TimeUnit.SECONDS.sleep(1);

}

}


private static class Task implements Delayed {

private String taskName;

private long endTime;


public Task(String taskName, long endTime) {

this.taskName = taskName;

this.endTime = endTime;

}


@Override

public long getDelay(TimeUnit unit) {

return unit.convert(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);

}


@Override

public int compareTo(Delayed o) {

return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));

}


@Override

public String toString() {

return "taskName-->" + taskName;

}

}

}

结果会以延迟时间的顺序取出各个元素。

小结

1. DelayQueue 是一种队列,同时实现了 BlockingQueue 接口;

2. 它内部的元素有延迟时间的概念,出队时,若延时未到,则无法读取到队列头部的元素;

3. 它是线程安全的。

相关阅读:

JDK源码分析-PriorityQueue

JDK源码分析-BlockingQueue

JDK 源码分析:DelayQueue

原文  https://mp.weixin.qq.com/s/dd26D0ebUWgOSOd9GRsbKQ
正文到此结束
Loading...