# 一、 前言

PriorityBlockingQueue是带优先级的无界阻塞队列，每次出队都返回优先级最高的元素，是二叉树最小堆的实现，研究过数组方式存放最小堆节点的都知道，直接遍历队列元素是无序的。

# 二、 PriorityBlockingQueue类图结构

image.png

private static final int DEFAULT_INITIAL_CAPACITY = 11;

public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}

public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}

public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}

# 三、 offer操作

public boolean offer(E e) {

if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;

//如果当前元素个数>=队列容量，则扩容(1)
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);

try {
Comparator<? super E> cmp = comparator;

//默认比较器为null
if (cmp == null)(2)
siftUpComparable(n, e, array);
else
//自定义比较器(3)
siftUpUsingComparator(n, e, array, cmp);

//队列元素增加1，并且激活notEmpty的条件队列里面的一个阻塞线程
size = n + 1;（9）
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}

private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); //must release and then re-acquire main lock
Object[] newArray = null;

//cas成功则扩容(4)
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
//oldGap<64则扩容新增oldcap+2,否者扩容50%，并且最大为MAX_ARRAY_SIZE
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}

//第一个线程cas成功后，第二个线程会进入这个地方，然后第二个线程让出cpu，尽量让第一个线程执行下面点获取锁，但是这得不到肯定的保证。(5)
if (newArray == null) // back off if another thread is allocating
lock.lock();(6)
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}

tryGrow目的是扩容，这里要思考下为啥在扩容前要先释放锁，然后使用cas控制只有一个线程可以扩容成功。我的理解是为了性能，因为扩容时候是需要花时间的，如果这些操作时候还占用锁那么其他线程在这个时候是不能进行出队操作的，也不能进行入队操作，这大大降低了并发性。

private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;

//队列元素个数>0则判断插入位置，否者直接入队(7)
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (key.compareTo((T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = key;(8)
}

• 第一次offer(2)时候

image.png

image.png

• 第二次offer(4)时候 执行（1)为false，所以执行（2）由于k=1,所以进入while循环，parent=0;e=2;key=4;key>e所以break;然后把4存到数据下标为1的地方，这时候队列状态为：

image.png

• 第三次offer(6)时候 执行（1)为true,所以调用tryGrow,由于2<64所以newCap=2 + (2+2)=6;然后创建新数组并拷贝，然后调用siftUpComparable；k=2>0进入循环 parent=0;e=2;key=6;key>e所以break;然后把6放入下标为2的地方，现在队列状态：

image.png

• 第四次offer(1)时候 执行（1)为false，所以执行（2）由于k=3,所以进入while循环，parent=0;e=2;key=1; key<e；所以把2复制到数组下标为3的地方，然后k=0退出循环；然后把2存放到下标为0地方，现在状态：

image.png

# 四、 poll操作

public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}

private E dequeue() {

//队列为空，则返回null
int n = size - 1;
if (n < 0)
return null;
else {

//获取队头元素(1)
Object[] array = queue;
E result = (E) array[0];

//获取对尾元素，并值null(2)
E x = (E) array[n];
array[n] = null;

Comparator<? super E> cmp = comparator;
if (cmp == null)//cmp=null则调用这个，把对尾元素位置插入到0位置，并且调整堆为最小堆(3)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;（4）
return result;
}
}
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1;           // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
Object c = array[child];（5）
int right = child + 1;（6)
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)(7)
c = array[child = right];
if (key.compareTo((T) c) <= 0)(8)
break;
array[k] = c;
k = child;
}
array[k] = key;(9)
}
}

• 第一次调用poll() 首先执行（1） result=1；然后执行（2）x=2;这时候队列状态

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

# 五、 put操作

public void put(E e) {
offer(e); // never need to block
}

# 六、 take操作

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {

//如果队列为空，则阻塞，把当前线程放入notEmpty的条件队列
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}

# 七、 size操作

public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return size;
} finally {
lock.unlock();
}
}

# 九、总结

PriorityBlockingQueue类似于ArrayBlockingQueue内部使用一个独占锁来控制同时只有一个线程可以进行入队和出队，另外前者只使用了一个notEmpty条件变量而没有notFull这是因为前者是无界队列，当put时候永远不会处于await所以也不需要被唤醒。

PriorityBlockingQueue始终保证出队的元素是优先级最高的元素，并且可以定制优先级的规则，内部通过使用一个二叉树最小堆算法来维护内部数组，这个数组是可扩容的，当当前元素个数>=最大容量时候会通过算法扩容。

172 篇文章34 人订阅

0 条评论

## 相关文章

2285

### redis-哈希表自动扩容

@(架构说)[redis] 为了回答上次遗留问题 哈希表如何扩容问题? 重点内容： 1 注释代码：最新版本 https://github.com...

5513

43311

3625

2729

### Kafka 新版消费者 API（三）：以时间戳查询消息和消费速度控制

kafka 在 0.10.1.1 版本增加了时间索引文件，因此我们可以根据时间戳来访问消息。 如以下需求：从半个小时之前的offset处开始消费消息，代码示例...

2.3K2

2042

2186

### 简单笔记

1、类的表面类型和实际类型 实例对象有两个类型：表面类型（Apparent Type）和实际类型（ActualType），表面类型是声明时的类型，实际类型是对象...

2615

953