ArrayBlockingQueue是一个阻塞式的先进先出队列。该结构具有以下三个特点:
· 先进先出队列,队列头是最先进入队列的元素,队列尾是最后进队列的元素。
· 有界队列,初始化时需要指定的队列容量,就是该队列的最大容量,队列中容量达到最大值时不会扩容,则会阻塞队列。
· 队列不支持null元素,当往队列中放入null元素时会抛出异常。
接下来以源码剖析的方式来讲解ArrayBlockingQueue。
· ArrayBlockingQueue类
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
ArrayBlockingQueue类继承AbstractQueue,并实现BlockingQueue接口。
· ArrayBlockingQueue类属性
//队列中元素数组
final Object[] items;
//下一个读取或移除元素的位置
int takeIndex;
//下一个存放元素的位置
int putIndex;
//队列中元素数量
int count;
//可重入锁
final ReentrantLock lock;
//队列 读取元素条件
private final Condition notEmpty;
//队列 存放元素条件
private final Condition notFull;
//迭代器
transient Itrs itrs = null;
· ArrayBlockingQueue构造函数
//指定队列容量的构造函数,默认为非公平锁
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
//指定队列容量、指定锁是否公平的构造函数
public ArrayBlockingQueue(int capacity, boolean fair) {
//容量不能小于等于0
if (capacity <= 0)
throw new IllegalArgumentException();
//初始化队列(初始化数组,容量为指定的大小)
this.items = new Object[capacity];
//初始化锁
lock = new ReentrantLock(fair);
//初始化读取队列条件(队列只有有元素情况下才能读取元素 ,否则阻塞等待)
notEmpty = lock.newCondition();
//初始化存入队列条件 (队列只有元素容量小于数组容量才能存放元素,否则阻塞等待)
notFull = lock.newCondition();
}
//指定队列容量、指定锁是否公平、添加初始数组元素的构造函数
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
//执行第二个构造函数
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock();
try {
//当前数组下标
int i = 0;
try {
//遍历初始化时需要添加的元素
for (E e : c) {
//校验元素不为空 为空会抛出异常,被捕获
checkNotNull(e);
//存放元素到数组中
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
//当前元素下标就为队列元素容量
count = i;
//下一个存放元素的下标
putIndex = (i == capacity) ? 0 : i;
} finally {
//解锁
lock.unlock();
}
}
ArrayBlockingQueue构造函数可指定初始化该阻塞队列的方式,其中容量大小必须得指定,并且大于0。还可以指定是否使用公平锁(默认为非公平锁)。并可以在初始化中放入需要放入的集合元素。
boolean fair为true时采用公平锁,线程获取锁的顺序会和线程调用lock获取锁的顺序一样,但是公平锁需要增加阻塞和唤醒的时间开销。
如果参数为false时,采用非公平锁,线程获取锁的顺序是随机获取,因此,可以根据对应的场景来选择是否采用公平锁,只有在特别需要他的时候再使用公平锁。
· ArrayBlockingQueue元素添加
public boolean add(E e) {
//调用父类AbstractQueue的add方法,实际最后会调用下面的offer方法
return super.add(e);
}
public boolean offer(E e) {
//校验不为空 为空 抛出异常
checkNotNull(e);
//获取锁
final ReentrantLock lock = this.lock;
//加锁处理
lock.lock();
try {
//当前队列数组元素超过最大容量 插入失败 返回false
if (count == items.length)
return false;
else {
//存入元素
enqueue(e);
return true;
}
} finally {
//解锁
lock.unlock();
}
}
public void put(E e) throws InterruptedException {
//校验不为空 为空 抛出异常
checkNotNull(e);
//获取锁
final ReentrantLock lock = this.lock;
//获取支持响应中断的锁
lock.lockInterruptibly();
try {
//如果队列满 则调用notFull阻塞等待
(当队列元素被取出或移除时会调用 notFull的signal方法被唤起) while (count == items.length)
//阻塞
notFull.await();
//存入元素
enqueue(e);
} finally {
//解锁
lock.unlock();
}
}
//传入了等待结束时间
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
//校验元素是否空 为空 抛出异常
checkNotNull(e);
//获取阻塞时间
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//数组容量判断
while (count == items.length) {
if (nanos <= 0)
//阻塞时间到了并且容量仍没减少 就失败 返回false
return false;
//根据上面阻塞时间 阻塞
nanos = notFull.awaitNanos(nanos);
}
//放入元素
enqueue(e);
return true;
} finally {
//解锁
lock.unlock();
}
}
private void enqueue(E x) {
//当前队列数组
final Object[] items = this.items;
//元素放入到数组中
items[putIndex] = x;
//计算下一个放入数组的下标
//如果元素容量已满,则重置下一个放置位置为0
if (++putIndex == items.length)
putIndex = 0;
//当前队列容量+1
count++;
//唤醒因元素为空而无法获取元素导致阻塞的线程
notEmpty.signal();
}
添加操作并不复杂,正好验证了上面说的数组容量满了的时候不会扩容的情况,并会造成阻塞。添加操作完成后,还会唤醒因元素为空无法获取元素而阻塞住的线程。另外放入元素后队列容量达到最大值时,会重置putIndex的位置为0。
· ArrayBlockingQueue元素获取
public E poll() {
//获取锁
final ReentrantLock lock = this.lock;
//加锁
lock.lock();
try {
//队列中午元素返回null, 队列中有元素则移除队尾元素并返回
return (count == 0) ? null : dequeue();
} finally {
//解锁
lock.unlock();
}
}
public E take() throws InterruptedException {
//获取锁
final ReentrantLock lock = this.lock;
//获取响应中断的锁
lock.lockInterruptibly();
try {
//队列中无元素,则阻塞,直到队列中存入元素被唤醒。
while (count == 0)
notEmpty.await();
//获取队尾元素
return dequeue();
} finally {
//解锁
lock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
//计算阻塞时间
long nanos = unit.toNanos(timeout);
//获取锁
final ReentrantLock lock = this.lock;
//获取响应中断的锁
lock.lockInterruptibly();
try {
//队列中无元素,则阻塞,阻塞时间为上面计算出来的时间。 阻塞时间过了,仍无元素就返回null
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
//去除队尾元素
return dequeue();
} finally {
//解锁
lock.unlock();
}
}
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//根据takeIndex位置读取元素
return itemAt(takeIndex);
} finally {
lock.unlock();
}
}
//移除元素核心接口
private E dequeue() {
//当前数组
final Object[] items = this.items;
//根据当前查询下标获取当前元素
E x = (E) items[takeIndex];
//移除元素
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
//减少队列的容量
count--;
if (itrs != null)
itrs.elementDequeued();
//唤醒因队列满了导致无法读取元素而阻塞的线程
notFull.signal();
//返回当前元素
return x;
}
· ArrayBlockingQueue元素移除
//根据指定下标删除元素
void removeAt(final int removeIndex) {
//当前队列数组
final Object[] items = this.items;
//移除下标 和 需要移除的下标相同
if (removeIndex == takeIndex) {
//移除元素
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
//指定的下标如果和takeIndex不一致 会将有效的元素往前移动一位,
//获取下一个需要存放元素的位置
final int putIndex = this.putIndex;
for (int i = removeIndex;;) {
int next = i + 1;
//如果需要删除的元素是队列中的最后一个元素, 那么将next置为0
if (next == items.length)
next = 0;
if (next != putIndex) {
//行成环形队列
items[i] = items[next];
i = next;
} else {
//清空元素
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
//唤醒因队列空间满而无法放入元素导致阻塞的线程
notFull.signal();
}
在这个元素中比较有意思的是用了notFull和NotEmpty监控用于线程的阻塞与唤醒。notFull.signal()可以唤醒因队列空间满而无法将元素放入数组导致阻塞的线程,notEmpty()可以唤醒因队列空间无数据而无法取出数组中的元素导致阻塞的线程。这些监控的开启和初始化需要与lock锁相绑定,他们的使用方法也与Object的wait()方法和notify()方法相似。
ArrayBlockingQueue中的读取元素时都会采用lock.lockInterruptibly方法获取一个支持响应中断的锁,允许发生阻塞时先释放锁资源,直到该线程被唤醒而重新获取锁。
由上面源码也可以看出ArrayBlockingQueue的读和写都是共享一个锁,因此读写同时发生时也会造成阻塞。