专栏首页码云大作战多线程应用 - 阻塞队列ArrayBlockingQueue详解

多线程应用 - 阻塞队列ArrayBlockingQueue详解

ArrayBlockingQueue是一个阻塞式的先进先出队列。该结构具有以下三个特点:

· 先进先出队列,队列头是最先进入队列的元素,队列尾是最后进队列的元素。

· 有界队列,初始化时需要指定的队列容量,就是该队列的最大容量,队列中容量达到最大值时不会扩容,则会阻塞队列。

· 队列不支持null元素,当往队列中放入null元素时会抛出异常。

接下来以源码剖析的方式来讲解ArrayBlockingQueue。

· ArrayBlockingQueue类

public class ArrayBlockingQueue<E> extends AbstractQueue<E>

implements BlockingQueue<E>, java.io.Serializable

ArrayBlockingQueue类继承AbstractQueue,并实现BlockingQueue接口。

· ArrayBlockingQueue类属性

//队列中元素数组
final Object[] items;
//下一个读取或移除元素的位置
int takeIndex;
//下一个存放元素的位置
int putIndex;
//队列中元素数量
int count;
//可重入锁
final ReentrantLock lock;
//队列 读取元素条件
private final Condition notEmpty;
//队列 存放元素条件
private final Condition notFull;
//迭代器
transient Itrs itrs = null;

· ArrayBlockingQueue构造函数

//指定队列容量的构造函数,默认为非公平锁
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

//指定队列容量、指定锁是否公平的构造函数
public ArrayBlockingQueue(int capacity, boolean fair) {

    //容量不能小于等于0
    if (capacity <= 0)
        throw new IllegalArgumentException();
    //初始化队列(初始化数组,容量为指定的大小)
    this.items = new Object[capacity];
    //初始化锁
    lock = new ReentrantLock(fair);
    //初始化读取队列条件(队列只有有元素情况下才能读取元素      ,否则阻塞等待)
    notEmpty = lock.newCondition();
    //初始化存入队列条件    (队列只有元素容量小于数组容量才能存放元素,否则阻塞等待)
    notFull =  lock.newCondition();
}

//指定队列容量、指定锁是否公平、添加初始数组元素的构造函数
public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {

    //执行第二个构造函数
    this(capacity, fair);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //当前数组下标
        int i = 0;
        try {
            //遍历初始化时需要添加的元素
            for (E e : c) {
                //校验元素不为空 为空会抛出异常,被捕获
                checkNotNull(e);
                //存放元素到数组中
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        //当前元素下标就为队列元素容量
        count = i;
        //下一个存放元素的下标
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        //解锁
        lock.unlock();
    }
}

ArrayBlockingQueue构造函数可指定初始化该阻塞队列的方式,其中容量大小必须得指定,并且大于0。还可以指定是否使用公平锁(默认为非公平锁)。并可以在初始化中放入需要放入的集合元素。

boolean fair为true时采用公平锁,线程获取锁的顺序会和线程调用lock获取锁的顺序一样,但是公平锁需要增加阻塞和唤醒的时间开销。

如果参数为false时,采用非公平锁,线程获取锁的顺序是随机获取,因此,可以根据对应的场景来选择是否采用公平锁,只有在特别需要他的时候再使用公平锁。

· ArrayBlockingQueue元素添加

public boolean add(E e) {
    //调用父类AbstractQueue的add方法,实际最后会调用下面的offer方法
    return super.add(e);
}

public boolean offer(E e) {
    //校验不为空 为空 抛出异常
    checkNotNull(e);
    //获取锁
    final ReentrantLock lock = this.lock;
    //加锁处理
    lock.lock();
    try {
        //当前队列数组元素超过最大容量 插入失败 返回false
        if (count == items.length)
            return false;
        else {
            //存入元素
            enqueue(e);
            return true;
        }
    } finally {
        //解锁
        lock.unlock();
    }
}

public void put(E e) throws InterruptedException {
    //校验不为空 为空 抛出异常
    checkNotNull(e);
    //获取锁
    final ReentrantLock lock = this.lock;
    //获取支持响应中断的锁
    lock.lockInterruptibly();
    try {
        //如果队列满 则调用notFull阻塞等待
        (当队列元素被取出或移除时会调用         notFull的signal方法被唤起)        while (count == items.length)
            //阻塞
            notFull.await();
        //存入元素
        enqueue(e);
    } finally {
        //解锁
        lock.unlock();
    }
}

//传入了等待结束时间
public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
    //校验元素是否空 为空 抛出异常
    checkNotNull(e);
    //获取阻塞时间
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        //数组容量判断
        while (count == items.length) {
            if (nanos <= 0)
//阻塞时间到了并且容量仍没减少 就失败                  返回false
                return false;
            //根据上面阻塞时间 阻塞
            nanos = notFull.awaitNanos(nanos);
        }
        //放入元素
        enqueue(e);
        return true;
    } finally {
        //解锁
        lock.unlock();
    }
}
private void enqueue(E x) {
    //当前队列数组
    final Object[] items = this.items;
    //元素放入到数组中
    items[putIndex] = x;
    //计算下一个放入数组的下标
    //如果元素容量已满,则重置下一个放置位置为0
    if (++putIndex == items.length)
        putIndex = 0;
    //当前队列容量+1
    count++;
    //唤醒因元素为空而无法获取元素导致阻塞的线程
    notEmpty.signal();
}

添加操作并不复杂,正好验证了上面说的数组容量满了的时候不会扩容的情况,并会造成阻塞。添加操作完成后,还会唤醒因元素为空无法获取元素而阻塞住的线程。另外放入元素后队列容量达到最大值时,会重置putIndex的位置为0。

· ArrayBlockingQueue元素获取

public E poll() {
    //获取锁
    final ReentrantLock lock = this.lock;
    //加锁
    lock.lock();
    try {
//队列中午元素返回null,          队列中有元素则移除队尾元素并返回
        return (count == 0) ? null : dequeue();
    } finally {
        //解锁
        lock.unlock();
    }
}

public E take() throws InterruptedException {
    //获取锁
    final ReentrantLock lock = this.lock;
    //获取响应中断的锁
    lock.lockInterruptibly();
    try {
        //队列中无元素,则阻塞,直到队列中存入元素被唤醒。
        while (count == 0)
            notEmpty.await();
        //获取队尾元素
        return dequeue();
    } finally {
        //解锁
        lock.unlock();
    }
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    //计算阻塞时间
    long nanos = unit.toNanos(timeout);
    //获取锁
    final ReentrantLock lock = this.lock;
    //获取响应中断的锁
    lock.lockInterruptibly();
    try {
//队列中无元素,则阻塞,阻塞时间为上面计算出来的时间。          阻塞时间过了,仍无元素就返回null
        while (count == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        //去除队尾元素
        return dequeue();
    } finally {
        //解锁
        lock.unlock();
    }
}

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //根据takeIndex位置读取元素
        return itemAt(takeIndex);
    } finally {
        lock.unlock();
    }
}
//移除元素核心接口
private E dequeue() {
    //当前数组
    final Object[] items = this.items;
    //根据当前查询下标获取当前元素
    E x = (E) items[takeIndex];
    //移除元素
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    //减少队列的容量
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    //唤醒因队列满了导致无法读取元素而阻塞的线程
    notFull.signal();
    //返回当前元素
    return x;
}

· ArrayBlockingQueue元素移除

//根据指定下标删除元素
void removeAt(final int removeIndex) {
    //当前队列数组
    final Object[] items = this.items;
    //移除下标 和 需要移除的下标相同
    if (removeIndex == takeIndex) {
        //移除元素
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
    } else {
//指定的下标如果和takeIndex不一致          会将有效的元素往前移动一位,
        //获取下一个需要存放元素的位置
        final int putIndex = this.putIndex;
        for (int i = removeIndex;;) {
            int next = i + 1;
            //如果需要删除的元素是队列中的最后一个元素,              那么将next置为0
            if (next == items.length)
                next = 0;
            if (next != putIndex) {
                //行成环形队列
                items[i] = items[next];
                i = next;
            } else {
                //清空元素
                items[i] = null;
                this.putIndex = i;
                break;
            }
        }
        count--;
        if (itrs != null)
            itrs.removedAt(removeIndex);
    }
    //唤醒因队列空间满而无法放入元素导致阻塞的线程
    notFull.signal();
}

在这个元素中比较有意思的是用了notFull和NotEmpty监控用于线程的阻塞与唤醒。notFull.signal()可以唤醒因队列空间满而无法将元素放入数组导致阻塞的线程,notEmpty()可以唤醒因队列空间无数据而无法取出数组中的元素导致阻塞的线程。这些监控的开启和初始化需要与lock锁相绑定,他们的使用方法也与Object的wait()方法和notify()方法相似。

ArrayBlockingQueue中的读取元素时都会采用lock.lockInterruptibly方法获取一个支持响应中断的锁,允许发生阻塞时先释放锁资源,直到该线程被唤醒而重新获取锁。

由上面源码也可以看出ArrayBlockingQueue的读和写都是共享一个锁,因此读写同时发生时也会造成阻塞。

本文分享自微信公众号 - 码云大作战(gh_9b06dbcb85f3),作者:Y

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-05-31

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 多线程应用 - 阻塞队列LinkedBlockingDeque详解

    在多线程阻塞队列的应用中上一篇已经讲述了ArrayBlockingQueue,在这一篇主要介绍思想与他差不多的另一个阻塞队列,基于链表的阻塞队列-Lin...

    虞大大
  • dubbo(一)SPI机制与实现路径

    了解dubbo内核之前,我们先看下dubbo的启动流程,参考了本地启动时日志打印的dubbo启动整理出来的流程图:

    虞大大
  • 索引与慢sql剖析

    `user_id` int(9) NOT NULL AUTO_INCREMENT,

    虞大大
  • 并发队列-有界阻塞队列ArrayBlockingQueue原理探究

    上节介绍了无界链表方式的阻塞队列LinkedBlockingQueue,本节来研究下有界使用数组方式实现的阻塞队列ArrayBlockingQueue

    加多
  • 并发队列 – 有界阻塞队列 ArrayBlockingQueue 原理探究

    如图ArrayBlockingQueue内部有个数组items用来存放队列元素,putindex下标标示入队元素下标,takeIndex是出队下标,count统...

    慕容千语
  • 曾经被面试过BlockingQueue吗?

    想必很多人面试时有被问到阻塞队列的经历。我们经常会在各种代码中见到或者用到它,最经常见到的地方就是线程池。

    PhoenixZheng
  • 两个死锁的实例 (r5笔记第90天)

    关于数据库中的死锁。如果在应用中碰到都会毫不犹豫转交给DBA,但是从目前我接到的deadlock的问题来看,和Oracle官方的描述基本都是一致的。 The f...

    jeanron100
  • python 基础系列--可迭代对象、迭代器与生成器

    迭代器是 Python 最强大的功能之一,可以想像如果有个几十 GB 的大文件,你需要编写程序处理其中的文本信息,如果一次性全部读入内存,估计机器会直接罢工了,...

    somenzz
  • 生成器&迭代器

    一.生成器 在介绍生成器表达式之前,先看下列表表达式: 1 >>> l = [i for i in range(50) if i % 2] #生成...

    用户1679793
  • 编译源码安装PHP 5.4

    我们使用vagrant建立虚拟环境,这里使用"chef/centos-6.5"这个box,这个box是一个比较纯净的CentOS-6.5系统。

    用户2131907

扫码关注云+社区

领取腾讯云代金券