转载

JDK 源码分析:ArrayBlockingQueue

概述

前文「 JDK源码分析-BlockingQueue 」简要分析了 BlockingQueue 接口的主要方法,ArrayBlockingQueue 就是该接口的一个主要实现类,本文分析该类的常用方法实现。

ArrayBlockingQueue 的类继承结构如下:

JDK 源码分析:ArrayBlockingQueue

从  ArrayBlockingQueue 的名字大概可以猜出来,它的内部是由数组实现的,下面分析其代码实现。

代码分析

构造器

构造器 1:


 

// 构造器 1:初始化 ArrayBlockingQueue 对象,使用给定的容量

public ArrayBlockingQueue(int capacity) {

// 调用构造器 2 进行初始化,默认使用非公平锁

this(capacity, false);

}

构造器 2:


 

// 构造器 2:使用给定容量及是否公平初始化 ArrayBlockingQueue 对象

public ArrayBlockingQueue(int capacity, boolean fair) {

if (capacity <= 0)

throw new IllegalArgumentException();

// 用给定的容量初始化内部数组

this.items = new Object[capacity];

// 创建锁对象(根据 fair 参数确定是否公平锁)

lock = new ReentrantLock(fair);

// lock 绑定两个 Condition 条件

notEmpty = lock.newCondition();

notFull = lock.newCondition();

}

构造器 3:


 

// 构造器 3:使用给定的容量、是否公平,及给定的集合初始化 ArrayBlockingQueue

public ArrayBlockingQueue(int capacity, boolean fair,

Collection<? extends E> c) {

// 使用构造器 2 初始化 ArrayBlockingQueue 对象

this(capacity, fair);

final ReentrantLock lock = this.lock;

lock.lock(); // Lock only for visibility, not mutual exclusion

try {

int i = 0;

try {

// 遍历给定集合的元素,将其插入数组

for (E e : c) {

checkNotNull(e);

items[i++] = e;

}

// 注意可能会发生数组越界

} catch (ArrayIndexOutOfBoundsException ex) {

throw new IllegalArgumentException();

}

// 数组中元素的数量

count = i;

// 入队操作(put、offer 等方法)的数组下标,若数组已满则为 0

putIndex = (i == capacity) ? 0 : i;

} finally {

// 注意释放锁

lock.unlock();

}

}

主要成员变量


 

/** The queued items */

// 内部保存元素的数组

final Object[] items;


/** items index for next take, poll, peek or remove */

// 出队操作索引

int takeIndex;


/** items index for next put, offer, or add */

// 入队操作索引

int putIndex;


/** Number of elements in the queue */

int count;


/*

* Concurrency control uses the classic two-condition algorithm

* found in any textbook.

* 双条件(notEmpty、notFull)算法用于并发控制

*/

/** Main lock guarding all access */

// 使用 ReentrantLock 保证线程安全

final ReentrantLock lock;


/** Condition for waiting takes */

// 等待 take 操作(消费)的条件

private final Condition notEmpty;


/** Condition for waiting puts */

// 等待 put 操作(生产)的条件

private final Condition notFull;

主要入队方法 add(E), offer(E), offer(E, timeout, Unit), put(E)

1. add(E) 方法


 

public boolean add(E e) {

// 调用父类 AbstractQueue 的 add 方法

return super.add(e);

}


// AbstractQueue 的 add 方法

public boolean add(E e) {

if (offer(e))

return true;

else

throw new IllegalStateException("Queue full");

}

add(E) 方法调用了父类 AbstractQueue 的 add(E) 方法,可以看到,实际上还是调用了 offer(E) 方法。因此 add(E) 和 offer(E) 实现基本是一致的,下面分析 offer(E) 方法。

2. offer(E),  offer(E, timeout, Unit ) 方法


 

public boolean offer(E e) {

checkNotNull(e);

final ReentrantLock lock = this.lock;

lock.lock();

try {

// 若队列已满,立即返回 false

if (count == items.length)

return false;

else {

// 入队

enqueue(e);

return true;

}

} finally {

lock.unlock();

}

}

enqueue 方法:


 

// 入队操作

private void enqueue(E x) {

// assert lock.getHoldCount() == 1;

// assert items[putIndex] == null;

final Object[] items = this.items;

items[putIndex] = x;

// 若队列已满,则下标置为 0

if (++putIndex == items.length)

putIndex = 0;

count++;

// 唤醒 notEmpty 条件下等待的线程

notEmpty.signal();

}

offer(E) 方法是将一个元素入队:若队列已满直接返回 false,否则执行入队操作,并唤醒 notEmpty 条件下等待的线程。

以“生产者-消费者”模型类比,执行 offer(E) 操作后表示队列已经有产品了(不为空,即 notEmpty),消费者可以消费了。

offer(E, timeout, Unit ) 方法操作与 offer(E) 类似,只是多了超时等待,如下:


 

public boolean offer(E e, long timeout, TimeUnit unit)

throws InterruptedException {

checkNotNull(e);

long nanos = unit.toNanos(timeout);

final ReentrantLock lock = this.lock;

lock.lockInterruptibly();

try {

while (count == items.length) {

if (nanos <= 0)

return false;

nanos = notFull.awaitNanos(nanos);

}

enqueue(e);

return true;

} finally {

lock.unlock();

}

}

3. put(E) 方法


 

public void put(E e) throws InterruptedException {

checkNotNull(e);

final ReentrantLock lock = this.lock;

lock.lockInterruptibly();

try {

// 队列满的时候,notFull 条件等待

while (count == items.length)

notFull.await();

// 入队

enqueue(e);

} finally {

lock.unlock();

}

}

put(E) 也是将一个元素入队:若队列已满,则 notFull 条件下的线程等待。

以“生产者-消费者”模型类比,就是容器已满,生产者等待;否则执行入队,并唤醒消费者。

入队方法小结

1. add(E): 入队成功返回 true,否则抛出  IllegalStateException 异常;

2. offer(E) : 入队成功返回 true,失败返回 false;

3.  offer(E, timeout, Unit ): 同 offer(E),加了超时等待;

4. put(E): 无返回值,队列满的时候等待。

主要出队方法 :poll(), poll(long, unit), take(), peek()

1. poll() 方法


 

public E poll() {

final ReentrantLock lock = this.lock;

lock.lock();

try {

// 队列为空时返回 null,否则将 takeIndex 位置元素出队

return (count == 0) ? null : dequeue();

} finally {

lock.unlock();

}

}

2. take(E) 方法


 

public E take() throws InterruptedException {

final ReentrantLock lock = this.lock;

lock.lockInterruptibly();

try {

// 队列为空时等待

while (count == 0)

notEmpty.await();

return dequeue();

} finally {

lock.unlock();

}

}

3. poll(E, unit) 方法


 

public E poll(long timeout, TimeUnit unit) throws InterruptedException {

long nanos = unit.toNanos(timeout);

final ReentrantLock lock = this.lock;

lock.lockInterruptibly();

try {

while (count == 0) {

if (nanos <= 0)

return null;

nanos = notEmpty.awaitNanos(nanos);

}

return dequeue();

} finally {

lock.unlock();

}

}

与 poll() 方法操作类似,只是多了超时等待。

上述三个方法都使用 dequeue 方法进行出队,如下:


 

// 出队操作

private E dequeue() {

// assert lock.getHoldCount() == 1;

// assert items[takeIndex] != null;

final Object[] items = this.items;

@SuppressWarnings("unchecked")

// 获取 takeIndex 位置的元素

E x = (E) items[takeIndex];

// 将 该位置清空

items[takeIndex] = null;

// 队列已经空了

if (++takeIndex == items.length)

takeIndex = 0;

count--;

// 迭代器操作用到,本文暂不深入分析

if (itrs != null)

itrs.elementDequeued();

// 队列已经不满(not full)了,可以继续生产

notFull.signal();

return x;

}

4. peek() 方法


 

public E peek() {

final ReentrantLock lock = this.lock;

lock.lock();

try {

return itemAt(takeIndex); // null when queue is empty

} finally {

lock.unlock();

}

}


// 返回数组中指定位置的元素

final E itemAt(int i) {

return (E) items[i];

}

peek() 方法与前面几个出队操作不同,peek 方法只会获取队列的头元素,而不会将其删除。

出队方法小结

1. poll(): 获取队列头部元素,并将其移除,队列为空时返回 null;

2. take():  获取 队列头部 元素,并将 其移除 ,队列为空时 阻塞等待;

3.  poll(long, unit): 获取队列头部元素,并将其移除 ,队列为空时等待一段时间,若超时返回 null;

4. peek():  获取队列头部元素,但不移除该元素。

小结

1. ArrayBlockingQueue 是基于数组的阻塞队列实现,它在初始化时需要指定容量;

2. 内部使用了 ReentrantLock 保证线程安全;

3. 常用方法:

入队:add, offer, put

出队:poll, take, peek

本文分析了其常用的方法,此外,还有一些方法使用频率没那么高且稍微复杂,例如 iterator() 和 drainTo(),后文再进行分析。

相关阅读:

JDK源码分析-BlockingQueue

JDK源码分析-ReentrantLock

Stay hungry, stay foolish.

JDK 源码分析:ArrayBlockingQueue

原文  https://mp.weixin.qq.com/s/xvG7RIiafhZOP4NQpfHw0Q
正文到此结束
Loading...