生产者消费者问题的任何有效解决方案都必须控制对产生资源的生产的put()
方法的调用以及对消耗资源的消费者的 take()
方法的调用。一旦实现了对方法阻塞的控制,就可以解决问题。
Java提供了开箱即用的支持来控制此类方法的调用,其中一个线程正在创建资源而另一个线程正在消耗资源 BlockingQueue。程序包中的 Java BlockingQueue 接口 java.util.concurrent 表示一个队列,该线程可以安全地放入和从中获取实例。
BlockingQueue is a construct where one thread putting resources into it, and another thread taking from it.
这正是解决生产者消费者问题所需要的。让我们现在解决问题!
下面的代码用于生产者线程。
class Producer implements Runnable
{
protected BlockingQueue<Object> queue;
Producer(BlockingQueue<Object> theQueue) {
this.queue = theQueue;
}
public void run()
{
try
{
while (true)
{
Object justProduced = getResource();
queue.put(justProduced);
System.out.println("Produced resource - Queue size now = " + queue.size());
}
}
catch (InterruptedException ex)
{
System.out.println("Producer INTERRUPTED");
}
}
Object getResource()
{
try
{
Thread.sleep(100); // simulate time passing during read
}
catch (InterruptedException ex)
{
System.out.println("Producer Read INTERRUPTED");
}
return new Object();
}
}
在这里,生产者线程创建资源(即对象)并将其放入队列。如果队列已满(最大大小为20);然后它将等待–直到使用者线程从中提取资源。因此,队列大小永远不会超过最大值,即20。
下面的代码适用于消费者线程。
class Consumer implements Runnable
{
protected BlockingQueue<Object> queue;
Consumer(BlockingQueue<Object> theQueue) {
this.queue = theQueue;
}
public void run() {
try
{
while (true)
{
Object obj = queue.take();
System.out.println("Consumed resource - Queue size now = " + queue.size());
take(obj);
}
}
catch (InterruptedException ex)
{
System.out.println("CONSUMER INTERRUPTED");
}
}
void take(Object obj)
{
try
{
Thread.sleep(100); // simulate time passing
}
catch (InterruptedException ex)
{
System.out.println("Consumer Read INTERRUPTED");
}
System.out.println("Consuming object " + obj);
}
}
使用者线程将资源从队列中拉出(如果存在),否则它将等待,然后在生产者将某些东西放入其中时再次检查。
现在让我们测试上面编写的生产者和消费者组件。
public class ProducerConsumerExample
{
public static void main(String[] args) throws InterruptedException
{
int numProducers = 4;
int numConsumers = 3;
BlockingQueue<Object> myQueue = new LinkedBlockingQueue<>(20);
for (int i = 0; i < numProducers; i++){
new Thread(new Producer(myQueue)).start();
}
for (int i = 0; i < numConsumers; i++){
new Thread(new Consumer(myQueue)).start();
}
// Let the simulation run for, say, 10 seconds
Thread.sleep(10 * 1000);
// End of simulation - shut down gracefully
System.exit(0);
}
}
运行代码时,您会发现类似于以下内容的输出:
消耗的资源-队列大小现在= 1
产生的资源-队列大小现在= 1
消耗的资源-队列大小现在= 1
消耗的资源-队列大小现在= 1
产生的资源-队列大小现在= 1
产生的资源-队列大小现在= 1
产生的资源-队列大小现在= 1
消费对象java.lang.Object@14c7f728
消耗的资源-队列大小现在为0
消耗对象java.lang.Object@2b71e323
消耗的资源-队列大小现在为0
产生的资源-队列大小现在= 0
产生的资源-队列大小现在= 1
产生的资源-队列大小现在= 2
消耗对象java.lang.Object@206dc00b
消耗的资源-队列大小现在= 1
产生的资源-队列大小现在= 2
产生的资源-队列大小现在= 3
消耗对象java.lang.Object@1a000bc0
消耗的资源-队列大小现在= 2
消费对象java.lang.Object@25b6183d
消耗的资源-队列大小现在= 1
产生的资源-队列大小现在= 2
产生的资源-队列大小现在= 3
...
...
产生的资源-队列大小现在= 20
消耗对象java.lang.Object@2b3cd3a6
消耗的资源-队列大小现在= 19
产生的资源-队列大小现在= 20
消耗对象java.lang.Object@3876982d
消耗的资源-队列大小现在= 19
产生的资源-队列大小现在= 20
输出清楚地表明,队列大小永远不会超过20,并且消费者线程正在处理生产者线程放置的队列资源。就是这么简单。
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/BlockingQueue.html https://howtodoinjava.com/java/multi-threading/producer-consumer-problem-using-blockingqueue/