前言 在阅读完和 AQS 相关的锁以及同步辅助器之后,来一起阅读 JUC 下的和队列相关的源码。先从第一个开始:ArrayBlockingQueue。 公众号:liuzhihangs,记录工作学习中的技术、开发及源码笔记;时不时分享一些生活中的见闻感悟。欢迎大佬来指导!
由数组支持的有界BlockingQueue阻塞队列。
这个队列的命令元素FIFO(先入先出)。 队列的头是元素一直在队列中时间最长。 队列的尾部是该元素已经在队列中的时间最短。 新元素插入到队列的尾部,并且队列检索操作获取在队列的头部元素。
这是一个典型的“有界缓冲区”,在其中一个固定大小的数组保持由生产者插入并受到消费者的提取的元素。 一旦创建,容量不能改变。 试图put 一个元素到一个满的队列将导致操作阻塞; 试图 take 从空队列一个元素将类似地阻塞。
此类支持订购等待生产者和消费者线程可选的公平政策。 默认情况下,这个顺序不能保证。 然而,队列公平设置为构建 true 保证线程以FIFO的顺序进行访问。 公平性通常会降低吞吐量,但减少了可变性和避免饥饿。
public class ArrayBlockingQueueTest { private static final ArrayBlockingQueue<String> QUEUE = new ArrayBlockingQueue<>(10); private static final CountDownLatch LATCH = new CountDownLatch(2); public static void main(String[] args) { ExecutorService pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), new ThreadFactoryBuilder().setNameFormat("Thread-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy()); pool.submit(() -> { for (int i = 0; i < 100; i++) { try { Thread.sleep(1000L); QUEUE.put("鸡蛋" + Thread.currentThread().getName()); System.out.println("put 放入元素"); } catch (InterruptedException ignored) { } } LATCH.countDown(); }); pool.submit(() -> { for (int i = 0; i < 100; i++) { try { Thread.sleep(500L); String take = QUEUE.take(); System.out.println("take = " + take); } catch (InterruptedException ignored) { } } LATCH.countDown(); }); try { LATCH.await(); } catch (InterruptedException ignored) { } pool.shutdown(); }}
demo 只是临时写的一个,很简单的版本。
/** 数组 - 存储队列中的元素 */final Object[] items;/** 下一个 take, poll, peek or remove 的索引 */int takeIndex;/** 下一个 put, offer, or add 的索引 */int putIndex;/** 队列中的元素数 */int count;/** Main lock guarding all access */final ReentrantLock lock;/** take 操作时是否等待 */private final Condition notEmpty;/** put 操作时是否等待 */private final Condition notFull;
public ArrayBlockingQueue(int capacity) { this(capacity, false);}// 指定容量,及是否公平public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition();}// 初始化的时候放入元素public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); }}
public boolean add(E e) { return super.add(e);}// 父类的方法,其实调用的也是 offerpublic boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full");}// 使用锁public boolean offer(E e) { checkNotNull(e); // 加锁 final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); }}// 放入元素, 如果队列满了,则等待public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); }}
所以下面还是需要看一下 enqueue 方法:
// 只有在获取锁的时候才可以调用private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; // putIndex 下一个 put, offer, or add 的索引 // 对其进行赋值,然后进行 ++putIndex 操作 items[putIndex] = x; // 如果等于长度,则指定为开始 if (++putIndex == items.length) putIndex = 0; // 对元素数进行 ++ count++; // 有元素入队列,唤醒在等待获取元素的线程 notEmpty.signal();}
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); }}public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); }}
通过源码可以看出:
private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") // 获取元素并将元素置为 null E x = (E) items[takeIndex]; items[takeIndex] = null; // takeIndex 下一个 take, poll, peek or remove 的索引 // 指向下一个元素,并且 元素数减少 if (++takeIndex == items.length) takeIndex = 0; count--; // 更新迭代器状态 if (itrs != null) itrs.elementDequeued(); // 唤醒等待放入元素的线程 notFull.signal(); return x;}
public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return itemAt(takeIndex); // null when queue is empty } finally { lock.unlock(); }}
Q: ArrayBlockingQueue 的实现原理?
A: ArrayBlockingQueue 是基于数组实现的,内部使用 ReentrantLock 互斥锁,防止并发放置元素或者取出元素的冲突问题。
Q: 入队列和出队列方法之间的区别是什么?
ArrayBlockingQueue 中使用了 ReentrantLock 互斥锁,在元素入队列和出队列的时候都进行了加锁,所以同时只会有一个线程进行入队列或者出队列,从而保证线程安全。
领取专属 10元无门槛券
私享最新 技术干货