前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java中的队列[通俗易懂]

Java中的队列[通俗易懂]

作者头像
全栈程序员站长
发布2022-09-08 16:12:20
6520
发布2022-09-08 16:12:20
举报
文章被收录于专栏:全栈程序员必看

大家好,又见面了,我是你们的朋友全栈君。

从初学者的角度,认真地学习Java中队列的使用和设计。

参考


Queue

继承Collection接口

Deque

一个支持两端插入和删除的线性集合,此接口支持容量受限和不受限的双端队列(大多数实现容量不受限)。

该接口定义了访问两端元素的方法,主要是插入、删除、检查元素方法。这些方法主要有两种形式,一种在操作失败时引发异常,一种在操作失败时返回特殊值(null 或者false)。这里着重提一下插入操作,只有当队列容量受限时,插入操作才可能失败。

12个方法如下

该接口扩展了Queue接口。 当双端队列被用作队列时,将导致FIFO(先进先出)行为。 元素在双端队列的末尾添加,并从开头删除。 从Queue接口继承的方法与Deque方法完全等效,如下表所示:

双端队列也可以用作LIFO(后进先出)堆栈。 此接口应优先于旧版Stack类使用。 当双端队列被用作堆栈时,元素从双端队列的开始处被压入并弹出。 堆栈方法等同于Deque方法如下表所示:

强烈建议不要在队列中插入null ,因为null是队列中某些方法的返回值,具有特殊意义,比如队列中没有元素了。

BlockingQueue

BlockingQueue支持队列访问时阻塞(如果检索队列时队列已空,等待其有元素后再返回;如果存放元素时队列已满,等待队列有空间存放元素后再返回)。

接口内部的方法主要可以分为以下4类

该接口的方法是线程安全的,因为实现类内部都使用了锁机制.

ArrayBlockingQueue

数组支持的有界阻塞队列。该队列对元素FIFO(先进先出)进行排序。队列的开头是已在队列中停留最长时间的元素。队列的尾部是最短时间位于队列中的元素。新元素插入到队列的尾部,并且队列检索操作在队列的开头获取元素。

这是经典的“有界缓冲区”,其中固定大小的数组包含由生产者插入并由消费者提取的元素。创建后,容量将无法更改。试图将一个元素放入一个完整的队列将导致操作阻塞(put方法)。试图从空队列中取出一个元素的尝试也会类似地阻塞(take方法)。

此类支持给予等待的生产者和使用者线程一个可选的公平性策略。默认情况下,不保证此排序(公平性策略为false)。但是,将公平性设置为true构造的队列将按FIFO顺序授予线程访问权限。公平通常会降低吞吐量,但会减少可变性并避免饥饿

分析一段代码

代码语言:javascript
复制
public void test() throws InterruptedException { 
   
        ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<String>(4);
        new Thread(new Runnable() { 
   
            @Override
            public void run() { 
   
                try { 
   
                    blockingQueue.put("1");
                    blockingQueue.put("2");
                    blockingQueue.put("3");
                    blockingQueue.put("4");
                    blockingQueue.put("5");
                    blockingQueue.put("6");
                }catch (Exception e){ 
   
                }
            }
        },"thread1").start();
        
        
        Thread.sleep(2*1000);
        //使thread1执行到put(5)时阻塞住
        //此时数组元素为[1,2,3,4]
        new Thread(new Runnable() { 
   
            @Override
            public void run() { 
   
                try { 
   
                    //移除[1,2,3,4]中的1,2
                    blockingQueue.take();
                    blockingQueue.take();
                    //接着thread1执行put(5),put(6)
                    //数组元素此时为[5,6,3,4]
                }catch (Exception e){ 
   
                }
            }
        },"thread2").start();
        
        while (true){ 
   
        }
    }

内部维护了一个putIndex, takeIndex,表示此次put(take)元素的位置(从0开始), 到达数组元素size后,重置为0,重新来算。

所以FIFO,元素出队列后,并没有随后的元素都往前移动一位,而是将take的指针takeIndex往后移动一位,逻辑上等价于元素都往前移动一位。

LinkedBlockingQueue

链表组成的有界阻塞队列,内部使用两个可冲入锁, put操作可重入锁, take操作可冲入锁, 以及两个锁上相应的Condition

代码语言:javascript
复制
public void put(E e) throws InterruptedException { 
   
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try { 
   
            
            while (count.get() == capacity) { 
   
                //队列已满,阻塞当前
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                //队列不满,唤醒put操作线程
                //这里是必要的,因为假设上面的线程A notFull.await释放了put锁,
                // 其他的线程B也进入了while死循环,当线程C执行take()后
                // A线程获取到put锁,跳出while循环, count原子性+1, 
                // 线程C仍处在while循环中
                // A signal唤醒C并且A释放put锁后,C才有可能从while中恢复
                notFull.signal();
        } finally { 
   
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }


private void signalNotEmpty() { 
   
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try { 
   
            notEmpty.signal();
        } finally { 
   
            takeLock.unlock();
        }
    }
..................................................
public E take() throws InterruptedException { 
   
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try { 
   
            while (count.get() == 0) { 
   
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally { 
   
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

private void signalNotFull() { 
   
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try { 
   
            notFull.signal();
        } finally { 
   
            putLock.unlock();
        }
    }

PriorityBlockingQueue

无界数组结构优先级队列(无界是doc上描述的,实际上数组长度长度不得超过Integer.MAX_VALUE-8)

使用了最小堆,put元素时,维护一个最小堆;take元素时,移除队首元素,则是堆顶元素(最小值),优先级最高(或者最低,具体看comparable接口方法的实现)的元素首先被移除

参考PriorityBlockingQueue 原理分析

有几个关键方法,put元素构造最小堆的siftUpZComparable方法,如下:

代码语言:javascript
复制
//将元素x置于k位置(k从0算起)
//方法执行完后,维护着一个数组结构的最小堆
private static <T> void siftUpComparable(int k, T x, Object[] array) { 
   
        Comparable<? super T> key = (Comparable<? super T>) x;
        while (k > 0) { 
   
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            //表示当前元素>=父节点元素,不做堆调整
            if (key.compareTo((T) e) >= 0)
                break;
            //当前元素 < 父节点元素,要做调整
            array[k] = e;
            k = parent;
        } //直到不比父节点小,才跳出循环
        
        //将入参元素置于节点k中
        //1. 若一进来,x元素就>=父节点,则k=入参中的k
        //2. 若while循环执行过,则k就是入参k的某级父节点,将入参元素x设置到其位置上
        array[k] = key;
    }

take元素移除首节点(移除优先级最低的元素), siftDownComparable

代码语言:javascript
复制
//将x元素插入到k位置(k从0算起),向下调整
//执行take时,最终会调用到siftDownComparable(0,x,array, size-1)
private static <T> void siftDownComparable(int k, T x, Object[] array,
                                               int n) { 
   
        if (n > 0) { 
   
            Comparable<? super T> key = (Comparable<? super T>)x;
            //得到叶子节点的上一层下标
            int half = n >>> 1;           // loop while a non-leaf
            while (k < half) { 
   
                int child = (k << 1) + 1; // assume left child is least
                Object c = array[child];
                int right = child + 1;
                if (right < n &&
                    ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                    c = array[child = right];
                //c表示k节点左右孩子节点中较小的那一个元素
                //child表示k节点左右孩子中较小的元素的下标
                if (key.compareTo((T) c) <= 0)
                    break;
                array[k] = c;
                k = child;
            }
            array[k] = key;
        }
    }

举例: 移除队首元素前,队列是最小堆结构,如下:

take方法执行后,内部方法siftDownComparable执行前,结构如下:

执行siftDownComparable(0, 5, [2,7,3,8,9,4], 6)方法, while循环第一次执行完后,结构如下,此时half=3, array[0]=3,k=2,

while循环第二次执行后,结构如下,此时half=3, array[2]=4, k = 5

while第三次执行时,while条件不满足,跳出,执行 array[5]=5,结束,结构如下:

至此,一个新的最小堆又生成了。

移除下标i的元素,总是等价于将数组最后一个下标元素值A取出来,但是下标引用置为null, 将A插入到i位置,但是这个插入的位置涉及到堆的向下调整,这是本质。


PriorityBlockingQueue构造方法支持入参传递一个集合对象(集合对象自然要实现Comparable接口),则构造方法内部就需要立刻调整元素,形成最小堆,如下

代码语言:javascript
复制
private void heapify() { 
   
        Object[] array = queue;
        int n = size;
        int half = (n >>> 1) - 1;
        //可以看出,从叶子节点的上一层开始,向下调整形成局部最小堆
        // i--,同样的调整操作直至操作到顶部节点,就调整生成了全局的最小堆
        Comparator<? super E> cmp = comparator;
        if (cmp == null) { 
   
            for (int i = half; i >= 0; i--)
                siftDownComparable(i, (E) array[i], array, n);
        }
        else { 
   
            for (int i = half; i >= 0; i--)
                siftDownUsingComparator(i, (E) array[i], array, n, cmp);
        }
    }

DelayQueue

无界延迟性阻塞队列,jdk中的java.util.concurrent.ScheduledThreadPoolExecutor.ScheduledFutureTask就是一个实现DelayQueue接口的子类

参考 DelayQueue详解

测试用例

代码语言:javascript
复制
@Test
    public void testScheduleTask() throws InterruptedException { 
   
        ScheduledThreadPoolExecutor service = new ScheduledThreadPoolExecutor(5);
        for (int i = 0; i < 10; i++) { 
   
            int finalI = i;
            service.schedule(new Runnable() { 
   
                @Override
                public void run() { 
   
                    //十个任务,每个任务执行时间相差1s
                    System.out.println("i=" + finalI +
                            ", and thread=" +Thread.currentThread().getName() +
                            ",time=" + TestUtils.getTime());
                }
            }, i * 1000, TimeUnit.MILLISECONDS);
        }

        Thread.sleep(20*1000);
    }

SynchronousQueue

参考 Java 并发 — 阻塞队列之SynchronousQueue源码分析, 非常好的一篇从源码角度分析SynchronousQueue的博客。

LinkedTransferQueue

无界阻塞队列 Java 并发 — 阻塞队列之LinkedTransferQueue源码分析

LinkedBlockingQueue

链表结构的双向阻塞队列 Java 并发 — 阻塞队列之LinkedBlockingDeque源码分析

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/156208.html原文链接:https://javaforall.cn

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 参考
    • Queue
      • Deque
        • BlockingQueue
          • ArrayBlockingQueue
          • LinkedBlockingQueue
          • PriorityBlockingQueue
          • DelayQueue
          • SynchronousQueue
          • LinkedTransferQueue
          • LinkedBlockingQueue
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档