前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java 优先队列 PriorityQueue PriorityBlockingQueue 源码分析

Java 优先队列 PriorityQueue PriorityBlockingQueue 源码分析

作者头像
Yano_nankai
发布2019-02-25 16:25:47
9670
发布2019-02-25 16:25:47
举报
文章被收录于专栏:二进制文集二进制文集

基本使用

代码语言:javascript
复制
@Test
public void testPriorityQueue() throws InterruptedException {
    PriorityQueue priorityQueue = new PriorityQueue(Lists.newArrayList(5, 4, 2, 1, 3));
    System.out.println(priorityQueue);
    System.out.println(priorityQueue.poll());
    System.out.println(priorityQueue.poll());

    PriorityBlockingQueue<Integer> blockingQueue = new PriorityBlockingQueue<>();
    blockingQueue.add(5);
    System.out.println(blockingQueue.poll());
    System.out.println(blockingQueue.take());
}

输出

代码语言:javascript
复制
[1, 3, 2, 4, 5]
1
2
5
(阻塞)

PriorityQueue

成员变量

代码语言:javascript
复制
/**
 * Priority queue represented as a balanced binary heap: the two
 * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
 * priority queue is ordered by comparator, or by the elements'
 * natural ordering, if comparator is null: For each node n in the
 * heap and each descendant d of n, n <= d.  The element with the
 * lowest value is in queue[0], assuming the queue is nonempty.
 */
transient Object[] queue; // non-private to simplify nested class access

/**
 * The number of elements in the priority queue.
 */
private int size = 0;

/**
 * The comparator, or null if priority queue uses elements'
 * natural ordering.
 */
private final Comparator<? super E> comparator;

/**
 * The number of times this priority queue has been
 * <i>structurally modified</i>.  See AbstractList for gory details.
 */
transient int modCount = 0; // non-private to simplify nested class access

通过数组实现一个堆,元素在queue数组中并不是完全有序的,仅堆顶元素最大或最小。

基本方法

代码语言:javascript
复制
public E poll() {
    if (size == 0)
        return null;
    int s = --size;
    modCount++;
    E result = (E) queue[0];
    E x = (E) queue[s];
    queue[s] = null;
    if (s != 0)
        siftDown(0, x);
    return result;
}

/**
 * Inserts item x at position k, maintaining heap invariant by
 * demoting x down the tree repeatedly until it is less than or
 * equal to its children or is a leaf.
 *
 * @param k the position to fill
 * @param x the item to insert
 */
private void siftDown(int k, E x) {
    if (comparator != null)
        siftDownUsingComparator(k, x);
    else
        siftDownComparable(k, x);
}

@SuppressWarnings("unchecked")
private void siftDownComparable(int k, E x) {
    Comparable<? super E> key = (Comparable<? super E>)x;
    int half = size >>> 1;        // loop while a non-leaf
    while (k < half) {
        int child = (k << 1) + 1; // assume left child is least
        Object c = queue[child];
        int right = child + 1;
        if (right < size &&
            ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
            c = queue[child = right];
        if (key.compareTo((E) c) <= 0)
            break;
        queue[k] = c;
        k = child;
    }
    queue[k] = key;
}

以poll方法为例,实际上是获取堆顶元素,然后调整堆。

调整堆的方法(以大顶堆为例):

  1. 判断是否传入comparator,有则按照comparator排序,否则按照自然顺序排序
  2. 取节点左右孩子节点最大值,与父亲节点交换

扩容方法

代码语言:javascript
复制
/**
 * Increases the capacity of the array.
 *
 * @param minCapacity the desired minimum capacity
 */
private void grow(int minCapacity) {
    int oldCapacity = queue.length;
    // Double size if small; else grow by 50%
    int newCapacity = oldCapacity + ((oldCapacity < 64) ?
                                     (oldCapacity + 2) :
                                     (oldCapacity >> 1));
    // overflow-conscious code
    if (newCapacity - MAX_ARRAY_SIZE > 0)
        newCapacity = hugeCapacity(minCapacity);
    queue = Arrays.copyOf(queue, newCapacity);
}

private static int hugeCapacity(int minCapacity) {
    if (minCapacity < 0) // overflow
        throw new OutOfMemoryError();
    return (minCapacity > MAX_ARRAY_SIZE) ?
        Integer.MAX_VALUE :
        MAX_ARRAY_SIZE;
}
  1. 小容量扩容1倍
  2. 大容量扩容0.5倍
  3. 快溢出时调整为Integer.MAX_VALUE - 8 或 Integer.MAX_VALUE

是否线程安全

非线程安全

PriorityBlockingQueue

其实现基本与PriorityQueue一致,不过PriorityBlockingQueue是线程安全的,并且实现了BlockingQueue接口,在队列为空时take会阻塞。

代码语言:javascript
复制
/**
 * Priority queue represented as a balanced binary heap: the two
 * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
 * priority queue is ordered by comparator, or by the elements'
 * natural ordering, if comparator is null: For each node n in the
 * heap and each descendant d of n, n <= d.  The element with the
 * lowest value is in queue[0], assuming the queue is nonempty.
 */
private transient Object[] queue;

/**
 * The number of elements in the priority queue.
 */
private transient int size;

/**
 * The comparator, or null if priority queue uses elements'
 * natural ordering.
 */
private transient Comparator<? super E> comparator;

/**
 * Lock used for all public operations
 */
private final ReentrantLock lock;

/**
 * Condition for blocking when empty
 */
private final Condition notEmpty;

/**
 * Spinlock for allocation, acquired via CAS.
 */
private transient volatile int allocationSpinLock;

/**
 * A plain PriorityQueue used only for serialization,
 * to maintain compatibility with previous versions
 * of this class. Non-null only during serialization/deserialization.
 */
private PriorityQueue<E> q;

和PriorityQueue的区别:增加了

  1. 重入锁ReentrantLock
  2. Condition,用于队列空情况下的阻塞
  3. allocationSpinLock,通过CAS手段对queue扩容
代码语言:javascript
复制
private void tryGrow(Object[] array, int oldCap) {
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            allocationSpinLock = 0;
        }
    }
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    lock.lock();
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

可以看到与PriorityQueue的扩容函数很像,不同点:

  1. 调用函数时必须持有锁
  2. 使用CAS方法进行扩容,在allocationSpinLock为0,并且CAS将其置为1时,线程才能够对数组进行扩容。如果多个线程并发扩容,其余线程会调用Thread.yield()方法。

为什么这样实现PriorityBlockingQueue扩容?

因为PriorityBlockingQueue内部使用的ReentrantLock重入锁,同一个线程多次调用add函数,可能恰好同时调用了tryGrow函数。此时通过重入锁是无法加锁的,仅能通过Synchronized或CAS方式控制并发。

allocationSpinLock是transient的,因为序列化时并不需要此参数;同时又是volatile的,因为可能有多个线程同时调用。

代码语言:javascript
复制
private transient volatile int allocationSpinLock;

UNSAFE.compareAndSwapInt

代码语言:javascript
复制
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long allocationSpinLockOffset;
static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = PriorityBlockingQueue.class;
        allocationSpinLockOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("allocationSpinLock"));
    } catch (Exception e) {
        throw new Error(e);
    }
}

调用方法

代码语言:javascript
复制
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)

allocationSpinLockOffset是allocationSpinLock变量在PriorityBlockingQueue类中的偏移量。

那么使用allocationSpinLockOffset有什么好处呢?它和直接修改allocationSpinLock变量有什么区别?

获取该字段在类中的内存偏移量,直接将内存中的值改为新值。直接修改allocationSpinLock并不是CAS。JDK 1.8代码如下:

代码语言:javascript
复制
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

在AtomicInteger类中的调用如下,getAndAddInt方法由具体类的实现方法,抽取到了UNSAFE类中:

代码语言:javascript
复制
public final int getAndDecrement() {
    return unsafe.getAndAddInt(this, valueOffset, -1);
}

对比 PriorityQueue 和 PriorityBlockingQueue

  1. PriorityQueue是非线程安全的,PriorityBlockingQueue是线程安全的
  2. PriorityBlockingQueue使用重入锁,每一个操作都需要加锁
  3. PriorityBlockingQueue扩容时使用了CAS操作
  4. 两者都使用了堆,算法原理相同
  5. PriorityBlockingQueue可以在queue为空时阻塞take操作

JDK实现堆的方法

代码语言:javascript
复制
/**
 * Establishes the heap invariant (described above) in the entire tree,
 * assuming nothing about the order of the elements prior to the call.
 */
@SuppressWarnings("unchecked")
private void heapify() {
    for (int i = (size >>> 1) - 1; i >= 0; i--)
        siftDown(i, (E) queue[i]);
}

private void siftDown(int k, E x) {
    if (comparator != null)
        siftDownUsingComparator(k, x);
    else
        siftDownComparable(k, x);
}

@SuppressWarnings("unchecked")
private void siftDownComparable(int k, E x) {
    Comparable<? super E> key = (Comparable<? super E>)x;
    int half = size >>> 1;        // loop while a non-leaf
    while (k < half) {
        int child = (k << 1) + 1; // assume left child is least
        Object c = queue[child];
        int right = child + 1;
        if (right < size &&
            ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
            c = queue[child = right];
        if (key.compareTo((E) c) <= 0)
            break;
        queue[k] = c;
        k = child;
    }
    queue[k] = key;
}

public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    modCount++;
    int i = size;
    if (i >= queue.length)
        grow(i + 1);
    size = i + 1;
    if (i == 0)
        queue[0] = e;
    else
        siftUp(i, e);
    return true;
}

private void siftUp(int k, E x) {
    if (comparator != null)
        siftUpUsingComparator(k, x);
    else
        siftUpComparable(k, x);
}

@SuppressWarnings("unchecked")
private void siftUpComparable(int k, E x) {
    Comparable<? super E> key = (Comparable<? super E>) x;
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = queue[parent];
        if (key.compareTo((E) e) >= 0)
            break;
        queue[k] = e;
        k = parent;
    }
    queue[k] = key;
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019.01.04 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 基本使用
  • PriorityQueue
    • 成员变量
      • 基本方法
        • 扩容方法
          • 是否线程安全
          • PriorityBlockingQueue
            • UNSAFE.compareAndSwapInt
            • 对比 PriorityQueue 和 PriorityBlockingQueue
            • JDK实现堆的方法
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档