前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >JDK源码分析-PriorityBlockingQueue

JDK源码分析-PriorityBlockingQueue

作者头像
WriteOnRead
发布2019-08-16 10:31:31
3280
发布2019-08-16 10:31:31
举报
文章被收录于专栏:WriteOnReadWriteOnRead

概述

前文「JDK源码分析-PriorityQueue」分析了优先队列 PriorityQueue,它既不是阻塞队列,而且线程不安全。本文分析线程安全的阻塞优先队列 PriorityBlockingQueue。它的继承结构如下:

PriorityBlockingQueue 与 PriorityQueue 的内部结构类似,也是物理上由数组、逻辑上由堆结构实现的,并且使用 ReentrantLock 实现线程安全。除此之外,二者大部分操作都是类似的。

因此,有了前文的铺垫,这里相对更容易理解一些。下面分析其代码实现。

代码分析

主要成员变量

// 内部数组的默认初始化容量
private static final int DEFAULT_INITIAL_CAPACITY = 11;

// 内部数组的最大容量
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

// 保存元素的内部数组
private transient Object[] queue;

// 队列中元素的数量
private transient int size;

// 队列中元素的比较器
private transient Comparator<? super E> comparator;

// 互斥锁(保证线程安全)
private final ReentrantLock lock;

// 表示队列非空的条件
private final Condition notEmpty;

// 扩容时使用的自旋锁,通过 CAS 获取(后面分析)
private transient volatile int allocationSpinLock;

// 一个普通的优先队列,主要用于序列化和反序列化
private PriorityQueue<E> q;

构造器

// 构造器 1:使用默认的初始化容量创建一个对象
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}

// 构造器 2:使用给定的容量创建一个对象
public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}

// 构造器 3:使用给定的容量和比较器创建一个对象
public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}

上面几个构造器都是比较简单的赋值。除此之外,还有一个用给定集合初始化的构造器,如下:

public PriorityBlockingQueue(Collection<? extends E> c) {
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    // 是否需要堆化
    boolean heapify = true; // true if not known to be in heap order
    // 是否需要筛选空值
    boolean screen = true;  // true if must screen for nulls
    // 给定集合为 SortedSet
    if (c instanceof SortedSet<?>) {
        SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
        this.comparator = (Comparator<? super E>) ss.comparator();
        heapify = false; // 已经有序,不需要再堆化
    }
    // 给定集合为 PriorityBlockingQueue
    else if (c instanceof PriorityBlockingQueue<?>) {
        PriorityBlockingQueue<? extends E> pq =
            (PriorityBlockingQueue<? extends E>) c;
        this.comparator = (Comparator<? super E>) pq.comparator();
        screen = false; // 不需要筛选判空
        if (pq.getClass() == PriorityBlockingQueue.class) // exact match
            heapify = false; // 不需要堆化
    }
    // 集合转为数组
    Object[] a = c.toArray();
    int n = a.length;
    // If c.toArray incorrectly doesn't return Object[], copy it.
    if (a.getClass() != Object[].class)
        a = Arrays.copyOf(a, n, Object[].class);
    // 集合内所有元素都不能为空
    if (screen && (n == 1 || this.comparator != null)) {
        for (int i = 0; i < n; ++i)
            if (a[i] == null)
                throw new NullPointerException();
    }
    this.queue = a;
    this.size = n;
    if (heapify)
        heapify(); // 堆化
}

堆化操作 heapify 代码如下:

private void heapify() {
    Object[] array = queue;
    int n = size;
    int half = (n >>> 1) - 1;
    Comparator<? super E> cmp = comparator;
    // 根据比较器(Comparator)是否为空,采用不同的策略
    // PS: 二者操作基本一样,只是 Comparator 和 Comparable 的区别
    if (cmp == null) {
        for (int i = half; i >= 0; i--)
            siftDownComparable(i, (E) array[i], array, n);
    }
    else {
        for (int i = half; i >= 0; i--)
            siftDownUsingComparator(i, (E) array[i], array, n, cmp);
    }
}

siftDownUsingComparator 代码如下:

private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
                                                int n,
                                                Comparator<? super T> cmp) {
    if (n > 0) {
        // 数组的中间位置
        int half = n >>> 1;
        while (k < half) {
            // 获取索引为 k 的节点左子节点索引
            int child = (k << 1) + 1;
            // 获取 child 的值
            Object c = array[child];
            // 获取索引为 k 的节点右子节点索引
            int right = child + 1;
            // 比较左右子节点的值,取较小的一个
            if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
                c = array[child = right];
            // 给定的元素 x 与其较小的子节点的值比较,若 x 不大于子节点的值,停止交换
            if (cmp.compare(x, (T) c) <= 0)
                break;
            // 将 x 与其较小的子节点互换位置
            array[k] = c;
            k = child;
        }
        array[k] = x;
    }
}

该方法与 PriorityQueue 中的 siftDownUsingComparator 方法操作几乎完全一致,可参考前文的分析,这里不再赘述(siftDownComparable 方法亦是如此)。

入队方法:add(E), put(E), offer(E, timeout, TimeUnit), offer(E)

public boolean add(E e) {
    return offer(e);
}

public void put(E e) {
    offer(e); // never need to block
}

public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e); // never need to block
}

上述三个方法内部都是通过 offer(e) 方法实现的,因此只需分析 offer(e) 方法即可:

public boolean offer(E e) {
    // 插入元素不能为空
    if (e == null)
        throw new NullPointerException();
    // 获取锁,保证线程安全
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    // 如果容量不够,则进行扩容(注意这里是一个循环)
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap); // 尝试扩容
    try {
        Comparator<? super E> cmp = comparator;
        // 根据 Comparator 是否为空采用不同的堆化策略
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1;
        // 有新元素插入了,唤醒 notEmpty 条件下等待的线程(消费者)
        notEmpty.signal();
    } finally {
        // 释放锁
        lock.unlock();
    }
    return true;
}

下面分析一下扩容操作 tryGrow:

private void tryGrow(Object[] array, int oldCap) {
    // 释放锁
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    // 尝试以 CAS 方式修改 allocationSpinLock 的值(将 0 改为 1)
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            // 若旧容量 n 较小(小于 64),则扩容为 2 * n + 2,否则扩容为 1.5 * n
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            // 创建一个新数组
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            // 将 allocationSpinLock 重置为 0
            allocationSpinLock = 0;
        }
    }
    // newArray 为空表示未进行上述扩容操作,则当前线程让出 CPU 时间
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    // 尝试获取锁
    lock.lock();
    // 到这里表示扩容成功
    // queue == array 保证老数据复制一次
    if (newArray != null && queue == array) {
        // 扩容后的新数组
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

这个扩容方法比较有意思:它刚开始会释放锁,而后再重新获取锁。

1. 为什么刚开始要释放锁?

由于该锁是全局的,其他大部分公有(public)方法也会用到;而扩容操作又相对比较耗时,若这里不释放,则某个线程扩容时其他方法调用可能会阻塞。

2. 释放锁之后如何保证线程安全?

这就用到了成员变量 allocationSpinLock,使用了 Unsafe 类的 CAS 操作。它尝试将 allocationSpinLock 的值设置为 1,而一旦操作成功,其他线程就无法进入,直到该线程将它重置为 0. 这就保证了同一时间内只能有一个线程在扩容。

3. 在释放锁后的扩容操作中,先后可能会有多个线程扩容,也即会产生多个新容量的空数组(此时它们都未指向原先的数组 queue),如何避免老数据多次复制到新数组呢?

代码里用到了 queue == array 这个判断。

比如线程 T1 和 T2 都对原数组进行了扩容,得到了两个 newArray,在后面复制老数据时,若其中一个线程已经对 queue 重新赋值并复制后,由于 queue 已经改变,后面的线程就不会再复制一次了。

出队方法:poll(), take(), peek()

// 出队
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return dequeue();
    } finally {
        lock.unlock();
    }
}
// 出队(队列为空时阻塞)
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

// 有超时等待的出队
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        while ( (result = dequeue()) == null && nanos > 0)
            nanos = notEmpty.awaitNanos(nanos);
    } finally {
        lock.unlock();
    }
    return result;
}

可以看到这几个出队的操作都加了锁,内部都调用了 dequeue 方法:

private E dequeue() {
    int n = size - 1;
    if (n < 0)
        return null;
    else {
        Object[] array = queue;
        // 取数据中的第一个元素
        E result = (E) array[0];
        // 获取最后一个元素
        E x = (E) array[n];
        // 将最后一个元素置空,并恢复堆结构
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }
}

该方法与 PriorityQueue 的出队操作 poll() 类似,也不再赘述。

小结

1. PriorityBlockingQueue 是优先队列的阻塞方式实现,它与 PriorityQueue 内部结构类似,即物理结构是可变数组、逻辑结构是堆;

2. PriorityBlockingQueue 内部元素不能为空,且可比较,使用 ReentrantLock 保证线程安全。

参考链接:

https://juejin.im/post/5cc258796fb9a03228616e6e

https://blog.csdn.net/codejas/article/details/89190774

相关阅读:

JDK源码分析-PriorityQueue

JDK源码分析-ReentrantLock

Stay hungry, stay foolish.

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-08-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 WriteOnRead 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
腾讯云代码分析
腾讯云代码分析(内部代号CodeDog)是集众多代码分析工具的云原生、分布式、高性能的代码综合分析跟踪管理平台,其主要功能是持续跟踪分析代码,观测项目代码质量,支撑团队传承代码文化。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档