生产者-消费者模式是一个比较经典的问题。
该模式有这么一些特点:
基于特点考虑,考核多线程相关的知识、线程间通信的知识。
回顾:线程间通信的知识点。
关于线程间通信的知识点,以下列出部分和生产者-消费者模式相关的一些。
二者都是Object类的方法,使用这套方法时必须获得同步锁synchronized。
接口java.util.concurrent.BlockingQueue
天然具有阻塞、线程安全的特性,所以可以直接使用其实现类实现生产-消费者模式。
抛异常 | 返回指定值 | 阻塞 | 超时 | |
---|---|---|---|---|
插入 | add(e):成功true;失败异常 | offer(e):满了则返回false | put(e):满了会等待阻塞 | off(e, time, unit):设置超时 |
移除 | remove():不存在会空指针异常 | poll():空了返回null | take():空了会等待阻塞 | poll(time, unit):设置超时 |
判断 | element():对列空抛异常 | peek() |
其常见实现类有这么一些:
可以利用put、take这一对方法实现生产-消费者模式。
在jdk并发包里面还提供了一个条件接口java.util.concurrent.locks.Condition
——条件信号类。
可以理解为这是一个锁的条件:通过一个锁的多个条件可以共享状态信息。所以对于同一个锁对象可以创建多个条件。
使用其实例时,建议使用 new Condition()
方法。
不同于synchronized是JVM底层的实现,而Lock是java语言级别的实现控制对象锁资源。
我们可以使用java.util.concurrent.locks.Lock
替换synchronized
方法和语句的使用, Condition
取代了对象监视器方法的使用。
ReentrantLock lock = new ReentrantLock(); // 锁
Condition notEmpty = lock.newCondition(); // 条件一
Condition notFull = lock.newCondition(); // 条件二
ReentrantLock
Condition
使用形式一般如下(Condition的javadoc提供的一个示例代码),里面的注释给出了一个可能按以下顺序执行的场景:
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100]; // 作为缓冲队列
int putptr, takeptr, count;
// 生产
public void put(Object x) throws InterruptedException {
lock.lock(); // 0. 获得锁
try {
while (count == items.length) // 1.队列已满
notFull.await(); // 2.则notFull条件等待--当前线程暂停,会释放锁 ; // 7. 重新获得锁,队列不满则退出循环
items[putptr] = x; // 8. 生产数据
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal(); // 9. 唤醒其他线程--notEmpty条件等待的线程
} finally {
lock.unlock(); // 10. 释放锁,被唤醒的其他线程开始有机会重新获得锁了
}
}
// 消费
public Object take() throws InterruptedException {
lock.lock(); // 3.获得锁
try {
while (count == 0)
notEmpty.await(); // 队列空,则notEmpty等待,释放锁; // 重新获得锁之后,如果有数据则退出循环;还是没有数据则继续等待释放锁
Object x = items[takeptr]; // 4. 消费一条数据
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal(); // 5. notFull条件唤醒其他线程--此时notFull等待的线程
return x;
} finally {
lock.unlock(); // 6.释放锁;被唤醒的其他线程开始有机会重新获得锁了
}
}
我们可以利用前面的线程间通信的方式实现生产者-消费者模式。
直接使用阻塞队列BlockingQueue自带的put、take方法实现。
/**
* 生产者-消费者实现模式一——使用内置的阻塞队列
* BlockingQueue的put、take天然支持阻塞等待、线程安全
*/
public class _01_Producer implements Runnable{
private BlockingQueue<Object> blockingQueue;
public _01_Producer(BlockingQueue<Object> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
for(int i=1; i<=1000; i++){
try {
blockingQueue.put(Thread.currentThread().getName()+"-" + i);
System.out.println("生产者线程[" + Thread.currentThread().getName()
+ "]生产了" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class _01_Consumer implements Runnable {
private BlockingQueue blockingQueue;
public _01_Consumer(BlockingQueue blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
for(int i=1; i<=1000; i++){
try {
System.out.println("消费者[" + Thread.currentThread().getName()
+ "]消费了:" + blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class _01_ProducerConsumerTest {
public static void main(String[] args) {
BlockingQueue blockingQueue = new ArrayBlockingQueue<>(100);
new Thread(new _01_Producer(blockingQueue)).start();
new Thread(new _01_Consumer(blockingQueue)).start();
new Thread(new _01_Producer(blockingQueue)).start();
new Thread(new _01_Producer(blockingQueue)).start();
new Thread(new _01_Consumer(blockingQueue)).start();
new Thread(new _01_Producer(blockingQueue)).start();
}
}
消费者[Thread-1]消费了:Thread-0-434 消费者[Thread-1]消费了:Thread-0-435 生产者线程[Thread-2]生产了427 生产者线程[Thread-3]生产了565 消费者[Thread-1]消费了:Thread-0-436 消费者[Thread-1]消费了:Thread-0-437 生产者线程[Thread-0]生产了469 生产者线程[Thread-5]生产了578 消费者[Thread-1]消费了:Thread-0-438 消费者[Thread-1]消费了:Thread-0-439 生产者线程[Thread-2]生产了428
使用同步字synchronized结合wait、notify(notifyAll)实现。
public class _02_ProducerConsumer<E> {
private Queue<E> queue; // 缓冲队列
private int capacity; // 容量
public _02_ProducerConsumer() {
this(16);
}
public _02_ProducerConsumer(int capacity) {
queue = new LinkedList<>();
this.capacity = capacity;
}
public synchronized void pro(E e) throws InterruptedException {
// 满了则等待
while (queue.size() == capacity) {
// 满了则等待;释放锁;等待其他线程notify、notifyAll才唤醒重新获得锁
this.wait();
}
if( queue.size() == 0 ){
// 唤醒其他线程,在当前线程释放锁之前其他线程只是就绪并不会立马执行
this.notifyAll();
}
queue.add(e);
}
/**
* 消费
* @return
* @throws InterruptedException
*/
public synchronized E con() throws InterruptedException {
// 空则等待
while (queue.size() == 0) {
this.wait();
}
if( queue.size() == capacity ){
this.notifyAll();
}
return queue.remove();
}
}
public static void main(String[] args) {
_02_ProducerConsumer<Integer> producerConsumer = new _02_ProducerConsumer();
ExecutorService es = Executors.newFixedThreadPool(2);
es.execute(()->{
while (true) {
try {
int nextInt = new Random().nextInt(1000);
System.out.println("生产者[" + Thread.currentThread().getName()
+ "]生产:" + nextInt);
producerConsumer.pro(nextInt);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
es.execute(()->{
while (true) {
try {
Integer integer = producerConsumer.con();
System.out.println("消费者["+Thread.currentThread().getName()
+"]获得:" + integer);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
生产者[pool-1-thread-1]生产:418 生产者[pool-1-thread-1]生产:222 消费者[pool-1-thread-2]获得:418 消费者[pool-1-thread-2]获得:222 生产者[pool-1-thread-1]生产:278 生产者[pool-1-thread-1]生产:59 消费者[pool-1-thread-2]获得:278 消费者[pool-1-thread-2]获得:59 生产者[pool-1-thread-1]生产:217
使用ReentrantLock+Condition自定义一个阻塞队列。
public class _03_MyBlockingQueue<E> {
private Queue<E> queue;
private int capacity;
final ReentrantLock lock = new ReentrantLock();
final Condition notEmpty = lock.newCondition();
final Condition notFull = lock.newCondition();
public _03_MyBlockingQueue() {
this(16);
}
public _03_MyBlockingQueue(int capacity) {
this.queue = new LinkedList();
this.capacity = capacity;
}
// 生产
public void pro(E e) throws InterruptedException {
// 可中断锁
lock.lockInterruptibly();
try {
if (queue.size() == capacity) {
// 锁会自动释放,线程暂停直到该条件(notFull)发出signal、signalAll
notFull.await();
}
if (queue.size() == 0) {
notEmpty.signal();
}
queue.add(e);
} finally {
lock.unlock();
}
}
// 消费
public E con() throws InterruptedException {
lock.lockInterruptibly();
try {
if (queue.size() == 0) {
notEmpty.await();
}
if (queue.size() == capacity) {
notFull.signal();
}
return queue.remove();
} finally {
lock.unlock();
}
}
}
public static void main(String[] args) {
_03_MyBlockingQueue blockingQueue = new _03_MyBlockingQueue<>();
new Thread(()->{
while (true) {
int nextInt = new Random().nextInt(1000);
try {
blockingQueue.pro(nextInt);
System.out.println("生产:" + nextInt);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(()->{
while (true) {
try {
Object con = blockingQueue.con();
System.out.println("消费:" + con);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
生产:620 生产:728 消费:79 消费:307 生产:970 生产:759 消费:191 消费:624 消费:485 消费:982