前面学习了基于数组的非阻塞双端队列
ArrayDeque,其内部维护一个数组和指向队列头和队列尾索引的两个成员变量;本篇则探究下基于数组的阻塞队列是什么样的数据结构,又有什么特性,相较于ArrayDeque又有什么异同;然后就是使用场景了
先看内部成员变量定义, 和 ArrayDequeue相比,差别不大,一个数组,两个索引;此外多了一个锁和两个判定条件
/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;注意
count直接表示队列的元素个数(注意DelayQueue是通过遍历来获取队列长度,且并发修改会有问题,那么这个是如何保证并发的?)数据结构如下图

分析阻塞原理之前,先通过注释解释下ArrayBlockingQueue的使用场景
通用的进队方法如下,是非阻塞的方式,当数组满时,直接返回false,为保证并发安全,进队操作是加锁实现
public boolean offer(E e) {
// 非空校验
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock(); // 进队加锁
try {
if (count == items.length)
// 队列满,则直接返回false
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
// 直接将元素塞入数组
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}阻塞方式的进队实现如下
public void put(E e) throws InterruptedException {
checkNotNull(e); // 非空判断
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 获取锁
try {
while (count == items.length) {
// 一直阻塞,知道队列非满时,被唤醒
notFull.await();
}
enqueue(e); // 进队
} finally {
lock.unlock();
}
}
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
// 阻塞,知道队列不满
// 或者超时时间已过,返回false
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}源码分析,阻塞入队的逻辑比较清晰,小结一下
offer(e)put(e)或 offer(e, timeout, unit)notEmpty.signal()唤起被阻塞的出队线程非阻塞出队方法如下
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
// 直接将队头扔出去,并置空数组中该位置
// 并移动队列头到下一位
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
// 保障在遍历时,可以进行出队操作
itrs.elementDequeued();
notFull.signal();
return x;
}阻塞的实现,逻辑比较清晰,首先竞争锁,判断是否为空,是阻塞直到非空;否则弹出队列头元素
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
public E poll(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}小结
poll()方法take()或 poll(long,TimeUnit)方法创建线程池时,通常会用到 ArrayBlockingQueue或者LinkedBlockingQueue,如
new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(2));延迟队列也是并发安全,ArrayBlockingQueue 相比较 DelayQueue应用场景的区别主要在
getDelay()返回值小于0)基于数组阻塞队列ArrayBlockingQueue
take() 或 poll(long, TimeUnit)poll()offer(E, long, TimeUnit)或put(E)offer(E) add(E)