前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ArrayBlockingQueue

ArrayBlockingQueue

作者头像
码农飞哥
发布2021-08-18 10:38:13
3690
发布2021-08-18 10:38:13
举报
文章被收录于专栏:好好学习

前言

上一篇我们手写了一个阻塞队列,今天我们接着开始学习之旅,让我们一起来看看ArrayBlockingQueue的源码吧。ArrayBlockingQueue是JDK中提供的工业级的通过数组实现的阻塞队列。

初始ArrayBlockingQueue

ArrayBlockingQueue的类图

在这里插入图片描述

如上类图,ArrayBlockingQueue类继承了AbstractQueue抽象类,实现了BlockingQueue接口。那么我们先来看看BlockingQueue接口中定义了哪些方法。

BlockingQueue接口

代码语言:javascript
复制
   public interface BlockingQueue<E> extends Queue<E> {
    /**
     * 插入数据到队列尾部(如果立即可行且不会超过该队列的容量)
     * 在成功时返回 true,如果此队列已满,则抛IllegalStateException。(与offer方法的区别)
     */
    boolean add(E e);

    /**
     * 插入数据到队列尾部,如果没有空间,直接返回false;
     * 有空间直接插入,返回true。
     */
    boolean offer(E e);

    /**
     * 插入数据到队列尾部,如果队列没有空间,一直阻塞;
     * 有空间直接插入。
     */
    void put(E e) throws InterruptedException;

    /**
     * 插入数据到队列尾部,如果没有额外的空间,等待一定的时间,有空间即插入,返回true,
     * 到时间了,还是没有额外空间,返回false。
     */
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 取出和删除队列中的头元素,如果没有数据,会一直阻塞到有数据
     */
    E take() throws InterruptedException;

    /**
     * 取出和删除队列中的头元素,如果没有数据,需要会阻塞一定的时间,过期了还没有数据,返回null
     */
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    //除了上述方法还有继承自Queue接口的方法 
    /**
     * 取出和删除队列头元素,如果是空队列直接返回null。
     */
    E poll();

    /**
     * 取出但不删除头元素,该方法与peek方法的区别是当队列为空时会抛出NoSuchElementException异常
     */
    E element();

    /**
     * 取出但不删除头元素,空队列直接返回null
     */
    E peek();

    /**
     * 返回队列总额外的空间
     */
    int remainingCapacity();

    /**
     * 删除队列中存在的元素
     */
    boolean remove(Object o);

   /**
    * 判断队列中是否存在当前元素
    */
    boolean contains(Object o);

}

BlockingQueue接口定义的方法主要分为两大类,一类是插入元素的方法,一类是取出元素的方法。如下图所示:

在这里插入图片描述

  1. 抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException("Queue full")异常。当队列为空时,从队列中获取元素会抛出NoSuchElementException()异常。
  2. 返回特殊值:插入方法会返回是否成功,成功则为true,移除方法,则是从队列里拿出一个元素,如果没有则返回null。
  3. 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到有消费者拿出数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
  4. 超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。

ArrayBlockingQueue中定义的常量

代码语言:javascript
复制
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** 存储数据的数组 */
    final Object[] items;

    /** 获取数据的索引,用于下次 take, poll, peek or remove 等方法 */
    int takeIndex;

    /** 添加元素的索引, 用于下次 put, offer, or add 方法 */
    int putIndex;

    /** 队列元素的个数 */
    int count;

    /** 控制并发访问的锁 */
    final ReentrantLock lock;

    /** 非空条件对象,用于通知 take 方法中在等待获取数据的线程,队列中已有数据,可以执行获取操作 */
    private final Condition notEmpty;

    /** 未满条件对象,用于通知 put 方法中在等待添加数据的线程,队列未满,可以执行添加操作 */
    private final Condition notFull;

    /** 迭代器 */
    transient Itrs itrs = null;
}

如上从定义的全局变量,我们可以看出ArrayBlockingQueue 中定义了一个数组items用于存储数据,用count来记录队列元素的个数,用ReentrantLock+Condition条件变量来实现阻塞功能,其实本质上就是生产者-消费者模型。了解完重要的全局变量之后,我们接着来看看具体的入队和出队方法。

入队方法

首先我们来看看入队相关的方法,我们会分别介绍add(e)offer(e)put(e)offer(e,time,unit)这四个方法。 首先,我们来看看add(e)方法, add()方法内部也是调用了offer()方法。区别就是当插入元素失败之后会抛出IllegalStateException("Queue full")异常。

add方法
代码语言:javascript
复制
 public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

接着我们来看看offer(e)方法。

offer方法
代码语言:javascript
复制
public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

如上,offer()方法是一个线程安全的方法,当队列中的元素个数count等于队列中数组的长度时,则返回false。否则调用enqueue()方法进行入队操作,入队成功之后返回true。接下来,我们就来看看enqueue()方法。

enqueue方法
代码语言:javascript
复制
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        //将要插入的元素x保存到putIndex位置上
        items[putIndex] = x;
        //对putIndex进行自加,如果putIndex加一之后的值等于items.length则将putIndex置为0
        if (++putIndex == items.length)
            putIndex = 0;
        //count进行加1
        count++;
        //唤醒notEmpty等待队列里的一个消费者线程
        notEmpty.signal();
    }

如上enqueue方法是入队的核心方法,首先将要插入的元素x保存到putIndex位置上,然后,对putIndex进行自加,当自加之后的值等于items.length时则将putIndex置为0,为什么要怎么做呢?因为我们知道ArrayBlockingQueue是一个先进先出的队列,并且入队和出队操作都是线程安全的。入队都是从第一个位置开始依次入队。出队也是从第一个位置开始依次出队。当putIndex等于数组的长度,即putIndex到达了队尾时,只要这时候count的值不等于数组的长度,就说明前面有元素出队,所以,此时将putIndex的值赋值为0,并不会覆盖掉第一个位置上的元素。并且可以实现循环的入队和出队。插入一个元素之后,count加一,然后唤醒非空等待队列里的一个消费者线程。 入队的逻辑说完之后,我们接下来看看出队的逻辑。

put方法

前面提到的那几个入队方法都是非阻塞的方法,接下来我们看看阻塞的入队方法put()方法,当队满时,生产者线程会进入 未满(notFull)等待队列中进行等待,并释放持有的锁对象。

代码语言:javascript
复制
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

出队

我们还是会依次分析出队的相关方法,分别从 remove(),poll()take()poll(time,unit)还有take()方法。这几个方法开始分析。 首先,我们来看看remove()方法。

remove方法
代码语言:javascript
复制
    public E remove() {
        E x = poll();
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    }

同样的remove()方法内部也是调用了poll()方法,只是当获取不到元素之后就会抛出NoSuchElementException()异常。 那么我们接着来看看poll()方法

poll方法
代码语言:javascript
复制
public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }

还是同样的poll()方法也是一个线程安全的方法,如果count==0的话则返回null,否则调用dequeue()方法进行出队。那么我们接着看卡dequeue()方法。

dequeue方法
代码语言:javascript
复制
    private E dequeue() {
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        //获取takeIndex位置上的元素
        E x = (E) items[takeIndex];
        //将takeIndex位置上的元素清空
        items[takeIndex] = null;
        //takeIndex加1,加一之后的takeIndex如果等于items.length,则将takeIndex的值置为0
        if (++takeIndex == items.length)
            takeIndex = 0;
        //count减1
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        //唤醒未满等待队列里的一个生产者线程。
        notFull.signal();
        return x;
    }

如上dequeue()方法,还是首先获取takeIndex位置上的元素,然后对takeIndex加一,接着就是count减1,最后就是唤醒一个在未满等待队列里的生产者线程。

take方法

take方法是阻塞的出队方法,同样的,当count==0即队列为空时,消费者线程会进入非空等待队列里进行等待。如果队列不为空才会调用dequeue()方法进行出队。

代码语言:javascript
复制
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

总结

本文对ArrayBlockingQueue的源码进行了剖析,主要介绍了入队和出队的相关方法。并且对其实现阻塞功能的原理进行了详细的解释。

参考

BlockingQueue-ArrayBlockingQueue 详细源码解析

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-02-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码农飞哥 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 初始ArrayBlockingQueue
    • ArrayBlockingQueue的类图
      • BlockingQueue接口
        • ArrayBlockingQueue中定义的常量
          • 入队方法
            • add方法
            • offer方法
            • enqueue方法
            • put方法
          • 出队
            • remove方法
            • poll方法
            • dequeue方法
            • take方法
        • 总结
        • 参考
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档