前面我们手写实现了一个单向队列,一个循环队列,今天我们再手写实现一个阻塞队列。顾名思义,阻塞队列就是在普通队列的基础上加了阻塞功能。本文是为了后面看ArrayBlockingQueue的源码做的前期准备。
阻塞队列顾名思义就是一个具有阻塞功能的队列,当队满时,对这个队列的入队操作就要被阻塞,当队空时,对这个队列的出队操作就要被阻塞。那么阻塞队列是如何实现阻塞的呢?阻塞队列的底层的数据结构又是啥呢?带着这两个问题我们开始今天的手写阻塞队列之旅。
通过前面几篇的学习,我们知道了可以通过数组或者链表来实现阻塞队列,用数组来实现的队列,查找和插入的时间复杂度是O(1),存储比较高效。所以在此处,我们也将通过数组来实现队列。此处直接定义了一个Object数组。
/**
* 存放数据的数组
*/
private volatile Object[] items;
阻塞队列里的阻塞功能,实际上就是采用生产者-消费者模型来实现的。首先我们在来回顾下生产者-消费者模型:生产者生产数据,当数据被生产之后就通知消费者过来消费;当容器中的数据满了时,生产者就会暂停生产数据,进入等待,同样的消费者从容器中取数据,当容器为空时,则消费者就会暂停消费,进入等待,当数据被消费之后则会通知生产者进行数据的生产。此处,生产者和消费者之间的通信是通过 synchronized
跟this.wait()
和this.notify()
,this.notifyAll()
搭配使用,或者是ReentrantLock
和Condition
的await()
以及signalAll()
和signal()
搭配使用。后面我们通过这两种方式分别来实现。首先我们通过synchronized(this)
跟this.wait()
和this.notify()
,this.notifyAll()
搭配使用的方式来实现。
/**
* 弹出的元素的下标
*/
private volatile int takeIndex;
/**
* 插入元素的下标
*/
private volatile int putIndex;
/**
* 数组中元素的大小
*/
private volatile int count;
如上,我们定义了三个变量。第一个变量是记录弹出的元素的下标的takeIndex,第二个变量是记录插入元素的下标的putIndex。为什么要分别用这两个整型变量来保存这样的位置呢,因为阻塞队列在使用的过程中会不断的被插入和弹出元素,所以可以认为元素在数组中像是贪吃蛇一样一步一步的向前移动的。每次弹出的都是队列的第一个元素,而插入的元素则会被添加到队尾,当下标到达末尾时会被设置为0。从数组的一个下标重新开始向后增长,形成一个不断的循环过程。这里的takeIndex可以类比前面几篇博文提到的head,putIndex可以类比前面几篇博文提到的tail。 如何判断队空和队满呢? 判断队空和队满就要用到count,count变量记录的是数组中元素的大小。当入队一个元素时count会加一,当出队一个元素时count会减一。队满时count == items.length
,队空时count==0
。说完了前面定义的元素之后,我们再来看看出队和入队操作。接着我们来看看队列的构造器,在构造器中主要就是实例化一个大小为capacity的数组。并且将 takeIndex ,putIndex和count的值都设置为0。
public SimpleBlockingQueue(int capacity) {
items = new Object[capacity];
takeIndex = putIndex = count = 0;
}
入队操作就是生产者生产数据。需要做好队满的判断。如下代码所示,enqueue方法用synchronized修饰,保证了同一时刻只能有一个线程向队列中插入元素。当count==items.length
时表示队列已满。则当前线程会调用wait()释放锁,进入等待队列。当队列未满时,则将元素插入到putIndex位置上。并且putIndex向后移动一位,如果已经到达了末尾则会返回队列开头。count会加1。然后,唤醒其他等待的线程进行消费。
public synchronized boolean enqueue(String element) throws InterruptedException {
//判断是否队满
try {
while (count == items.length) {
System.out.println("**********队列已满****");
this.wait();
}
System.out.println("*******"+Thread.currentThread().getName()+"在"+System.currentTimeMillis()+"时入队的元素是="+element);
//插入元素
items[putIndex] = element;
//putIndex向后移动一位,如果已经到达了末尾则会返回队列开头
if (++putIndex == items.length) {
putIndex = 0;
}
count++;
//唤醒所有休眠等待的线程
this.notifyAll();
} catch (InterruptedException e) {
e.printStackTrace();
}
return true;
}
出队操作就是消费者消费数据,需要做好队空的判断。出队操作如入队操作刚好相反。当count==0
时表示队列为空。当前线程进入等待队列,并且释放锁。然后取出takeIndex指向位置中的元素,并将该位置清空。然后takeIndex向后移动一位,如果已经到达了末尾则会返回队列开头。count会减1。
public synchronized Object dequeue() throws InterruptedException {
Object item = null;
//判断队列是否为空
try {
while (count == 0) {
System.out.println("***********队列为空********");
this.wait();
}
//取出takeIndex指向位置中的元素
item = items[takeIndex];
//并将该位置清空
items[takeIndex] = null;
//takeInde向后移动一位,如果已经到达了末尾则会返回队列开头
if (++takeIndex == items.length) {
takeIndex = 0;
}
count--;
//唤醒所有休眠等待的线程
this.notifyAll();
} catch (InterruptedException e) {
e.printStackTrace();
}
return item;
}
在此处为要用while判断队空队满呢?用if判断有啥问题么?经过分析,在调用this.wait()
之后,如果线程刚好被this.notifyAll()
唤醒,就会重新进行入队和出队操作。而不会再次检查count的值是否满足条件,如果一个生产者线程入队了一个元素,那么会调用this.notifyAll()
方法唤醒等待队列中的消费者线程,消费者线程会依次调用出队操作。那么在第一个消费者线程消费完元素之后,后面所有线程拿到的都将是null值。同时,当一个消费者线程消费完一个元素之后,同样也会调用this.notifyAll()
方法,这样即使队列中已经没有元素了,进入等待队列的消费者线程还是会被自己的同类所唤醒,消费根本不存在的元素,最终只能返回null值。所以为了解决这个问题,核心的就是在线程从this.wait()
中被唤醒时也仍然要重新检查一遍count的值是否满足要求。如果count不满足要求,那么当前线程仍然调用this.wait()
回到等待状态当中。我们可以使用一个while循环来包裹this.wait()
调用和对count的条件判断达到目的。
如下,创建了一个大小为4的阻塞队列,然后创建四个线程,两个生产者线程,两个消费者线程。每个线程执行10次,生产者线程向队列里并发放入数组0到9,消费者线程,从队列中弹出20次数字并打印弹出的数字。
public static void main(String[] args) throws InterruptedException {
//创建大小为4的阻塞队列
final SimpleSynchronizeBlockingQueue simpleBlockingQueue = new SimpleSynchronizeBlockingQueue(4);
//创建两个线程
final int threads = 2;
//每个线程执行10次
final int times = 10;
//线程列表
List<Thread> threadList = new ArrayList<>(threads * 2);
long startTime = System.currentTimeMillis();
//创建2个生产者线程,向队列中并发放入数字0到9,每个线程放入10个数字
for (int i = 0; i < threads; ++i) {
final int offset = i * times;
Thread producer = new Thread(() -> {
try {
for (int j = 0; j < times; ++j) {
simpleBlockingQueue.enqueue(String.valueOf(offset + j));
}
} catch (Exception e) {
e.printStackTrace();
}
});
threadList.add(producer);
producer.start();
}
// 创建2个消费者线程,从队列中弹出20次数字并打印弹出的数字
for (int i = 0; i < threads; ++i) {
Thread consumer = new Thread(() -> {
try {
for (int j = 0; j < times; ++j) {
String element =(String) simpleBlockingQueue.dequeue();
System.out.println(Thread.currentThread().getName()+"在"+System.currentTimeMillis()+"时"+"取出的元素="+element);
}
} catch (Exception e) {
e.printStackTrace();
}
});
threadList.add(consumer);
consumer.start();
}
// 等待所有线程执行完成
for (Thread thread : threadList) {
thread.join();
}
for (int i = 0; i < simpleBlockingQueue.items.length; i++) {
System.out.println("*********队列中剩余的元素="+simpleBlockingQueue.items[i]);
}
// 打印运行耗时
long endTime = System.currentTimeMillis();
System.out.println(String.format("总耗时:%.2fs", (endTime - startTime) / 1e3));
}
测试结果:
如上,看着使用synchronized的方案还挺好的,但是,还是不够安全,因为synchronized的锁对象是this对象。如果一个生产者一直调用了入队的方法,那么在其释放锁之前,其余线程是不能取数据的,整个队列就会进入阻塞状态。而且,使用synchronized(this)
、this.wait()
、this.notifyAll()
,这些同步机制都和当前对象this有关。因为synchronized (obj)可以使用任意对象对应的对象锁,而Object.wati()和Object.notifyAll()方法又都是public方法。也就是说不止在阻塞队列类内部可以使用这个阻塞队列对象的对象锁及其对应的条件变量。如果在外部代码中获取了阻塞队列对象上的对象锁和对应的条件变量,那么就有可能发生外部代码滥用阻塞队列对象上的对象锁导致阻塞队列性能下降甚至发生死锁的情况。那么有没有更加安全的方案呢?在JDK1.5之后提供了ReentrantLock 锁,该锁能够响应中断,支持重入。接下来,我们看看通过ReentrantLock和Condition的await()以及signalAll()和signal()的实现方案。
/**
* 并发锁
*/
private Lock blockLock = new ReentrantLock();
/**
* 队空的condition
*/
private Condition notEmpty = blockLock.newCondition();
/**
* 队满的condition
*/
private Condition notFull = blockLock.newCondition();
/**
* 入队操作(插入元素)
*
* @param element
* @return
*/
public boolean enqueue(String element) throws InterruptedException {
//判断是否队满
blockLock.lockInterruptibly();
try {
while (count == items.length) {
System.out.println("**********队列已满****");
notFull.await();
}
System.out.println("*******"+Thread.currentThread().getName()+"在"+System.currentTimeMillis()+"时入队的元素是="+element);
//插入元素
items[putIndex] = element;
//putIndex向后移动一位,如果已经到达了末尾则会返回队列开头
if (++putIndex == items.length) {
putIndex = 0;
}
count++;
notEmpty.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//解锁
blockLock.unlock();
}
return true;
}
/**
* 出队操作(弹出元素)
*
* @return
*/
public Object dequeue() throws InterruptedException {
Object item = null;
//判断队列是否为空
blockLock.lockInterruptibly();
try {
while (count == 0) {
System.out.println("***********队列为空********");
notEmpty.await();
}
//取出takeIndex指向位置中的元素
item = items[takeIndex];
//并将该位置清空
items[takeIndex] = null;
//takeInde向后移动一位,如果已经到达了末尾则会返回队列开头
if (++takeIndex == items.length) {
takeIndex = 0;
}
count--;
notFull.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
blockLock.unlock();
}
return item;
}
如上代码:blockLock.lockInterruptibly();
是可以响应中断的获取锁,就是说如果线程发生了阻塞,可以给线程发送中断信号来唤醒它,从而使他有机会释放曾经持有的锁对象。其余代码与synchronized
的实现方式的相同,在此不再赘述,在JDK中ArrayBlockingQueue实现队列阻塞就是通过ReentrantLock和Condition的await()以及signalAll()和signal()的实现方案。测试方法同上,测试结果也同上。
本文我们手写了一个阻塞队列,队列的数据结构采用数组来实现。队列的阻塞功能分别用了synchronized(this)
、this.wait()
、this.notifyAll()
的方式和ReentrantLock
和Condition
的await()
以及signalAll()
和signal()
两种方式来实现阻塞功能,本质上就是生产者-消费者模型。。在JDK中ArrayBlockingQueue实现队列阻塞就是通过ReentrantLock
和Condition
的await()
以及signalAll()
和signal()
,因为这种方式效率更高,更加的安全。
从0到1实现自己的阻塞队列(上)[1] 动手写一个阻塞队列[2]
[1]
从0到1实现自己的阻塞队列(上): https://zhuanlan.zhihu.com/p/64156753
[2]
动手写一个阻塞队列: https://blog.csdn.net/jinjin603/article/details/81868993