JDK容器学习之Queue: ArrayBlockingQueue

基于数组阻塞队列 ArrayBlockingQueue

前面学习了基于数组的非阻塞双端队列ArrayDeque,其内部维护一个数组和指向队列头和队列尾索引的两个成员变量;本篇则探究下基于数组的阻塞队列是什么样的数据结构,又有什么特性,相较于ArrayDeque又有什么异同;然后就是使用场景了

I. 底层数据结构

先看内部成员变量定义, 和 ArrayDequeue相比,差别不大,一个数组,两个索引;此外多了一个锁和两个判定条件

/** The queued items */
final Object[] items;

/** items index for next take, poll, peek or remove */
int takeIndex;

/** items index for next put, offer, or add */
int putIndex;

/** Number of elements in the queue */
int count;

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

注意

  1. 底层数据结构依然是数组,注释上并没有说明要求容量是2的n次方(ArrayDequeue有这个强制限定)
  2. 初始化时必须指定队列的容量(即数组的长度,构造方法中的必选参数)
  3. count直接表示队列的元素个数(注意DelayQueue是通过遍历来获取队列长度,且并发修改会有问题,那么这个是如何保证并发的?)
  4. 留一个疑问,塞入队列超过数组容量,是否会出现扩容

数据结构如下图

![aryBlockQueueStruct.jpeg](quiver-image-url/5AEE167065A2E49EB3DDBC449BCF991E.jpg =374x165)

II. 阻塞实现原理

0. Prefer

分析阻塞原理之前,先通过注释解释下ArrayBlockingQueue的使用场景

  • 先进先出队列(队列头的是最先进队的元素;队列尾的是最后进队的元素)
  • 有界队列(即初始化时指定的容量,就是队列最大的容量,不会出现扩容,容量满,则阻塞进队操作;容量空,则阻塞出队操作)
  • 队列不支持空元素

1. 进队

通用的进队方法如下,是非阻塞的方式,当数组满时,直接返回false,为保证并发安全,进队操作是加锁实现

public boolean offer(E e) {
    // 非空校验
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock(); // 进队加锁
    try {
        if (count == items.length) 
        // 队列满,则直接返回false
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

// 直接将元素塞入数组
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}

阻塞方式的进队实现如下

public void put(E e) throws InterruptedException {
    checkNotNull(e); // 非空判断
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 获取锁
    try {
        while (count == items.length) {
        // 一直阻塞,知道队列非满时,被唤醒
            notFull.await();
        }
        
        enqueue(e); // 进队
    } finally {
        lock.unlock();
    }
}

public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    checkNotNull(e);
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length) {
        // 阻塞,知道队列不满
        // 或者超时时间已过,返回false
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(e);
        return true;
    } finally {
        lock.unlock();
    }
}

源码分析,阻塞入队的逻辑比较清晰,小结一下

  • 非阻塞调用方式 offer(e)
  • 阻塞调用方式 put(e)offer(e, timeout, unit)
  • 阻塞调用时,唤醒条件为超时或者队列非满(因此,要求在出队时,要发起一个唤醒操作)
  • 进队成功之后,执行notEmpty.signal()唤起被阻塞的出队线程

2. 出队

非阻塞出队方法如下

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    // 直接将队头扔出去,并置空数组中该位置
    // 并移动队列头到下一位
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
    // 保障在遍历时,可以进行出队操作
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

阻塞的实现,逻辑比较清晰,首先竞争锁,判断是否为空,是阻塞直到非空;否则弹出队列头元素

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

public E poll(long timeout, TimeUnit unit) 
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();
    } finally {
        lock.unlock();
    }
}

小结

  • 非阻塞出队调用 poll()方法
  • 阻塞出队调用 take()poll(long,TimeUnit)方法
  • 出队之后,会唤醒因队列满被阻塞的入队线程
  • 出队count计数减1(因为每次只能一个线程出队,所以count的值可以保证并发准确性)
  • 支持在遍历队列时,出队

III. 应用场景及case

创建线程池时,通常会用到 ArrayBlockingQueue或者LinkedBlockingQueue,如

new ThreadPoolExecutor(1, 1,
          0L, TimeUnit.MILLISECONDS,
          new ArrayBlockingQueue<Runnable>(2));

延迟队列也是并发安全,ArrayBlockingQueue 相比较 DelayQueue应用场景的区别主要在

  • 有界和无界(ArrayBlockingQueue不会扩容,而DelayQueue会出现扩容)
  • 前者队列非空就可以出队;后者则需要队列头生效(getDelay()返回值小于0)
  • 前者队列满,则无法入队;后者一直都可以入队
  • 前者FIFO;后者优先级队列,延迟时间小的优先出队

IV. 小结

基于数组阻塞队列ArrayBlockingQueue

  • 有界数组阻塞队列,FIFO,先进先出,初始化时指定数组长度
  • 阻塞出队方法: take()poll(long, TimeUnit)
  • 非阻塞出队方法: poll()
  • 阻塞入队方法: offer(E, long, TimeUnit)put(E)
  • 非阻塞入队方法: offer(E) add(E)
  • 队列为空,出队会被阻塞;队列满时,进队会被阻塞
  • 根据内部计数count,可以直接获取队列长度(count的并发安全是由进队出队上锁,保证同一时刻只有一个线程修改count值保证的)

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏张善友的专栏

第一个IronPython程序(之二)

万物皆对象,意思是 IronPython 函数有属性, 并且这些属性在运行时是可用的。在 IronPython(Python) 中, 函数同其它东西一样也是对象...

1888
来自专栏大内老A

采用一个自创的"验证框架"实现对数据实体的验证[设计篇]

没有想到自己头脑发热写了一个简陋版本的所谓“验证框架”能够得到众多网友的推荐。个人觉得这个验证框架有两个主要的特点是:提供CompositeValidator使...

2058
来自专栏皮皮之路

【JDK1.8】JUC——LockSupport

932
来自专栏haifeiWu与他朋友们的专栏

聊聊 JDK 非阻塞队列源码(CAS实现)

正如上篇文章聊聊 JDK 阻塞队列源码(ReentrantLock实现)所说,队列在我们现实生活中队列随处可见,最经典的就是去银行办理业务,超市买东西排队等。今...

1682
来自专栏Python疯子

Python selenium — 一定要会用selenium的等待,三种等待方式解读

很多人在群里问,这个下拉框定位不到、那个弹出框定位不到…各种定位不到,其实大多数情况下就是两种问题:1 有frame,2 没有加等待。殊不知,你的代码运行速度是...

1331
来自专栏Danny的专栏

探秘BOF 和EOF

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/huyuyang6688/article/...

1123
来自专栏杂烩

大话HelloWord是如何出现在面前的-浅谈虚拟机运行流程

    初学Java,大都会用记事本写个Hello Word,然后用CMD编译出class文件,最后执行一下。当控制台输出Hello Word的时候,一个简单的...

1522
来自专栏ImportSource

自己动手系列-延迟队列

1.什么是延迟队列 在java的并发包中有有关定时调度的api。 里边其中一个重要实现就是延迟队列,通过延时队列来实现定时调度。 那么如果让你实现一个延时队列...

1.2K12
来自专栏c#开发者

BizTalk Orchestration execute Flat file disassembler ReceivePipeline

需求是这样,在一个inbound XML中有个一点节点使用平文件的方式存放,还要解析比如固定长度或根据特殊字符截取字段 也就是需要在流程里面先把输入的XML的节...

36913
来自专栏无题

BlockingQueue源码分析

* ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。 LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。 ...

3987

扫码关注云+社区

领取腾讯云代金券