转载

聊一聊 JUC 下的 ArrayBlockingQueue 队列

聊一聊 JUC 下的 ArrayBlockingQueue 队列

互联网平头哥第  112  篇原创文章 

ArrayBlockingQueue 是 JUC 下的另一个阻塞队列,按照惯例,我们还是先说一说 ArrayBlockingQueue 的特点,从全局上对 ArrayBlockingQueue  有了解。ArrayBlockingQueue 有以下几个特点:

  • 由数组实现的有界阻塞队列,容量一旦创建,后续大小无法修改;

  • 遵照先进先出规则,队头拿数据,队尾取数据;

  • 跟 LinkedBlockingQueue 一样,队列满时,往队列中 put 数据会被阻塞,队列空时,往队列中拿数据会被阻塞;

  • 对数据操作时,共用一把锁,所以不能同时读写操作;

ArrayBlockingQueue 跟 LinkedBlockingQueue 一样,同样继承了 AbstractQueue 实现 BlockingQueue 接口,所以在方法上跟 LinkedBlockingQueue 一样,所以在这里我们不把方法列出来了,可以去查看前面 LinkedBlockingQueue 的文章~

除了方法之外,在 ArrayBlockingQueue 中,还有两个比较重要的参数:

/** items index for next take, poll, peek or remove */
// 获取元素的位置
int takeIndex;
/** items index for next put, offer, or add */
// 新增元素时的数组下标
int putIndex;

由于 ArrayBlockingQueue 底层采用的是数组,结合上面的两个参数,ArrayBlockingQueue 的整体结构图大概如下:

聊一聊 JUC 下的 ArrayBlockingQueue 队列
ArrayBlockingQueue 的整体结构

ArrayBlockingQueue 有三个构造函数:


public ArrayBlockingQueue(int capacity);
// fair 表示是否为公平锁
public ArrayBlockingQueue(int capacity, boolean fair);

public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c);

关于构造函数就不多说了,都大同小异,跟 LinkedBlockingQueue 一样,同样拿 put()take() 方法,看看 ArrayBlockingQueue 是如何实现数据的添加和拿取的~

先从 put() 方法开始,看看 ArrayBlockingQueue 是如何实现的~

public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
// 获取锁
final ReentrantLock lock = this.lock;
// 设置可重入锁
lock.lockInterruptibly();
try {
// 当数组队列存满时,阻塞等待.....
while (count == items.length)
notFull.await();
// 入队操作
enqueue(e);
} finally {
// 解锁
lock.unlock();
}
}
// 入队
private void enqueue(E e) {
final Object[] items = this.items;
// 根据 putIndex 插入到对应的位置即可
items[putIndex] = e;
// 设置好下一次插入的位置,如果当前插入的位置是最后一个元素,
// 那么下一次插入的位置就是队头了
if (++putIndex == items.length) putIndex = 0;
count++;
notEmpty.signal();
}

put() 方法的实现并不复杂,代码也就 20 行左右,我们来拆解一下 put 过程:

  • 1、先获取锁,对操作进行加锁;

  • 2、判断队列是否队满,如果队满,则挂起等待;

  • 3、根据 putIndex 的值,直接将元素插入到 items 数组中;

  • 4、调整 putIndex 的位置,用于下一次插入使用,如果当前 putIndex 是数组的最后一个位置,则 putIndex 下一次插入的位置是数组的第一个位置,可以把它当作是循环;

  • 5、解锁;

put 方整体解决起来不难,跟 LinkedBlockingQueue 一样,其他添加方法这里就不介绍了,大同小异~

再来看看 ArrayBlockingQueue 是如何实现 take() 方法的,take() 方法主要源码如下:


public E take() throws InterruptedException {
// 获取锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 判断队列是否为空,为空的话挂起等待
while (count == 0)
notEmpty.await();
// 获取数据
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {

final Object[] items = this.items;
@SuppressWarnings("unchecked")
// 根据 takeIndex 获取 items 中的元素
E e = (E) items[takeIndex];
// 将 takeIndex 中的数据置为空
items[takeIndex] = null;
// 设置下一次获取数据的下标,
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return e;
}

take() 方法跟 put() 方法没有什么太大的区别,就是一个反操作~

最后我们再来关注一下 remove() 方法,这个方法还是有一些学问的,主要源码如下:


public boolean remove(Object o) {
if (o == null) return false;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final Object[] items = this.items;
for (int i = takeIndex, end = putIndex,
to = (i < end) ? end : items.length;
; i = 0, to = end) {
// 遍历有值的一段数据
for (; i < to; i++)
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (to == end) break;
}
}
return false;
} finally {
lock.unlock();
}
}
// 主要看这个方法
void removeAt(final int removeIndex) {
final Object[] items = this.items;
// 如果要删除的位置正好是下一次 take的位置
if (removeIndex == takeIndex) {
// removing front item; just advance
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
// 如果删除的位置时 takeIndex 和 putIndex 之间的位置,则被删除的数据全部往前移动~
for (int i = removeIndex, putIndex = this.putIndex;;) {
int pred = i;
if (++i == items.length) i = 0;
if (i == putIndex) {
items[pred] = null;
this.putIndex = pred;
break;
}
items[pred] = items[i];
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}

remove 的时候分三种情况:

  • 第一种: 当 removeIndex == takeIndex 时,这种情况就比较简单,将该位置的元素删除后,takeIndex +1 即可

  • 第二种: 当 removeIndex + 1 == putIndex 时,直接将 putIndex -1 就好,相当于 putIndex 指针往前移动一格

  • 第三种: 当 removeIndex != takeIndex &&  removeIndex + 1 != putIndex 时,这种情况就比较复杂了,需要涉及到数据的移动,要将 removeIndex 后面的数据全部往前移动一个位置,putIndex 的位置也要迁移一位,具体的可以参考源码

最后,以上就是对 ArrayBlockingQueue 的部分源码解析,希望对你有所帮助,更加详细的源码还需要你查看 ArrayBlockingQueue 的源码,感谢你的阅读~

还可以读

聊一聊 JUC 下的 LinkedBlockingQueue

聊一聊 JUC 下的 CopyOnWriteArrayList

原文  http://mp.weixin.qq.com/s?__biz=MzIyNTM4ODI0OA==&mid=2247484462&idx=1&sn=a7c21dbdde13ef51ac82f4f5ef2063e7
正文到此结束
Loading...