前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >java并发队列之阻塞队列-ArrayBlockingQueue

java并发队列之阻塞队列-ArrayBlockingQueue

作者头像
胖虎
发布2019-06-26 17:28:49
8780
发布2019-06-26 17:28:49
举报
文章被收录于专栏:晏霖晏霖

前言

今天讲阻塞队列,阻塞队列有很多,这篇文章只讲解ArrayBlockingQueue,其他的大同小异。

正文

什么是阻塞队列

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

阻塞队列有哪些?

JDK7提供了7个阻塞队列。分别是

  • ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
  • PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
  • DelayQueue:一个使用优先级队列实现的无界阻塞队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

简述一下其中比较重要的

ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。

LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。

PriorityBlockingQueue:以上2种队列都是先进先出队列,而PriorityBlockingQueue却不是,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列。

DelayQueue:基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

下面来探讨阻塞队列的实现原理,本文以ArrayBlockingQueue为例,其他阻塞队列实现原理可能和ArrayBlockingQueue有一些差别,但是大体思路应该类似,有兴趣的朋友可自行查看其他阻塞队列的实现源码。

首先来查看其构造函数

构造方法

描述

public ArrayBlockingQueue(int capacity)

构造指定大小的有界队列

public ArrayBlockingQueue(int capacity, boolean fair)

构造指定大小的有界队列,指定为公平或非公平锁

public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)

构造指定大小的有界队列,指定为公平或非公平锁,指定在初始化时加入一个集合

看一下ArrayBlockingQueue类中的几个成员变量:

代码语言:javascript
复制
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /**
     * 序列化ID。这个类依赖于默认的序列化

     *甚至对于默认序列化的项数组,即使

     *它是空的。否则,它不能被声明为.,即

     *这里需要。
     */
    private static final long serialVersionUID = -817911632652898426L;

    /** 排队的物品 */
    final Object[] items;

    /** 用于下一次获取、轮询、查看或删除的项目索引 */
    int takeIndex;

    /** 下一个put、.或add的项目索引 */
    int putIndex;

    /** 队列中的元素数量 */
    int count;

    /*
     * 并发控制使用经典的两条件算法

     *发现在任何教科书。
     */

    /** 主锁保护所有通道 */
    final ReentrantLock lock;

    /** 等待获取的条件 */
    private final Condition notEmpty;

    /** 等待放入的条件 */
    private final Condition notFull;

说明:可以看出,ArrayBlockingQueue中用来存储元素的实际上是一个数组,takeIndex和putIndex分别表示队首元素和队尾元素的下标,count表示队列中元素的个数。

lock是一个可重入锁,notEmpty和notFull是等待条件。

然后我们看一下里面重要的方法

方法\处理方式

抛出异常

返回特殊值

一直阻塞

超时退出

插入方法

add(e)队列未满时,返回true;队列满则抛出

offer(e)队列未满时,返回true;队列满时返回false。非阻塞立即返回。

put(e)队列未满时,直接插入没有返回值;队列满时会阻塞等待,一直等到队列未满时再插入。

offer(e,time,unit)设定等待的时间,如果在指定时间内还不能往队列中插入数据则返回false,插入成功返回true。

移除方法

remove()队列不为空时,返回队首值并移除;队列为空时抛出

poll()队列不为空时返回队首值并移除;队列为空时返回null。非阻塞立即返回。

take()队列不为空返回队首值并移除;当队列为空时会阻塞等待,一直等到队列不为空时再返回队首值。

poll(time,unit)设定等待的时间,如果在指定时间内队列还未孔则返回null,不为空则返回队首值

检查方法

element()

peek()

不可用

不可用

put()

首先是放入元素的方法,put(),该方法会阻塞,也是最常用写入队列的方法

从put方法的实现可以看出,它先获取了锁,并且获取的是可中断锁,然后判断当前元素个数是否等于数组的长度,如果相等,则调用notFull.await()进行等待,如果捕获到中断异常,则唤醒线程并抛出异常。

  当被其他线程唤醒时,通过enqueue(e)方法插入元素,最后解锁。

  enqueue它是一个private方法,插入成功后,通过notEmpty唤醒正在等待取元素的线程。

代码语言:javascript
复制
public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

take()

take方法是获取元素的方法,该方法在获取元素时是加锁,生产者无法操作queue,取到元素并移除元素,然后释放锁。

代码语言:javascript
复制
public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();//这里并没有调用lock方法,而是调用了可被中断的lockInterruptibly,该方法可被线程中断返回,lock不能被中断返回。 
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

插入和取出的方法最常用的是put和take,因为只有这两个是阻塞的,其他的方法根据实际情况而定。

下面我简单的使用ArrayBlockingQueue写一个Demo,

简述:我使用单线程进行存入,多线程取出,

代码语言:javascript
复制
package com.aspire;

/**
 * 阻塞队列
 *
 * @author yanlin
 * @version v1.3
 * @date 2018-12-22 1:28 PM
 * @since v8.0
 **/

import org.testng.annotations.Test;

import java.util.concurrent.*;

public class BlockingQueueTest {
    @Test
    public void test() throws InterruptedException {
        ArrayBlockingQueue abq = new ArrayBlockingQueue(10, true);
        for (int i = 1; i <= 10; i++) {
            put(abq, i);
            get(abq);
        }
    }

    private void get(ArrayBlockingQueue abq) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        CountDownLatch countDownLatch = new CountDownLatch(2);
        for (int i = 1; i <= 2; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    Integer integer = null;
                    try {
                        integer = (Integer) abq.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if (null == integer) {
                        System.out.println("队列是空,没有取到元素");
                    } else {
                        System.out.println("取到了一个元素是:" + integer + "---目前队列中元素个数是 :" + abq.size());
                    }
                    countDownLatch.countDown();
                }
            });
        }
        executorService.shutdown();
        countDownLatch.await();
    }

    /**
     * 添加元素
     *
     * @param abq
     * @param i
     */
    private void put(ArrayBlockingQueue abq, int i) throws InterruptedException {
        abq.put(i);
        System.out.println("存入了一个元素是   " + i);

    }
}

下面看输出,很明显多线程也没有发生并发问题,而且在这个图的下一个图可以得出一个结论,先入先出。

笔者临时想测一下,方法阻塞的样子是什么样的,当前线程一直在阻塞,个人觉得还是够浪费资源的,所以,这时可以使用poll的超时退出。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-01-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 晏霖 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 正文
    • 什么是阻塞队列
      • 阻塞队列有哪些?
        • 看一下ArrayBlockingQueue类中的几个成员变量:
          • 然后我们看一下里面重要的方法
            • put()
              • take()
              相关产品与服务
              容器服务
              腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档