前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java并发-BlockingQueue

Java并发-BlockingQueue

作者头像
lpe234
发布2021-03-02 15:34:34
5160
发布2021-03-02 15:34:34
举报
文章被收录于专栏:若是烟花若是烟花

阻塞队列长用于生产者消费者场景,生产者添加元素,消费者获取元素。BlockingQueue是存放元素的容器,它提供了线程安全的队列访问方式,JUC下面很多高级同步类都是基于它实现的。

1 概述

阻塞队列(BlockingQueue)是一个支持两种附加操作的队列。支持附加阻塞的插入和移除操作。

  • 支持阻塞的插入:当队列满时,插入操作会被阻塞,直到队列不满。
  • 支持阻塞的移除:当队列空时,移除操作会被阻塞,直到队列不空。

阻塞队列不可用时,操作处理方式

方法\处理方式

抛出异常

返回特殊值

一直阻塞

超时退出

插入方法

add(e)

offer(e)

put(e)

offer(e, time, unit)

移除方法

remove()

poll()

take()

poll(time, unit)

检查方法

element()

peek()

  • 抛出异常:队列满时,若继续插入元素会抛出IllegalStateException;当队列为空时,若获取元素则会抛出NoSuchElementException异常。
  • 返回特殊值:向队列插入元素时,会返回是否插入成功true/false;获取元素时,成功则返回元素,失败则返回null。
  • 一直阻塞:当阻塞队列满时,若继续使用put新增元素时会被阻塞,直到队列不为空或者响应中断退出;当阻塞队列为空时,继续使用take获取元素时会被阻塞,直到队列不为空。
  • 超时退出:当阻塞队列满时,使用offer(e, time, unit)新增元素会被阻塞至超时退出;当队列为空时,使用poll(time, unit)获取元素时会被阻塞至超时退出。

注意:

  • 阻塞队列中不允许插入null,会抛出NPE异常。
  • 可以访问阻塞队列中的任意元素,调用remove(Object o)可以将队列之中的特定对象移除,但会遍历全部元素,并不高效。

2 阻塞队列的实现

2.1 ArrayBlockingQueue

由数组构成的有界阻塞队列,内部由数组final Object[] items实现。默认情况下不保证线程公平的访问队列,所谓公平访问队列指阻塞的线程,可以按照阻塞的先后顺序访问队列。

代码语言:javascript
复制
public ArrayBlockingQueue(int capacity, boolean fair) {
  if (capacity <= 0)
    throw new IllegalArgumentException();
  this.items = new Object[capacity];
  lock = new ReentrantLock(fair);  // 使用公平锁/非公平锁
  notEmpty = lock.newCondition();
  notFull =  lock.newCondition();
}

队列大小初始化后不可修改。参数fair控制内部ReentrantLock是否采用公平锁。

2.2 LinkedBlockingQueue

链表实现的有界阻塞队列。内部结构是单链表。默认大小为Integer.MAX_VALUE,可以指定大小。

代码语言:javascript
复制
public LinkedBlockingQueue(int capacity) {
  if (capacity <= 0) throw new IllegalArgumentException();
  // 指定队列大小
  this.capacity = capacity;
  last = head = new Node<E>(null);
}

// 单链表节点Node
static class Node<E> {
  E item;
  Node<E> next;
  Node(E x) { item = x; }
}
2.3 PriorityBlockingQueue

支持优先级的无界阻塞队列。默认情况下采取自然顺序升序排列。也可以自定义compareTo()方法来指定元素的排列顺序,或者初始化队列时,指定构造参数Comparator来对元素进行排序。同优先级顺序无法保证。

代码语言:javascript
复制
public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
  if (initialCapacity < 1)
    throw new IllegalArgumentException();
  this.lock = new ReentrantLock();  // 非公平锁
  this.notEmpty = lock.newCondition();
  this.comparator = comparator;
  this.queue = new Object[initialCapacity];
}


// offer方法部分代码
Comparator<? super E> cmp = comparator;
if (cmp == null)
  siftUpComparable(n, e, array);
else
  siftUpUsingComparator(n, e, array, cmp);

由offer代码可以看出,Comparator的优先级是大于Comparable.compareTo方法的。

注意:PriorityBlockingQueue不会阻塞数据生产者(队列无界),只会在没有数据时阻塞消费者。生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则将有可能耗尽堆空间。

2.4 DelayQueue

支持延时获取元素的无界队列。队列使用PriorityQueue实现。队列中的元素必须实现java.util.concurrent.Delayed接口,在创建元素时指定多久才能才能从队列中取到元素。

DelayQueue非常有用,可以将DelayQueu应用在以下应用场景。

  • 缓存系统的设计:用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能获取到元素时,表示缓存有限期到了。
  • 定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行。比如TimerQueue就是使用DelayQueue实现的。
2.5 SynchronousQueue

不存储元素的阻塞队列。每个put操作都必须等待一个take操作,反之亦然。

代码语言:javascript
复制
// fair为true,等待线程将以FIFO的顺序进行访问
public SynchronousQueue(boolean fair) {
  transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

将生产者线程处理的数据直接传递给消费者线程。队列本身不存储任何元素,非常适合传递性场景。SynchronousQueue的吞吐量高于ArrayBlockingQueueLinkedBlockingQueue

3 阻塞队列的原理

利用Lock锁的多条件(Condition)阻塞控制。下面简单分析下ArrayBlockingQueue部分代码。

3.1 ArrayBlockingQueue属性
代码语言:javascript
复制
/** The queued items */
// 数据元素数组
final Object[] items;

/** items index for next take, poll, peek or remove */
// 下一个待获取元素索引
int takeIndex;

/** items index for next put, offer, or add */
// 下一个待插入元素索引
int putIndex;

/** Number of elements in the queue */
// 队列中元素个数
int count;

/*
 * Concurrency control uses the classic two-condition algorithm
 * found in any textbook.
 */

/** Main lock guarding all access */
// 所有访问的主锁
final ReentrantLock lock;

/** Condition for waiting takes */
// 消费者监视器
private final Condition notEmpty;

/** Condition for waiting puts */
// 生产者监视器
private final Condition notFull;


// 
public ArrayBlockingQueue(int capacity, boolean fair) {
  if (capacity <= 0)
    throw new IllegalArgumentException();
  this.items = new Object[capacity];
  lock = new ReentrantLock(fair);
  notEmpty = lock.newCondition();
  notFull =  lock.newCondition();
}
3.2 put操作
代码语言:javascript
复制
// 在队列尾部插入元素,若队列已满则等待队列非满。
public void put(E e) throws InterruptedException {
  // 校验插入元素,为空则抛出NPE
  checkNotNull(e);
  final ReentrantLock lock = this.lock;
  // 1. 尝试获取锁(响应中断)
  lock.lockInterruptibly();
  try {
    // 2. 当队列满时
    while (count == items.length)
      // 2.1 若队列满,则阻塞当前线程。等待`notFull.signal()`唤醒。
      notFull.await();
    // 3. 非满则执行入队操作
    enqueue(e);
  } finally {
    lock.unlock();
  }
}

// 在`putIndex`处放置当前元素,只有获取lock锁后才会调用
private void enqueue(E x) {
  // assert lock.getHoldCount() == 1;
  // assert items[putIndex] == null;
  final Object[] items = this.items;
  // 在`putIndex`处放置元素
  items[putIndex] = x;
  // putIndex等于数组长度时,重置为0索引。
  if (++putIndex == items.length)
    putIndex = 0;
  // 数量加1
  count++;
  // 4. 唤醒一个等待线程(等待取元素的线程)
  notEmpty.signal();
}

put总体流程:

  1. 获取lock锁,拿到锁后继续执行,否则自旋竞争锁。
  2. 判断阻塞队列是否满。满了了则调用await阻塞当前线程。同时释放lock锁。
  3. 如果没满,则调用enqueue方法将元素put进阻塞队列。此时还有一种可能是:第2步中被阻塞的线程被唤醒且又拿到了lock锁。
  4. 唤醒一个标记为notEmpty(消费者)的线程。
3.3 take操作
代码语言:javascript
复制
// 从头部获取元素,若队列为空则等待队列非空。
public E take() throws InterruptedException {
  final ReentrantLock lock = this.lock;
  // 1. 获取锁
  lock.lockInterruptibly();
  try {
    // 2. 当队列为空时
    while (count == 0)
      // 2.1 当队列为空时,阻塞当前线程。等待`notEmpty.signal()`唤醒。
      notEmpty.await();
    // 3. 非空则进行入队操作
    return dequeue();
  } finally {
    lock.unlock();
  }
}

// 从`takeIndex`位置获取当前元素,只有获取到lock锁后才会调用
private E dequeue() {
  // assert lock.getHoldCount() == 1;
  // assert items[takeIndex] != null;
  final Object[] items = this.items;
  @SuppressWarnings("unchecked")
  // 从`takeIndex`位置获取元素,然后清除该位置元素
  E x = (E) items[takeIndex];
  items[takeIndex] = null;
  // 
  if (++takeIndex == items.length)
    takeIndex = 0;
  // 队列元素减1
  count--;
  if (itrs != null)
    itrs.elementDequeued();
  // 4. 唤醒一个标记为notFull(生产者)的线程
  notFull.signal();
  return x;
}

take的整体流程:

  1. 获取lock锁,拿到锁则执行下一步流程;未拿到则自旋竞争锁。
  2. 当前队列是否为空,若为空则调用notEmpty.await阻塞当前线程,同时释放锁,等待被唤醒。
  3. 若非空,则调用dequeue进行出队操作。此时还有一种可能:第2步中的阻塞的线程被唤醒并且又拿到了lock锁。
  4. 唤醒一个被标记为notFull(生产者)的线程。
3.4 总结
  1. puttake操作都需要先获得锁,没有获得锁的线程无法进行操作。
  2. 拿到锁后,并不一定能顺利执行put/take操作,还需要判断队列是否可用(是否满/空),不可用则会被阻塞,并释放锁。
  3. 在2中被阻塞的线程会被唤醒,但唤醒之后依然需要拿到锁之后才能继续向下执行。否则,自旋拿锁,拿到锁后再while判断队列是否可用。

参考资料:

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 概述
  • 2 阻塞队列的实现
    • 2.1 ArrayBlockingQueue
      • 2.2 LinkedBlockingQueue
        • 2.3 PriorityBlockingQueue
          • 2.4 DelayQueue
            • 2.5 SynchronousQueue
            • 3 阻塞队列的原理
              • 3.1 ArrayBlockingQueue属性
                • 3.2 put操作
                  • 3.3 take操作
                    • 3.4 总结
                    相关产品与服务
                    容器服务
                    腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档