从类名可知,LinkedBlockingQueue是基于链表实现的阻塞队列。
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
类特点:
Collection
和 Iterator
接口的可选方法;
这是 two lock queue
算法的变体。putLock守卫元素的put、offer操作,且与等待存入的条件关联。takeLock原理类似。putLock和takeLock都依赖的 count
变量,被维护为一个原子变量,以避免多数情况下需同时请求两个锁。
为了最小化put时需获取takeLock,使用了层叠式通知。当put操作注意到至少一个take可以启动,就会通知获取者。如果有多个item在信号后进队,获取者将依次通知其他获取者。因此,有对称的取操作通知存操作。有些操作如remove和iterators会同时请求两个锁。
读取者和写入者间可见性如下提供:
当元素已进入队列,putLock已被获取,且count变量也更新。随后,读取者通过获取putLock或获取takeLock,得到入队元素的可见性,然后读取 n = count.get();
,令前n个元素变得可见。
为实现弱一致性迭代器,显然要从前导出队节点上保持所有节点的GC可达性。这会引起两个问题:
然而,只有未删除节点需要从已出队节点可达,GC不需要理解可达性的类型。我们使用一些小手段连接一个刚刚被出队的节点。
源码来自JDK10。
单向链表节点类,查找元素需要从链表头开始依次遍历节点
static class Node<E> {
E item; // 节点数据
Node<E> next; // 下一节点
Node(E x) { item = x; } // 构造方法,存入节点的内容
}
队列容量,默认为Integer.MAX_VALUE
private final int capacity;
已存元素总数
private final AtomicInteger count = new AtomicInteger();
链表头节点
transient Node<E> head;
链表尾节点
private transient Node<E> last;
此锁被take,poll等操作持有
private final ReentrantLock takeLock = new ReentrantLock();
takes的等待队列
private final Condition notEmpty = takeLock.newCondition();
此锁被put,offer等持有
private final ReentrantLock putLock = new ReentrantLock();
puts的等待队列
private final Condition notFull = putLock.newCondition();
唤醒notEmpty队列上正在等待获取元素的线程,此方法由put/offer调用。
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
唤醒notFull队列上正在等待存入元素的线程,此方法由take/poll调用。
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
元素队尾进队
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
队头元素出队
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // 帮助GC
head = first;
E x = first.item;
first.item = null;
return x;
}
上锁禁止存取操作
void fullyLock() {
putLock.lock();
takeLock.lock();
}
解锁允许存取操作
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
默认构造方法,最大容量为Integer.MAX_VALUE
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
构造方法,初始化头结点引用和为节点引用,使用指定capacity
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
// 自定义队列capacity
this.capacity = capacity;
// 头指针和尾指针指向同一个空节点
last = head = new Node<E>(null);
}
通过集合实例初始化类,最大容量为Integer.MAX_VALUE
public LinkedBlockingQueue(Collection<? extends E> c) {
// capacity设置为Integer.MAX_VALUE
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
// 没有竞争,但对可见性来说有必要
putLock.lock();
try {
int n = 0;
// 依次遍历集合c
for (E e : c) {
if (e == null)
// 集合元素为空抛出NullPointerException
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
// 在集合c中取元素,用元素创建节点存入队列
enqueue(new Node<E>(e));
// 添加元素递增
++n;
}
// 最后把总添加元素更新至原子值count
count.set(n);
} finally {
putLock.unlock();
}
}
获取队已存元素数量
public int size() {
return count.get();
}
剩余可用队列容量
public int remainingCapacity() {
return capacity - count.get();
}
插入到队列尾部,直到成功插入才返回成功
public void put(E e) throws InterruptedException {
// 存入元素不能为空
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 此锁可以被中断
putLock.lockInterruptibly();
try {
// 队列已满则原地等待直到队列出现空余
while (count.get() == capacity) {
notFull.await();
}
// 元素插入到队列尾部
enqueue(node);
// 已保存元素数量递增
c = count.getAndIncrement();
// 检查队列是否已满
if (c + 1 < capacity)
// 通知其他线程继续插入元素
notFull.signal();
} finally {
// 解除锁
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
在队列尾部插入指定元素,如果在指定超时时间内插入成功返回true,否则返回false
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
// e为空抛出NullPointerException
if (e == null) throw new NullPointerException();
// 根据超时值和时间单位转换为纳秒时长
long nanos = unit.toNanos(timeout);
int c = -1;
// 获取putLock
final ReentrantLock putLock = this.putLock;
// 获取已有元素总数
final AtomicInteger count = this.count;
// 锁putLock设置为可中断
putLock.lockInterruptibly();
try {
// 队列已满则原地等待直到队列出现空余
while (count.get() == capacity) {
if (nanos <= 0L)
// 到达超时时间,退出方法并返回false
return false;
// 时间倒计时
nanos = notFull.awaitNanos(nanos);
}
// 元素进入队列
enqueue(new Node<E>(e));
// 队列元素总数递增
c = count.getAndIncrement();
if (c + 1 < capacity)
// 唤醒其他写入线程
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
把指定元素插入到队尾,如果插入成功返回true,队列已满插入失败返回false。当使用有容量限制的队列时,这个方法是比add方法更好,因为add元素添加失败会抛出异常。存入元素e为空时抛出NullPointerException。
public boolean offer(E e) {
// 存入元素不能为null
if (e == null) throw new NullPointerException();
// 获取队列元素总数
final AtomicInteger count = this.count;
// 队列已满,退出
if (count.get() == capacity)
return false;
int c = -1;
// 创建新节点
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
// 获取不可中断锁
putLock.lock();
try {
// 还有剩余空间
if (count.get() < capacity) {
// 向队列存入元素
enqueue(node);
// 队列保存元素总数递增
c = count.getAndIncrement();
// 队列没满,通知其他线程写入
if (c + 1 < capacity)
notFull.signal();
}
} finally {
// 解锁
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
获取元素
public E take() throws InterruptedException {
E x;
int c = -1;
// 获取队列元素总数
final AtomicInteger count = this.count;
// 获取takeLock
final ReentrantLock takeLock = this.takeLock;
// 上锁,设置为可中断
takeLock.lockInterruptibly();
try {
// 如果队列为空,则等待其他线程通知
while (count.get() == 0) {
notEmpty.await();
}
// 队列有可用元素,队头元素出列
x = dequeue();
// 队列元素总数递减
c = count.getAndDecrement();
// 如果队列还有元素,通知其他线程取数据
if (c > 1)
notEmpty.signal();
} finally {
// 解锁
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
在指定等待超时时间内获取元素,到达超时时间没有则返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
// 转换超时时间为纳秒
long nanos = unit.toNanos(timeout);
// 获取队列元素总数
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 上锁takeLock,设置为可中断
takeLock.lockInterruptibly();
try {
// 当队列为空,在超时时间内等待
while (count.get() == 0) {
if (nanos <= 0L)
// 到达超时时间,返回null
return null;
// 超时时间倒数
nanos = notEmpty.awaitNanos(nanos);
}
// 队头元素出列
x = dequeue();
// 队列元素总数递减
c = count.getAndDecrement();
// 队列还有可用元素,通知其他线程获取数据
if (c > 1)
notEmpty.signal();
} finally {
// 解锁
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
获取元素
public E poll() {
// 获取元素数量
final AtomicInteger count = this.count;
// 队列中没有元素则返回null
if (count.get() == 0)
return null;
// 队列中有元素,开始以下逻辑
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
// 元素从对头出队
x = dequeue();
// 队列元素数量递减
c = count.getAndDecrement();
// 队列还有可用元素,通知其他线程获取数据
if (c > 1)
notEmpty.signal();
}
} finally {
// 解锁
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
返回队列头节点包含的数据,此操作不会改变队列节点的数量或顺序
public E peek() {
// 队列没有节点直接返回null
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 返回对头元素的内容,否则返回null
return (count.get() > 0) ? head.next.item : null;
} finally {
// 解锁
takeLock.unlock();
}
}
把节点p从列表中解除链接,pred是p的上一个节点
void unlink(Node<E> p, Node<E> pred) {
// assert putLock.isHeldByCurrentThread();
// assert takeLock.isHeldByCurrentThread();
// p.next is not changed, to allow iterators that are
// traversing p to maintain their weak-consistency guarantee.
p.item = null; // 节点p的内容置空
pred.next = p.next; // 操作p前后节点,令p解除链接
if (last == p) // 如果p是尾节点,则调整尾指针的指向
last = pred;
if (count.getAndDecrement() == capacity)
notFull.signal();
}
如果队列中存在指定元素,则把该元素从队列中移除。即使队列中存在多个相同元素,此方法只会移除其中一个。移除成功返回true,否则返回false。
public boolean remove(Object o) {
// 元素为null直接返回false
if (o == null) return false;
fullyLock();
try {
for (Node<E> pred = head, p = pred.next;
p != null;
pred = p, p = p.next) {
// 在链表上逐个查找元素是否匹配
if (o.equals(p.item)) {
// 找到匹配元素则把该元素解除链接
unlink(p, pred);
// 元素返回true
return true;
}
}
// 没有移除元素
return false;
} finally {
fullyUnlock();
}
}
检查是否包含指定元素
public boolean contains(Object o) {
// 元素为null直接返回false
if (o == null) return false;
fullyLock();
try {
// 遍历队列逐个查找元素
for (Node<E> p = head.next; p != null; p = p.next)
// 找到匹配元素
if (o.equals(p.item))
return true;
// 找不到匹配元素
return false;
} finally {
fullyUnlock();
}
}
返回一个数组,此数组包含队列的所有元素,且数组元素的顺序和队列的元素的顺序一致。每次返回的数组为不同对象,修改数组是安全的。
public Object[] toArray() {
fullyLock();
try {
// 获取队列中元素个数
int size = count.get();
// 通过元素数量构造数组
Object[] a = new Object[size];
int k = 0;
// 遍历队列,一次拷贝元素引用到数组对应索引
for (Node<E> p = head.next; p != null; p = p.next)
a[k++] = p.item;
// 返回数组
return a;
} finally {
fullyUnlock();
}
}
返回包含队列所有元素的数组,数组元素顺序和队列元素顺序一致。如果传入数组空间足够,返回的数组就是传入的数组(运行时类型也一致)。否则方法内部创建类型相同新数组,数组长度和队列元素数量相等。如果传入的数组保存队列所有元素后还有空余,这个空余会被置null。
@SuppressWarnings("unchecked")
public <T> T[] toArray(T[] a) {
fullyLock();
try {
// 获取队列元素总数
int size = count.get();
// 队列元素总数超过传入数组的长度
if (a.length < size)
// 创建新的数组,长度为队列元素总数
a = (T[])java.lang.reflect.Array.newInstance
(a.getClass().getComponentType(), size);
int k = 0;
// 依次把队列的元素存入数组中
for (Node<E> p = head.next; p != null; p = p.next)
a[k++] = (T)p.item;
// 如果数组还有空余空间,则该位置设为null
if (a.length > k)
a[k] = null;
// 返回数组
return a;
} finally {
fullyUnlock();
}
}
移除队列中所有元素,移除完成后队列元素为空
public void clear() {
// 上锁
fullyLock();
try {
for (Node<E> p, h = head; (p = h.next) != null; h = p) {
h.next = h;
p.item = null; // 置空节点的数据
}
head = last; // 由于队列元素全清空了,所以头指针和为指针引用相同
// assert head.item == null && head.next == null;
if (count.getAndSet(0) == capacity)
notFull.signal();
} finally {
fullyUnlock();
}
}
移除队列所有元素,并把这些元素添加到指定集合c中
public int drainTo(Collection<? super E> c) {
return drainTo(c, Integer.MAX_VALUE);
}
移除队列最多maxElements个元素,并把这些元素添加到指定集合c中
public int drainTo(Collection<? super E> c, int maxElements) {
Objects.requireNonNull(c);
// 不能把本队列的元素添加到自己队列上
if (c == this)
throw new IllegalArgumentException();
// maxElements须为正数
if (maxElements <= 0)
return 0;
boolean signalNotFull = false;
final ReentrantLock takeLock = this.takeLock;
// takeLock上锁
takeLock.lock();
try {
// 计算需要转移多少个队列元素
int n = Math.min(maxElements, count.get());
// count.get provides visibility to first n Nodes
Node<E> h = head;
int i = 0;
try {
while (i < n) {
// 从列表取节点
Node<E> p = h.next;
// 节点数据添加到数组中
c.add(p.item);
// 置空节点的数据
p.item = null;
// 引用移动到下一个节点
h.next = h;
h = p;
// 转移节点数递增
++i;
}
return n;
} finally {
// Restore invariants even if c.add() threw
if (i > 0) {
// assert h.item == null;
// 已经转移了i个元素,队列的头引用需要向后移动i个位置
head = h;
signalNotFull = (count.getAndAdd(-i) == capacity);
}
}
} finally {
takeLock.unlock();
if (signalNotFull)
signalNotFull();
}
}
在任何元素没有完全上锁的情况下遍历。此种遍历必须处理以下两种情况:
Node<E> succ(Node<E> p) {
if (p == (p = p.next))
p = head.next;
return p;
}
返回队列当前元素顺序的迭代器,元素顺序按照队列头到队列尾
public Iterator<E> iterator() {
return new Itr();
}
弱一致性迭代器。懒更新祖先域提供预计O(1)的remove(),最差情况下为O(n),不管何时保存的祖先节点被并发删除。
private class Itr implements Iterator<E> {
private Node<E> next; // Node holding nextItem
private E nextItem; // next item to hand out
private Node<E> lastRet;
private Node<E> ancestor; // Helps unlink lastRet on remove()
Itr() {
fullyLock();
try {
if ((next = head.next) != null)
nextItem = next.item;
} finally {
fullyUnlock();
}
}
public boolean hasNext() {
return next != null;
}
public E next() {
Node<E> p;
if ((p = next) == null)
throw new NoSuchElementException();
lastRet = p;
E x = nextItem;
fullyLock();
try {
E e = null;
for (p = p.next; p != null && (e = p.item) == null; )
p = succ(p);
next = p;
nextItem = e;
} finally {
fullyUnlock();
}
return x;
}
public void forEachRemaining(Consumer<? super E> action) {
// A variant of forEachFrom
Objects.requireNonNull(action);
Node<E> p;
if ((p = next) == null) return;
lastRet = p;
next = null;
final int batchSize = 64;
Object[] es = null;
int n, len = 1;
do {
fullyLock();
try {
if (es == null) {
p = p.next;
for (Node<E> q = p; q != null; q = succ(q))
if (q.item != null && ++len == batchSize)
break;
es = new Object[len];
es[0] = nextItem;
nextItem = null;
n = 1;
} else
n = 0;
for (; p != null && n < len; p = succ(p))
if ((es[n] = p.item) != null) {
lastRet = p;
n++;
}
} finally {
fullyUnlock();
}
for (int i = 0; i < n; i++) {
@SuppressWarnings("unchecked") E e = (E) es[i];
action.accept(e);
}
} while (n > 0 && p != null);
}
public void remove() {
Node<E> p = lastRet;
if (p == null)
throw new IllegalStateException();
lastRet = null;
fullyLock();
try {
if (p.item != null) {
if (ancestor == null)
ancestor = head;
ancestor = findPred(p, ancestor);
unlink(p, ancestor);
}
} finally {
fullyUnlock();
}
}
}
Spliterators.IteratorSpliterator的自定义变体
private final class LBQSpliterator implements Spliterator<E> {
static final int MAX_BATCH = 1 << 25; // max batch array size;
Node<E> current; // current node; null until initialized
int batch; // batch size for splits
boolean exhausted; // true when no more nodes
long est = size(); // size estimate
LBQSpliterator() {}
public long estimateSize() { return est; }
public Spliterator<E> trySplit() {
Node<E> h;
if (!exhausted &&
((h = current) != null || (h = head.next) != null)
&& h.next != null) {
int n = batch = Math.min(batch + 1, MAX_BATCH);
Object[] a = new Object[n];
int i = 0;
Node<E> p = current;
fullyLock();
try {
if (p != null || (p = head.next) != null)
for (; p != null && i < n; p = succ(p))
if ((a[i] = p.item) != null)
i++;
} finally {
fullyUnlock();
}
if ((current = p) == null) {
est = 0L;
exhausted = true;
}
else if ((est -= i) < 0L)
est = 0L;
if (i > 0)
return Spliterators.spliterator
(a, 0, i, (Spliterator.ORDERED |
Spliterator.NONNULL |
Spliterator.CONCURRENT));
}
return null;
}
public boolean tryAdvance(Consumer<? super E> action) {
Objects.requireNonNull(action);
if (!exhausted) {
E e = null;
fullyLock();
try {
Node<E> p;
if ((p = current) != null || (p = head.next) != null)
do {
e = p.item;
p = succ(p);
} while (e == null && p != null);
if ((current = p) == null)
exhausted = true;
} finally {
fullyUnlock();
}
if (e != null) {
action.accept(e);
return true;
}
}
return false;
}
public void forEachRemaining(Consumer<? super E> action) {
Objects.requireNonNull(action);
if (!exhausted) {
exhausted = true;
Node<E> p = current;
current = null;
forEachFrom(action, p);
}
}
public int characteristics() {
return (Spliterator.ORDERED |
Spliterator.NONNULL |
Spliterator.CONCURRENT);
}
}
返回基于此队列元素的可分割迭代器。返回的可分割迭代器是弱一致性的。此迭代器可报告Spliterator#CONCURRENT、Spliterator#ORDERED、Spliterator#NONNULL。
public Spliterator<E> spliterator() {
return new LBQSpliterator();
}
返回活节点p的前导节点,以便解链接p
Node<E> findPred(Node<E> p, Node<E> ancestor) {
// assert p.item != null;
if (ancestor.item == null)
ancestor = head;
// Fails with NPE if precondition not satisfied
for (Node<E> q; (q = ancestor.next) != p; )
ancestor = q;
return ancestor;
}
移除所有集合c的元素,若集合c对象为空则抛出NullPointerException
public boolean removeAll(Collection<?> c) {
Objects.requireNonNull(c);
return bulkRemove(e -> c.contains(e));
}
仅保留所有集合c的元素,若集合c对象为空则抛出NullPointerException
public boolean retainAll(Collection<?> c) {
Objects.requireNonNull(c);
return bulkRemove(e -> !c.contains(e));
}
实现批量移除方法
@SuppressWarnings("unchecked")
private boolean bulkRemove(Predicate<? super E> filter) {
boolean removed = false;
Node<E> p = null, ancestor = head;
Node<E>[] nodes = null;
int n, len = 0;
do {
// 1. Extract batch of up to 64 elements while holding the lock.
long deathRow = 0; // "bitset" of size 64
fullyLock();
try {
if (nodes == null) {
if (p == null) p = head.next;
for (Node<E> q = p; q != null; q = succ(q))
if (q.item != null && ++len == 64)
break;
nodes = (Node<E>[]) new Node<?>[len];
}
for (n = 0; p != null && n < len; p = succ(p))
nodes[n++] = p;
} finally {
fullyUnlock();
}
// 2. Run the filter on the elements while lock is free.
for (int i = 0; i < n; i++) {
final E e;
if ((e = nodes[i].item) != null && filter.test(e))
deathRow |= 1L << i;
}
// 3. Remove any filtered elements while holding the lock.
if (deathRow != 0) {
fullyLock();
try {
for (int i = 0; i < n; i++) {
final Node<E> q;
if ((deathRow & (1L << i)) != 0L
&& (q = nodes[i]).item != null) {
ancestor = findPred(q, ancestor);
unlink(q, ancestor);
removed = true;
}
}
} finally {
fullyUnlock();
}
}
} while (n > 0 && p != null);
return removed;
}
上一篇
压缩Jekyll的HTML代码