首先来看一个例子,例子来源于网上:
/**
* 多线程模拟实现生产者/消费者模型
*
*/
public class BlockingQueueTest2 {
/**
*
* 定义装苹果的篮子
*
*/
public class Basket {
// 篮子,能够容纳3个苹果
BlockingQueue<String> basket = new LinkedBlockingQueue<String>(3);
// 生产苹果,放入篮子
public void produce() throws InterruptedException {
// put方法放入一个苹果,若basket满了,等到basket有位置
basket.put("An apple");
}
// 消费苹果,从篮子中取走
public String consume() throws InterruptedException {
// take方法取出一个苹果,若basket为空,等到basket有苹果为止(获取并移除此队列的头部)
return basket.take();
}
}
// 定义苹果生产者
class Producer implements Runnable {
private String instance;
private Basket basket;
public Producer(String instance, Basket basket) {
this.instance = instance;
this.basket = basket;
}
public void run() {
try {
while (true) {
// 生产苹果
System.out.println("生产者准备生产苹果:" + instance);
basket.produce();
System.out.println("!生产者生产苹果完毕:" + instance);
// 休眠300ms
Thread.sleep(300);
}
} catch (InterruptedException ex) {
System.out.println("Producer Interrupted");
}
}
}
// 定义苹果消费者
class Consumer implements Runnable {
private String instance;
private Basket basket;
public Consumer(String instance, Basket basket) {
this.instance = instance;
this.basket = basket;
}
public void run() {
try {
while (true) {
// 消费苹果
System.out.println("消费者准备消费苹果:" + instance);
System.out.println(basket.consume());
System.out.println("!消费者消费苹果完毕:" + instance);
// 休眠1000ms
Thread.sleep(1000);
}
} catch (InterruptedException ex) {
System.out.println("Consumer Interrupted");
}
}
}
public static void main(String[] args) {
BlockingQueueTest2 test = new BlockingQueueTest2();
// 建立一个装苹果的篮子
Basket basket = test.new Basket();
ExecutorService service = Executors.newCachedThreadPool();
Producer producer = test.new Producer("生产者001", basket);
Producer producer2 = test.new Producer("生产者002", basket);
Consumer consumer = test.new Consumer("消费者001", basket);
service.submit(producer);
service.submit(producer2);
service.submit(consumer);
// 程序运行5s后,所有任务停止
// try {
// Thread.sleep(1000 * 5);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// service.shutdownNow();
}
}
采用线程池和阻塞队列实现生产/消费者模型。其中LinkedBlockingQueue是阻塞队列,同时线程安全,其特点:
采用链表数据结构Node的方式进行节点数据的记录,
同时其进行入队和出队的计数器采用原子性的AtomicInteger
其出队和入队采用采用两把锁,putLock和takeLock,同时进行删除的时候,采用fullLock
其与LinkedBlockingQueue相比,其可以无界可以有界,而ArrayBlockingQueue是有界的,同时实现的数据结构不通过,一个采用数组、一个采用链表,同时采用的锁的方式不同,ArrayBlockingQueue采用一把锁,没有对生产和消费消息进行锁的分离。
1.相关变量
//容量,为空时使用Integer.MAX_VALUE=2^31-1
private final int capacity;
/** Current number of elements */
//计数,队列中的元素个数
private final AtomicInteger count = new AtomicInteger();
//头结点,head.item==null,首节点不存放元素
transient Node<E> head;
//尾节点,last.next==null
private transient Node<E> last;
/** Lock held by take, poll, etc */
//消费队列锁
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
//消费队列等待消费,用于队满时,进行消费
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
//生产队列锁
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
//生产队列等待生产,用于队空时,进行生产
private final Condition notFull = putLock.newCondition();
//节点信息:数据、后继点击
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
//下一个节点,分为三种情况:
// 指向真正的节点、指向自己,后继节点为head.next、为空,表示当前节点为尾节点
Node<E> next;
Node(E x) { item = x; }
}
2.构造方法
//构造方法,空参构造默认队列容量为2^31-1
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
//构造方法,带指定容量
public LinkedBlockingQueue(int capacity) {
//对容量进行校验
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
//创建节点信息
last = head = new Node<E>(null);
}
//构造方法,放入带指定集合的元素信息入队
//首先采用默认大小,进行上锁操作,
// 放入元素到队列中,进行遍历,放入
public LinkedBlockingQueue(Collection<? extends E> c) {
//默认队列大小,2^31-1
this(Integer.MAX_VALUE);
//进行上锁操作
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
//放入元素,进行计数
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
//释放锁
putLock.unlock();
}
}
3.方法
生产方法
put操作
//入队操作
//首先获取锁,再检查队列是否满了,如果满了,则进行阻塞等待,
// 如果队列没有满,则进行生产操作,同时计数器进行计数
//生产后的元素个数如果还没有达到容量时,会继续唤醒其他生产线程
//当生产的元素是元素的第一个元素时唤醒阻塞等待消费的线程
public void put(E e) throws InterruptedException {
//非空校验
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
//设置计数为0,失败的时候返回
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//中断上锁
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
//检查队列是否满了,满了进行阻塞操作
while (count.get() == capacity) {
notFull.await();
}
//入队操作,将节点信息插入到队尾
//last=last.next=node
enqueue(node);
c = count.getAndIncrement();
//元素没有满,则唤醒被阻塞的线程,增加线程
if (c + 1 < capacity)
notFull.signal();
} finally {
//释放锁
putLock.unlock();
}
//插入的是一个元素时唤醒阻塞等待的线程
if (c == 0)
signalNotEmpty();
}
offer操作
//阻塞带超时时间的offer操作
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
//如果时间<0,则表示超时返回了,此时队列未满,直接返回
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
//否者进行入队操作
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
//首先进行非空校验,如果队满了,直接返回false
//如果没有满,则进行上锁,同时进行判断,
// 如果计数<容量,则进行入队操作
//最后释放锁
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
消费者操作
take操作
//take操作 消费消息
//如果队列为非空或者被唤醒,进行消费操作,计数器-1
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//开始消费
if (c == capacity)
signalNotFull();
return x;
}
pull操作
//进行消费操作 poll,带超时时间
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
//进行poll操作
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
remove操作
//删除操作,释放指定节点信息
public boolean remove(Object o) {
if (o == null) return false;
//对生产消息和消费消息进行上锁
fullyLock();
try {
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
//释放节点
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
drainTo操作
public int drainTo(Collection<? super E> c) {
return drainTo(c, Integer.MAX_VALUE);
}
//一次性地将队列中的全部元素消费完同时返回指定集合的信息,避免多次加锁造成的性能开销
//其中c和maxElement表示返回的集合、要获取的元素个数
public int drainTo(Collection<? super E> c, int maxElements) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
boolean signalNotFull = false;
//进行上锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//拿到两者之间的最小的一个
int n = Math.min(maxElements, count.get());
// count.get provides visibility to first n Nodes
Node<E> h = head;
int i = 0;
try {
//将元素添加中集合c中
while (i < n) {
Node<E> p = h.next;
c.add(p.item);
p.item = null;
h.next = h;
h = p;
++i;
}
return n;
} finally {
// Restore invariants even if c.add() threw
if (i > 0) {
// assert h.item == null;
head = h;
signalNotFull = (count.getAndAdd(-i) == capacity);
}
}
} finally {
takeLock.unlock();
if (signalNotFull)
signalNotFull();
}
}