前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >BlockingQueue 源码分析

BlockingQueue 源码分析

作者头像
晚上没宵夜
发布2022-05-09 21:17:56
2270
发布2022-05-09 21:17:56
举报

学习多线程定时器时遇到 BlockingQueue 阻塞队列,当时的认识仅限于了解其是一个并发阻塞队列,不知如何使用及其原理

1. 介绍

BlockingQueue 首先是一个队列,其次提供了阻塞功能。它看起来很像消息队列可让消息解耦,但其在生产者-消费者模型中通过阻塞又可使二者速度达到平衡。使用阻塞队列无需过多考虑线程安全问题,专注业务逻辑的实现即可

BlockingQueue 有正常的队列功能,即出队与入队。其阻塞体现在:

  • 当队列满时,若有生产者继续往队列添加元素,则阻塞这个生产者线程
  • 当队列为空,若有消费者继续从队列移除元素,则阻塞这个消费者线程
  • 添加元素时,则队列不为空,则唤醒消费者线程
  • 移除元素时,则队列不为满,则唤醒生产者线程

常见的阻塞队列:

  • ArrayBlockingQueue:基于数组的有界阻塞队列
  • LinkedBlockingQueue:基于链表的有界阻塞队列
  • PriorityBlockingQueue:基于堆的优先级无界阻塞队列
  • DelayQueue:基于时间优先级的无界阻塞队列

后面将介绍 ArrayBlockingQueue、LinkedBlockingQueue 两个阻塞队列, 其类继承图如下:

2. Queue 接口

Queue 接口具有队列的基本方法,其不同之处在于同一个功能他有两套方法,两套方法区别于一套是实现返回值,另一套是抛出异常

Throw Exception

Return value

增加

add()

offer()

删除

remove()

poll()

检查

element()

peek()

3. BlockingQueue 接口

BlockingQueue 接口在 Queue 的接口上添加多几个方法或重载,最常用的方法有 put 和 take(有阻塞功能)

非阻塞

阻塞线程方法

增加

offer(E e, long timeout, TimeUnit unit):等待超时后返回值

put()

删除

poll(long timeout, TimeUnit unit):等待超时后返回值

take()

包含

contains(Object o)

剩余大小

remainingCapacity()

转移元素

drainTo(Collection<? super E> c)

4. AbstractQueue 抽象类

通过 AbstractQueue 抽象类可知道 Queue 接口的两套方法其实本质是一样的,只不过多了一层抛异常的判断而已

代码语言:javascript
复制
public abstract class AbstractQueue<E> extends AbstractCollection<E> implements Queue<E> {

    /**
     * 构造器
     */
    protected AbstractQueue() {
    }


    /**
     * add 调用的还是 offer 方法返回值
     */
    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

    
    /**
     * remove 调用的还是 poll 方法返回值
     */
    public E remove() {
        E x = poll();
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    }

    
    /**
     * remove 调用的还是 poll 方法返回值
     */
    public E element() {
        E x = peek();
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    }


    /**
     * 清空元素
     */
    public void clear() {
        while (poll() != null)
            ;
    }

    
    /**
     * 批量添加元素
     */
    public boolean addAll(Collection<? extends E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        boolean modified = false;
        for (E e : c)
            if (add(e))
                modified = true;
        return modified;
    }
}

5. ArrayBlockingQueue

ArrayBlockingQueue 使用循环数组来存储元素,其大小是确定的,所以属于有界阻塞队列。阻塞功能由可重入锁实现,出队入队都靠同一个锁来控制,并且由条件锁来实现生产者和消费者线程的阻塞和唤醒。

5.1 内部属性

代码语言:javascript
复制
// 存储元素的数组
final Object[] items;

// 可获取元素的下标
int takeIndex;

// 可存放元素的下标
int putIndex;

// 元素总数量
int count;

// 可重入锁
final ReentrantLock lock;

// 非空条件锁,队列为空时阻塞消费者线程
private final Condition notEmpty;

// 非满条件锁,队列满时阻塞生产者线程
private final Condition notFull;

// 自定义的迭代器(可使用默认提供的)
transient Itrs itrs = null;

5.2 关键方法

代码语言:javascript
复制
// 入队,添加元素后队列则不为空,会唤醒被阻塞的消费者线程
private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}
代码语言:javascript
复制
// 出队,移除元素后队列则不满,唤醒被阻塞的生产者线程
private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

5.3 常用方法

put、take 是最常用的方法,对应添加与移除元素

添加元素时,锁住整个数组。若队列满了则阻塞当前添加元素的线程,否则添加完元素后唤醒消费者线程进行消费

代码语言:javascript
复制
// 添加元素,
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    
    // 获取锁
    lock.lockInterruptibly();
    try {
        
        // 若队满,则阻塞生产者线程
        while (count == items.length)
            notFull.await();
        
        // 入队
        enqueue(e);
    } finally {
        
        // 释放锁
        lock.unlock();
    }
}

移除元素时,锁住整个数组。若队列空了则阻塞当前移除元素的线程,否则移除完元素后唤醒生产者线程进行生产

代码语言:javascript
复制
// 移除元素
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    
    // 获取锁
    lock.lockInterruptibly();
    try {
        
        // 队空则阻塞消费者线程
        while (count == 0)
            notEmpty.await();
        
        // 出队
        return dequeue();
    } finally {
        
        // 释放锁
        lock.unlock();
    }
}

5.4 使用示例

下面来模拟一个生产-消费者场景。发现刚开始生产者一直生产商品,到队满后则被阻塞,此后消费者消费了一个商品后,生产者才生产一个商品

代码语言:javascript
复制
public class BlockingQueueTest {
    public static void main(String[] args) {

        ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(5);
        
        // 生产者线程,每秒生产一个商品
        new Thread(()->{
            while (true) {
                try {
                    Thread.sleep(1000);
                    System.out.println("生产者生产了商品");
                    blockingQueue.put(new Integer(1));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();


        // 消费者线程,每5秒消费一个商品
        new Thread(()->{
            while (true) {
                try {
                    Thread.sleep(5000);
                    System.out.println("消费者消费了商品");
                    blockingQueue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

6. LinkedBlockingQueue

LinkedBlockingQueue 底层是一个单向链表,可指定链表大小(不指定默认为 Integer.MAX_VALUE),实现原理和 ArrayBlockingQueue 类似,不同在于其链表的结构可实现同时出队和入队,此时需要两把锁来控制并发,所以其出队入队是不影响的

下面的实现类是 删除简化很多代码之后的结果 ,为了简洁的展示实现原理

代码语言:javascript
复制
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
  
    /**
     * 单向链表的节点
     */
    static class Node<E> {
        E item;
        Node<E> next;
        Node(E x) {
        	item = x;
        }
    }

    /** 最大容量 */
    private final int capacity;

    /** 现有元素总数量,原子操作 */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * 链表头
     */
    transient Node<E> head;

    /**
     * 链表尾
     */
    private transient Node<E> last;

    /** take方法的可重入锁,控制并发 */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** take方法的条件锁 */
    private final Condition notEmpty = takeLock.newCondition();

    /** put方法的可重入锁,控制并发 */
    private final ReentrantLock putLock = new ReentrantLock();

    /** put方法的条件锁 */
    private final Condition notFull = putLock.newCondition();

    /**
     * 唤醒非空阻塞的线程
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * 唤醒非满阻塞的线程
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * 链表尾入队
     */
    private void enqueue(Node<E> node) {
        last = last.next = node;
    }

    /**
     * 链表头出队
     */
    private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    
	/**
	 * 入队
	 */
	public void put(E e) throws InterruptedException {
		if (e == null) throw new NullPointerException();
       
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
         
        // 获取 put 方法锁,锁住链表尾部
        putLock.lockInterruptibly();
        try {
           
            // 队满时,阻塞添加元素的线程
            while (count.get() == capacity) {
                notFull.await();
            }
            
            // 入队操作
            enqueue(node);
            c = count.getAndIncrement();
            
            // 添加元素后队列还没满,唤醒非满阻塞的线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        
        // 说明队列为空了,唤醒被阻塞的生产者线程
        if (c == 0)
            signalNotEmpty();
    }
    
    
    /**
	 * 出队
	 */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        
        // 获取 take 方法锁,锁住链表头部
        takeLock.lockInterruptibly();
        try {
            
            // 队空时,阻塞获取元素的线程
            while (count.get() == 0) {
                notEmpty.await();
            }
            
            // 出队
            x = dequeue();
            c = count.getAndDecrement();
            
            // 出队后,队列不为空,继续唤醒出队线程
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        
		// 队满了,唤醒被阻塞的消费者线程
        if (c == capacity)
            signalNotFull();
        return x;
    }
 }
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-05-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 介绍
  • 2. Queue 接口
  • 3. BlockingQueue 接口
  • 4. AbstractQueue 抽象类
  • 5. ArrayBlockingQueue
    • 5.1 内部属性
      • 5.2 关键方法
        • 5.3 常用方法
          • 5.4 使用示例
          • 6. LinkedBlockingQueue
          相关产品与服务
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档