上一篇我们手写了一个阻塞队列,今天我们接着开始学习之旅,让我们一起来看看ArrayBlockingQueue的源码吧。ArrayBlockingQueue是JDK中提供的工业级的通过数组实现的阻塞队列。
在这里插入图片描述
如上类图,ArrayBlockingQueue类继承了AbstractQueue抽象类,实现了BlockingQueue接口。那么我们先来看看BlockingQueue接口中定义了哪些方法。
public interface BlockingQueue<E> extends Queue<E> {
/**
* 插入数据到队列尾部(如果立即可行且不会超过该队列的容量)
* 在成功时返回 true,如果此队列已满,则抛IllegalStateException。(与offer方法的区别)
*/
boolean add(E e);
/**
* 插入数据到队列尾部,如果没有空间,直接返回false;
* 有空间直接插入,返回true。
*/
boolean offer(E e);
/**
* 插入数据到队列尾部,如果队列没有空间,一直阻塞;
* 有空间直接插入。
*/
void put(E e) throws InterruptedException;
/**
* 插入数据到队列尾部,如果没有额外的空间,等待一定的时间,有空间即插入,返回true,
* 到时间了,还是没有额外空间,返回false。
*/
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 取出和删除队列中的头元素,如果没有数据,会一直阻塞到有数据
*/
E take() throws InterruptedException;
/**
* 取出和删除队列中的头元素,如果没有数据,需要会阻塞一定的时间,过期了还没有数据,返回null
*/
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
//除了上述方法还有继承自Queue接口的方法
/**
* 取出和删除队列头元素,如果是空队列直接返回null。
*/
E poll();
/**
* 取出但不删除头元素,该方法与peek方法的区别是当队列为空时会抛出NoSuchElementException异常
*/
E element();
/**
* 取出但不删除头元素,空队列直接返回null
*/
E peek();
/**
* 返回队列总额外的空间
*/
int remainingCapacity();
/**
* 删除队列中存在的元素
*/
boolean remove(Object o);
/**
* 判断队列中是否存在当前元素
*/
boolean contains(Object o);
}
BlockingQueue接口定义的方法主要分为两大类,一类是插入元素的方法,一类是取出元素的方法。如下图所示:
在这里插入图片描述
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** 存储数据的数组 */
final Object[] items;
/** 获取数据的索引,用于下次 take, poll, peek or remove 等方法 */
int takeIndex;
/** 添加元素的索引, 用于下次 put, offer, or add 方法 */
int putIndex;
/** 队列元素的个数 */
int count;
/** 控制并发访问的锁 */
final ReentrantLock lock;
/** 非空条件对象,用于通知 take 方法中在等待获取数据的线程,队列中已有数据,可以执行获取操作 */
private final Condition notEmpty;
/** 未满条件对象,用于通知 put 方法中在等待添加数据的线程,队列未满,可以执行添加操作 */
private final Condition notFull;
/** 迭代器 */
transient Itrs itrs = null;
}
如上从定义的全局变量,我们可以看出ArrayBlockingQueue 中定义了一个数组items用于存储数据,用count来记录队列元素的个数,用ReentrantLock+Condition条件变量来实现阻塞功能,其实本质上就是生产者-消费者模型。了解完重要的全局变量之后,我们接着来看看具体的入队和出队方法。
首先我们来看看入队相关的方法,我们会分别介绍add(e)
,offer(e)
,put(e)
和offer(e,time,unit)
这四个方法。
首先,我们来看看add(e)
方法, add()
方法内部也是调用了offer()
方法。区别就是当插入元素失败之后会抛出IllegalStateException("Queue full")异常。
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
接着我们来看看offer(e)
方法。
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
如上,offer()
方法是一个线程安全的方法,当队列中的元素个数count等于队列中数组的长度时,则返回false。否则调用enqueue()
方法进行入队操作,入队成功之后返回true。接下来,我们就来看看enqueue()
方法。
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
//将要插入的元素x保存到putIndex位置上
items[putIndex] = x;
//对putIndex进行自加,如果putIndex加一之后的值等于items.length则将putIndex置为0
if (++putIndex == items.length)
putIndex = 0;
//count进行加1
count++;
//唤醒notEmpty等待队列里的一个消费者线程
notEmpty.signal();
}
如上enqueue
方法是入队的核心方法,首先将要插入的元素x保存到putIndex位置上,然后,对putIndex进行自加,当自加之后的值等于items.length时则将putIndex置为0,为什么要怎么做呢?因为我们知道ArrayBlockingQueue是一个先进先出的队列,并且入队和出队操作都是线程安全的。入队都是从第一个位置开始依次入队。出队也是从第一个位置开始依次出队。当putIndex等于数组的长度,即putIndex到达了队尾时,只要这时候count的值不等于数组的长度,就说明前面有元素出队,所以,此时将putIndex的值赋值为0,并不会覆盖掉第一个位置上的元素。并且可以实现循环的入队和出队。插入一个元素之后,count加一,然后唤醒非空等待队列里的一个消费者线程。
入队的逻辑说完之后,我们接下来看看出队的逻辑。
前面提到的那几个入队方法都是非阻塞的方法,接下来我们看看阻塞的入队方法put()
方法,当队满时,生产者线程会进入 未满(notFull)等待队列中进行等待,并释放持有的锁对象。
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
我们还是会依次分析出队的相关方法,分别从 remove()
,poll()
,take()
和poll(time,unit)
还有take()
方法。这几个方法开始分析。
首先,我们来看看remove()
方法。
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
同样的remove()
方法内部也是调用了poll()
方法,只是当获取不到元素之后就会抛出NoSuchElementException()
异常。
那么我们接着来看看poll()
方法
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
还是同样的poll()
方法也是一个线程安全的方法,如果count==0
的话则返回null,否则调用dequeue()
方法进行出队。那么我们接着看卡dequeue()
方法。
private E dequeue() {
// assert items[takeIndex] != null;
final Object[] items = this.items;
//获取takeIndex位置上的元素
E x = (E) items[takeIndex];
//将takeIndex位置上的元素清空
items[takeIndex] = null;
//takeIndex加1,加一之后的takeIndex如果等于items.length,则将takeIndex的值置为0
if (++takeIndex == items.length)
takeIndex = 0;
//count减1
count--;
if (itrs != null)
itrs.elementDequeued();
//唤醒未满等待队列里的一个生产者线程。
notFull.signal();
return x;
}
如上dequeue()
方法,还是首先获取takeIndex位置上的元素,然后对takeIndex加一,接着就是count减1,最后就是唤醒一个在未满等待队列里的生产者线程。
take方法是阻塞的出队方法,同样的,当count==0
即队列为空时,消费者线程会进入非空等待队列里进行等待。如果队列不为空才会调用dequeue()
方法进行出队。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
本文对ArrayBlockingQueue的源码进行了剖析,主要介绍了入队和出队的相关方法。并且对其实现阻塞功能的原理进行了详细的解释。
BlockingQueue-ArrayBlockingQueue 详细源码解析