Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >并发队列-无界阻塞优先级队列PriorityBlockingQueue原理探究

并发队列-无界阻塞优先级队列PriorityBlockingQueue原理探究

作者头像
加多
发布于 2018-09-06 06:56:51
发布于 2018-09-06 06:56:51
1.2K10
代码可运行
举报
文章被收录于专栏:Java编程技术Java编程技术
运行总次数:0
代码可运行

一、 前言

PriorityBlockingQueue是带优先级的无界阻塞队列,每次出队都返回优先级最高的元素,是二叉树最小堆的实现,研究过数组方式存放最小堆节点的都知道,直接遍历队列元素是无序的。

二、 PriorityBlockingQueue类图结构

image.png

如图PriorityBlockingQueue内部有个数组queue用来存放队列元素,size用来存放队列元素个数,allocationSpinLockOffset是用来在扩容队列时候做cas的,目的是保证只有一个线程可以进行扩容。 由于这是一个优先级队列所以有个比较器comparator用来比较元素大小。lock独占锁对象用来控制同时只能有一个线程可以进行入队出队操作。notEmpty条件变量用来实现take方法阻塞模式。这里没有notFull 条件变量是因为这里的put操作是非阻塞的,为啥要设计为非阻塞的是因为这是无界队列。 最后PriorityQueue q用来搞序列化的。

如下构造函数,默认队列容量为11,默认比较器为null;

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 private static final int DEFAULT_INITIAL_CAPACITY = 11;


 public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }

    public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }

    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }

三、 offer操作

在队列插入一个元素,由于是无界队列,所以一直为成功返回true;

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public boolean offer(E e) {

    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;

    //如果当前元素个数>=队列容量,则扩容(1)
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);


    try {
        Comparator<? super E> cmp = comparator;

        //默认比较器为null
        if (cmp == null)(2)
            siftUpComparable(n, e, array);
        else
            //自定义比较器(3)
            siftUpUsingComparator(n, e, array, cmp);

        //队列元素增加1,并且激活notEmpty的条件队列里面的一个阻塞线程
        size = n + 1;9)
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

主流程比较简单,下面看看两个主要函数

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private void tryGrow(Object[] array, int oldCap) {
    lock.unlock(); //must release and then re-acquire main lock
    Object[] newArray = null;

    //cas成功则扩容(4)
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            //oldGap<64则扩容新增oldcap+2,否者扩容50%,并且最大为MAX_ARRAY_SIZE
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            allocationSpinLock = 0;
        }
    }

    //第一个线程cas成功后,第二个线程会进入这个地方,然后第二个线程让出cpu,尽量让第一个线程执行下面点获取锁,但是这得不到肯定的保证。(5)
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    lock.lock();(6)
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

tryGrow目的是扩容,这里要思考下为啥在扩容前要先释放锁,然后使用cas控制只有一个线程可以扩容成功。我的理解是为了性能,因为扩容时候是需要花时间的,如果这些操作时候还占用锁那么其他线程在这个时候是不能进行出队操作的,也不能进行入队操作,这大大降低了并发性。

所以在扩容前释放锁,这允许其他出队线程可以进行出队操作,但是由于释放了锁,所以也允许在扩容时候进行入队操作,这就会导致多个线程进行扩容会出现问题,所以这里使用了一个spinlock用cas控制只有一个线程可以进行扩容,失败的线程调用Thread.yield()让出cpu,目的意在让扩容线程扩容后优先调用lock.lock重新获取锁,但是这得不到一定的保证,有可能调用Thread.yield()的线程先获取了锁。

那copy元素数据到新数组为啥放到获取锁后面那?原因应该是因为可见性问题,因为queue并没有被volatile修饰。另外有可能在扩容时候进行了出队操作,如果直接拷贝可能看到的数组元素不是最新的。而通过调用Lock后,获取的数组则是最新的,并且在释放锁前 数组内容不会变化。

具体建堆算法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;

    //队列元素个数>0则判断插入位置,否者直接入队(7)
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (key.compareTo((T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = key;(8)
}

下面用图说话模拟下过程: 假设队列容量为2

  • 第一次offer(2)时候

image.png

执行(1)为false所以执行(2),由于k=n=size=0;所以执行(8)元素入队,然执行(9)size+1; 现在队列状态:

image.png

  • 第二次offer(4)时候 执行(1)为false,所以执行(2)由于k=1,所以进入while循环,parent=0;e=2;key=4;key>e所以break;然后把4存到数据下标为1的地方,这时候队列状态为:

image.png

  • 第三次offer(6)时候 执行(1)为true,所以调用tryGrow,由于2<64所以newCap=2 + (2+2)=6;然后创建新数组并拷贝,然后调用siftUpComparable;k=2>0进入循环 parent=0;e=2;key=6;key>e所以break;然后把6放入下标为2的地方,现在队列状态:

image.png

  • 第四次offer(1)时候 执行(1)为false,所以执行(2)由于k=3,所以进入while循环,parent=0;e=2;key=1; key<e;所以把2复制到数组下标为3的地方,然后k=0退出循环;然后把2存放到下标为0地方,现在状态:

image.png

四、 poll操作

在队列头部获取并移除一个元素,如果队列为空,则返回null

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return dequeue();
    } finally {
        lock.unlock();
    }
}

主要看dequeue

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private E dequeue() {

    //队列为空,则返回null
    int n = size - 1;
    if (n < 0)
        return null;
    else {


        //获取队头元素(1)
        Object[] array = queue;
        E result = (E) array[0];

        //获取对尾元素,并值null(2)
        E x = (E) array[n];
        array[n] = null;

        Comparator<? super E> cmp = comparator;
        if (cmp == null)//cmp=null则调用这个,把对尾元素位置插入到0位置,并且调整堆为最小堆(3)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;4return result;
    }
}
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    private static <T> void siftDownComparable(int k, T x, Object[] array,
                                               int n) {
        if (n > 0) {
            Comparable<? super T> key = (Comparable<? super T>)x;
            int half = n >>> 1;           // loop while a non-leaf
            while (k < half) {
                int child = (k << 1) + 1; // assume left child is least
                Object c = array[child];5)
                int right = child + 1;6)
                if (right < n &&
                    ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)(7)
                    c = array[child = right];
                if (key.compareTo((T) c) <= 0)(8)
                    break;
                array[k] = c;
                k = child;
            }
            array[k] = key;(9)
        }
    }

下面用图说话模拟下过程:

  • 第一次调用poll() 首先执行(1) result=1;然后执行(2)x=2;这时候队列状态

image.png

然后执行(3)后状态为:

image.png

执行(4)后的结果:

image.png

下面重点说说siftDownComparable这个屌屌的建立最小堆的算法: 首先说下思想,其中k一开始为0,x为数组里面最后一个元素,由于第0个元素为树根,被出队时候要被搞掉,所以建堆要从它的左右孩子节点找一个最小的值来当树根,子树根被搞掉后,会找子树的左右孩子最小的元素来代替,直到树节点为止,还不明白,没关系,看图说话: 假如当前队列元素:

image.png

那么对于树为:

image.png

这时候如果调用了poll();那么result=2;x=11;现在树为:

image.png

然后看leftChildVal = 4;rightChildVal = 6; 4<6;所以c=4;也就是获取根节点的左右孩子值小的那一个; 然后看11>4也就是key>c;然后把c放入树根,现在树为:

image.png

然后看根的左边孩子4为根的子树我们要为这个字树找一个根节点 看leftChildVal = 8;rightChildVal = 10; 8<10;所以c=8;也就是获取根节点的左右孩子值小的那一个; 然后看11>8也就是key>c;然后把c放入树根,现在树为:

image.png

这时候k=3;half=3所以推出循环,执行(9)后结果为:

image.png

这时候队列为:

image.png

五、 put操作

内部调用的offer,由于是无界队列,所以不需要阻塞

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public void put(E e) {
    offer(e); // never need to block
}

六、 take操作

获取队列头元素,如果队列为空则阻塞。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {

        //如果队列为空,则阻塞,把当前线程放入notEmpty的条件队列
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

这里是阻塞实现,阻塞后直到入队操作调用notEmpty.signal 才会返回。

七、 size操作

获取队列元个数,由于加了独占锁所以返回结果是精确的

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public int size() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return size;
    } finally {
        lock.unlock();
    }
}

八、 开源框架中使用

目前还没找到..

九、总结

PriorityBlockingQueue类似于ArrayBlockingQueue内部使用一个独占锁来控制同时只有一个线程可以进行入队和出队,另外前者只使用了一个notEmpty条件变量而没有notFull这是因为前者是无界队列,当put时候永远不会处于await所以也不需要被唤醒。

PriorityBlockingQueue始终保证出队的元素是优先级最高的元素,并且可以定制优先级的规则,内部通过使用一个二叉树最小堆算法来维护内部数组,这个数组是可扩容的,当当前元素个数>=最大容量时候会通过算法扩容。

值得注意的是为了避免在扩容操作时候其他线程不能进行出队操作,实现上使用了先释放锁,然后通过cas保证同时只有一个线程可以扩容成功。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2017.06.16 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
1 条评论
热度
最新
有一个疑问,扩容的时候只能有一个线程扩容成功吗?毕竟在释放掉乐观锁并且重新获取主锁之间会有一个间隔,在这个间隔里,可能其他线程也获取到了乐观锁并且发生扩容了,如果前一个线程在获取到乐观锁之后时间片到了,那么多个线程扩容成功的情况发生的可能性应该也会增加吧。
有一个疑问,扩容的时候只能有一个线程扩容成功吗?毕竟在释放掉乐观锁并且重新获取主锁之间会有一个间隔,在这个间隔里,可能其他线程也获取到了乐观锁并且发生扩容了,如果前一个线程在获取到乐观锁之后时间片到了,那么多个线程扩容成功的情况发生的可能性应该也会增加吧。
回复回复点赞举报
推荐阅读
编辑精选文章
换一批
阻塞队列实现之PriorityBlockingQueue源码解析
PriorityBlockingQueue是一个支持优先级的无界阻塞队列,基于数组的二叉堆,其实就是线程安全的PriorityQueue。
烂猪皮
2023/09/04
1830
阻塞队列实现之PriorityBlockingQueue源码解析
PriorityBlockingQueue详解
  【1】PriorityBlockingQueue是一个无界的基于数组的优先级阻塞队列,数组的默认长度是11,也可以指定数组的长度,且可以无限的扩充,直到资源消耗尽为止,每次出队都返回优先级别最高的或者最低的元素。默认情况下元素采用自然顺序升序排序,当然我们也可以通过构造函数来指定Comparator来对元素进行排序。需要注意的是PriorityBlockingQueue不能保证同优先级元素的顺序。
忧愁的chafry
2022/10/30
6120
PriorityBlockingQueue详解
死磕 java集合之PriorityBlockingQueue源码分析
PriorityBlockingQueue是java并发包下的优先级阻塞队列,它是线程安全的,如果让你来实现你会怎么实现它呢?
彤哥
2019/07/08
3240
JDK源码分析-PriorityBlockingQueue
前文「JDK源码分析-PriorityQueue」分析了优先队列 PriorityQueue,它既不是阻塞队列,而且线程不安全。本文分析线程安全的阻塞优先队列 PriorityBlockingQueue。它的继承结构如下:
WriteOnRead
2019/08/16
3550
Java并发队列原理剖析
LinkedBlockingQueue和ArrayBlockingQueue比较简单,不进行讲解了。下面只介绍PriorityBlockingQueue和DelayQueue。
用户4283147
2022/10/27
2610
Java并发队列原理剖析
【死磕Java并发】-----J.U.C之阻塞队列:PriorityBlockingQueue
我们知道线程Thread可以调用setPriority(int newPriority)来设置优先级的,线程优先级高的线程先执行,优先级低的后执行。而前面介绍的ArrayBlockingQueue、LinkedBlockingQueue都是采用FIFO原则来确定线程执行的先后顺序,那么有没有一个队列可以支持优先级呢? PriorityBlockingQueue 。 PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采用自然顺序升序排序,当然我们也可以通过构造函数来指定C
用户1655470
2018/04/26
7500
【死磕Java并发】-----J.U.C之阻塞队列:PriorityBlockingQueue
并发编程4:Java 阻塞队列源码分析(上)
上篇文章 并发编程3:线程池的使用与执行流程 中我们了解到,线程池中需要使用阻塞队列来保存待执行的任务。这篇文章我们来详细了解下 Java 中的阻塞队列究竟是什么。 什么是阻塞队列 阻塞队列其实就是生
张拭心 shixinzhang
2018/01/05
1.5K0
并发编程4:Java 阻塞队列源码分析(上)
PriorityBlockingQueue源码解析
本文探讨了如何通过技术手段实现加载状态与流畅交互的优化,旨在提升用户体验。文章详细介绍了在应用开发中,如何通过合理的架构设计、异步处理和性能优化等策略,减少用户等待时间,提高系统的响应速度和稳定性。
后台技术汇
2024/11/22
980
PriorityBlockingQueue源码解析
并发阻塞队列BlockingQueue解读
首先,最基本的来说, BlockingQueue 是一个先进先出的队列(Queue),为什么说是阻塞(Blocking)的呢?是因为 BlockingQueue 支持当获取队列元素但是队列为空时,会阻塞等待队列中有元素再返回;也支持添加元素时,如果队列已满,那么等到队列可以放入新元素时再放入。
大忽悠爱学习
2022/10/24
9820
并发阻塞队列BlockingQueue解读
阻塞队列与非阻塞队列
Java提供很多线程安全的容器,为开发人员在并发编程场景下使用,通常我们会更加关注业务实现,而不关心底层结构。但我们应该理解这些容器的原理和使用场景,以方便我们的开发和遇到问题的分析,并且有时候也能借鉴一下大神们的实现思想。
搬砖俱乐部
2019/06/15
3.2K0
JUC-BlockingQueue二
LinkedTransferQueue是一个由链表结构组成的无界阻塞队列,相对于其它阻塞队列,LinkedBlockingQueue可以算是LinkedBlockingQueue与SynhronoousQueue结合,LinkedtransferQueue是一种无界阻塞队列,底层基于单链表实现,其内部结构分为数据节点、请求节点,基于CAS无锁算法实现
才疏学浅的木子
2023/10/17
1430
JUC-BlockingQueue二
解读 Java 并发队列 BlockingQueue
原文出处:https://javadoop.com/post/java-concurrent-queue
Java
2018/10/23
6650
解读 Java 并发队列 BlockingQueue
BlockingQueue
BlockingQueue 是一个先进先出的队列(Queue), 并且当获取队列元素但是队列为空时,会阻塞等待队列中有元素再返回;也支持添加元素时,如果队列已满,那么等到队列可以放入新元素时再放入。
leobhao
2022/06/28
2810
Java 优先队列 PriorityQueue PriorityBlockingQueue 源码分析
通过数组实现一个堆,元素在queue数组中并不是完全有序的,仅堆顶元素最大或最小。
Yano_nankai
2019/02/25
1K0
解读 Java 并发队列 BlockingQueue
转自:https://javadoop.com/post/java-concurrent-queue
Java技术江湖
2019/09/25
6090
(juc系列)优先级阻塞队列 Priotiryblockingqueue
一个无界的阻塞队列,使用相同的排队规则PriorityQueue并且提供阻塞的操作. 因为这个队列逻辑上是误解的,尝试添加操作可能会失败,由于资环耗尽了(比如OOM).
呼延十
2021/11/10
4160
PriorityBlockingQueue优先队列的二叉堆实现
转载请注明原创地址http://www.cnblogs.com/dongxiao-yang/p/6293807.html
sanmutongzi
2020/03/05
5100
多线程之阻塞队列
DelayQueue每次都是将元素加入排序队列,以delay/过期时间为排序因素,将快过期的元素放在队首,取数据的时候每次都是先取快过期的元素。 构造方法
OPice
2019/10/23
8820
【原创】Java并发编程系列32 | 阻塞队列(下)
阻塞队列在并发编程非常常用,被广泛使用在“生产者-消费者”问题中。本文是阻塞队列下篇。
java进阶架构师
2020/08/28
4410
Java Review - 并发编程_PriorityBlockingQueue原理&源码剖析
PriorityBlockingQueue是带优先级的无界阻塞队列,每次出队都返回优先级最高或者最低的元素。
小小工匠
2021/12/30
2250
Java Review - 并发编程_PriorityBlockingQueue原理&源码剖析
相关推荐
阻塞队列实现之PriorityBlockingQueue源码解析
更多 >
领券
社区富文本编辑器全新改版!诚邀体验~
全新交互,全新视觉,新增快捷键、悬浮工具栏、高亮块等功能并同时优化现有功能,全面提升创作效率和体验
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文