首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

并发编程之ArrayBlockingQueue

这两天花了几个小时来看ArrayBlockingQueue,阻塞队列。其实它的实现思想是比较简单的,主要是利用ReentrantLock和Condition来实现。首先理解什么是阻塞队列:

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:

在队列为空时,获取元素的线程会等待队列变为非空。

当队列满时,存储元素的线程会等待队列可用。

阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

也就是说,阻塞队列,这里以ArrayBlockQueue来说明,用生产者消费者模型来理解再好不过了。当队列里有东西,你可以进行取操作,没有东西时候,你就需要阻塞起来,等有东西再取。

What is ArrayBlockingQueue

首先从名字里面可以看出来,ArrayBlockingQueue里面主要的数据结构就是一个数组,使用ReentrantLock对其进行加锁,使用Condition实现存取等待。

接下来看里面定义:

/**

* FIFO特性。

* head元素是队列里面存在最久的元素。

* 非空

* 一旦创建,大小就不能改变。

* 如果队列满了,再想入対就会阻塞

* 支持公平锁策略。默认是非公平的。

*/

public class ArrayBlockingQueueAnlysis extends AbstractQueue

implements BlockingQueue, java.io.Serializable {

final Object[] items; //用来存items

int takeIndex; //出队,获取的下一个index。

int putIndex; //add或者put的下一个index

int count; //元素的个数

final ReentrantLock lock; //主要的一把锁

private final Condition notEmpty; //空的condition

private final Condition notFull; //满的condition

transient Itrs itrs = null; //用于以链表方式存储所有已经创建的iterator。

...

}

上述代码中,大体都有注释,我觉得比较值得思考的有普通的两个index,两个Condition,当然还有一个

itrs这个变量,感觉是个比较新的思想,下文会聊。

add操作

由add操作:

public boolean add(E e) {

return super.add(e);

}

最终父类的add会调用子类具体实现的offer方法:

public boolean offer(E e) {

checkNotNull(e);

final ReentrantLock lock = this.lock;

lock.lock(); //加锁。

try {

if (count == items.length)

return false;

else {

enqueue(e); //入队。

return true;

}

} finally {

lock.unlock();

}

}

在offer方法里面,会用lock进行加锁,也就是一次只能一个线程对queue进行操作,最终,会调用enqueue方法:

private void enqueue(E x) {

final Object[] items = this.items;

items[putIndex] = x;

if (++putIndex == items.length)

putIndex = 0;

count++;

notEmpty.signal();

}

插入的就是简单的利用putIndex,在数组里面新增加一个元素,然后调用notEmpty.signal();

跟所有在等待取数的线程说,喂,有东西了,来拿吧!

从上文add相关代码来看,还是比较好理解的,存储结构是数组,逻辑结构是队列,利用ReentrantLock进行加锁。

使用Condition实现等待/通知模式

put操作

对于add,还有一个类似的put操作:

public void put(E e) throws InterruptedException {

checkNotNull(e);

final ReentrantLock lock = this.lock;

lock.lockInterruptibly();

try {

while (count == items.length)

notFull.await(); //如果满了,调用notFull这个condition加入队列睡眠等待。

enqueue(e);

} finally {

lock.unlock();

}

}

实现逻辑基本一致,注意一旦队列满了,需要调用 notFull.await() 等待。

take操作

现在看看出队的take操作:

public E take() throws InterruptedException {

final ReentrantLock lock = this.lock;

lock.lockInterruptibly();

try {

while (count == 0)

notEmpty.await();

return dequeue();

} finally {

lock.unlock();

}

}

同样如果count==0,则需要等待。接下来看dequeue方法:

private E dequeue() {

final Object[] items = this.items;

@SuppressWarnings("unchecked")

E x = (E) items[takeIndex];

items[takeIndex] = null; //当前位置置null,有利于回收。

if (++takeIndex == items.length)

takeIndex = 0;

count--;

if (itrs != null)

itrs.elementDequeued(); //把itrs里面也出队一个。

notFull.signal();

return x;

}

同样是针对数组的简单出队,从队尾出。会调用一次itrs.elementDequeued(); 下文提到。

其他方法

里面还有一些其他方法例如:

remainingCapacity() : 返回数组的甚于容量

remove(Object o) :删除某一个特定值为o的元素

size() :返回该元素的数量

spliterator() :返回此元素的分割迭代器

基本都是针对数组的操作,利用ReentrantLock和Condition进行控制。

下面主要看它的迭代器。

Itr

感觉ArrayBlockingQueue里面的迭代器非常有特色,不同于普通的集合类里面那么简单,从阅读类的源码来看,它是线程安全的,不会抛出ConcurrentModificationException 。另一方面,ArrayBlockingQueue里面数组下标是循环利用的,可以理解为是条循环队列。

所以,所有的迭代器共享数据,队列改变会影响所有的迭代器。为了保证正确,增加了许多复杂的操作,但是由于循环数组和一些内部移除会导致迭代器丢失它们的位置,或显示一些它们不应该显示的元素。

比如,迭代器在创建的时候,其位置已经确定,但是队列可能在不断的出入队列,这样迭代器会受到严重影响,可能造成队列实际上入出循环了数组一圈,而迭代器记录的是上一圈的情况,只有下标,这样遍历就会造成很大的问题。

为了避免这个情况,同时也为了保证操作的正确性,当队列有一个或多个迭代器的时候,其通过以下手段保持状态:

跟踪循环的次数。即 takeIndex为0的次数。

每当删除一个内部元素时,通过回调通知所有迭代器(因此其他元素也可以移动)。

下面就是它的主要字段:

cursor:主要指向下一个元素

nextItem:指向下一个元素

nextIndex:nextItem的index

lastItem:最后一个元素

lastRet:最后一个元素的索引

prevTakeIndex:takeIndex的前一个位置

prevCycles:itrs监控前一个的循环数量cycles的值

NONE = -1:none模式,代表节点不存在或者没有

REMOVED = -2:说明当前节点被其他线程调用remove模式删除了

DETACHED = -3:说明处于detached模式

接下来看它的构造方法:

Itr() {

lastRet = NONE; //最后一个索引为NONE

final ReentrantLock lock = ArrayBlockingQueue.this.lock; //获取外部类的锁。

lock.lock(); //加锁

try {

if (count == 0) { //当队列里面实际是没有数据的

cursor = NONE;

nextIndex = NONE;

prevTakeIndex = DETACHED;

} else {

final int takeIndex = ArrayBlockingQueue.this.takeIndex;

prevTakeIndex = takeIndex;

nextItem = itemAt(nextIndex = takeIndex);

cursor = incCursor(takeIndex);

if (itrs == null) {

itrs = new Itrs(this);

} else {

itrs.register(this); // in this order

itrs.doSomeSweeping(false); //清理无用的迭代器

}

prevCycles = itrs.cycles;

}

} finally {

lock.unlock();

}

}

上面构造方法是什么意思呢?

count等于0的时候,就说明队列里面没有数据,那么创建的这个迭代器是个无用的迭代器,可以直接移除,进入detach模式。否则就把当前队列的读取位置给迭代器当做下一个元素,cursor存储下个元素的位置。

而doSomeSweeping主要用来清理无用的迭代器。在迭代器创建和detach的时候会触发。sweeper字段就是记录上次扫描到的位置。如果为null,就从链表头开始扫描,有就从其下一个开始扫描。如果找到了一个被回收了或者是耗尽的迭代器,就清理掉它,继续找下一个。这就完成了对无效迭代器的清理了。下面看看它的主要代码:

void doSomeSweeping(boolean tryHarder) {

int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES; //判断要尝试几次去清扫。

Node o, p;

final Node sweeper = this.sweeper;

boolean passedGo; // to limit search to one full sweep

if (sweeper == null) { //初始化o,p,以及passedGo

o = null;

p = head;

passedGo = true;

} else {

o = sweeper;

p = o.next;

passedGo = false;

}

for (; probes > 0; probes--) { //循环次数。

if (p == null) {

if (passedGo)

break;

o = null;

p = head;

passedGo = true;

}

final Itr it = p.get();

final Node next = p.next;

if (it == null || it.isDetached()) { //这个iterator是null,或者已经处于detached模式了。

// found a discarded/exhausted iterator

probes = LONG_SWEEP_PROBES; // "try harder"

// unlink p

p.clear();

p.next = null;

if (o == null) { //说明是第一个迭代器

head = next;

if (next == null) { //itrs里面是空的了。

// We've run out of iterators to track; retire

itrs = null;

return;

}

}

else

o.next = next; //o指向前一个清扫过的p

} else {

o = p; //把p赋值给o,

}

p = next; //p往后面串一个。

}

this.sweeper = (p == null) ? null : o; //判断p,并给sweeper赋值。

}

下面主要看负责管理Iterator的Itrs类。

Itrs

先看结构:

class Itrs {

private class Node extends WeakReference {

Node next; //指向下一个节点

Node(Itr iterator, Node next) {

super(iterator);

this.next = next;

}

}

//

int cycles = 0;

//头节点head。

private Node head;

//用来去删除废弃的iterators。

private Node sweeper = null;

//尝试次数

private static final int SHORT_SWEEP_PROBES = 4;

private static final int LONG_SWEEP_PROBES = 16;

里面每个Iterator被一个Node节点封装,而每个Node又是一个弱引用(WeakReference),具体关于Java各种引用可看:Java中强引用、软引用、弱引用、虚引用

上文的add操作并没有调用itrs的相关操作。

在remove方法里面有调用,这里具体分析下:

而在void removeAt(final int removeIndex),删除特定位置的元素方法里面调用了itrs.elementDequeued(); 接下来看elementDequeued 方法:

void elementDequeued() {

if (count == 0)

queueIsEmpty();

else if (takeIndex == 0)

takeIndexWrapped();

}

当count为0时候,调用queueIsEmpty:

void queueIsEmpty() {

for (Node p = head; p != null; p = p.next) {

Itr it = p.get();

if (it != null) {

p.clear();

it.shutdown();

}

}

head = null;

itrs = null;

}

而在queueIsEmpty 里面,则需要把itrs里面的所有node检查以便,如果此时里面的某一个iterator不为null,调用shutdown方法,shutdown方法里面则是把Iterator里面的状态标志初始化:

void shutdown() {

cursor = NONE;

if (nextIndex >= 0)

nextIndex = REMOVED;

if (lastRet >= 0) {

lastRet = REMOVED;

lastItem = null;

}

prevTakeIndex = DETACHED;

}

而在elementDequeued 里面的第二个条件中,从外部类的takeIndex 判断是否为0,从而判断是否能够拿东西(或者循环了一圈回到原点),如果不能拿,则调用takeIndexWrapped 方法:

boolean takeIndexWrapped() {

// assert lock.getHoldCount() == 1;

if (isDetached())

return true;

if (itrs.cycles - prevCycles > 1) { //

shutdown();

return true;

}

return false;

}

isDetached方法就是判断takeIndex的前一个元素是不是小于0,即takeIndex是不是为0。

所以对于remove方法里面,itrs做的主要事情如下:

队列中数量为0的时候,队列就是空的,会将所有迭代器进行清理并移除。

否者如果takeIndex的下标是0,意味着队列从尾中取完了,又回到头部获取

心得

总的来说,就增加修改逻辑来说,ArrayBlockingQueue并不难理解,主要逻辑就是ReentrantLock+Condition+数组,而里面比较相对于其他有特点的就是Iterator的实现,以及利用Itrs对Iterator进行管理的过程。

append

里面变量并没有用volatile来保证诸如count,putIndex,takeIndex的可见性。

另一方面,由于是加锁的阻塞队列,所以性能上是有缺陷的,但是功能上确实很好的,生产者消费者模型。

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20190202G04PU900?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券