我正在用Java做经典的生产者-消费者问题,使用低级同步和等待()和通知()。我知道使用java.util.concurrent包中的结构有更好的实现,但我的问题是围绕着低级实现:
private static ArrayList<Integer> list = new ArrayList<Integer>();
static Object obj = new Object();
public static void producer() throws InterruptedException {
synchronized (obj) {
while (true) {
if (list.size() == 10) {
System.out.println("Queue full.. Waiting to Add");
obj.wait();
} else {
int value = new Random().nextInt(100);
if (value <= 10) {
Thread.sleep(200);
System.out.println("The element added was : " + value);
list.add(value);
obj.notify();
}
}
}
}
}
public static void consumer() throws InterruptedException {
synchronized (obj) {
while (true) {
Thread.sleep(500);
if (list.size() == 0) {
System.out.println("Queue is empty...Waiting to remove");
obj.wait();
} else {
System.out.println("The element removed was : "
+ list.remove(0));
obj.notify();
}
}
}
}
程序中有两个线程,分别用于生产者和消费者。代码运行得很好。
唯一的问题是生产者继续生产消息,直到消息一次达到最大值(直到列表的大小为10 ),而消费者一次消费所有10个消息。
如何让生产者和消费者同时工作?
以下是示例输出:
The element added was : 4
The element added was : 0
The element added was : 0
The element added was : 4
The element added was : 3
The element added was : 1
The element added was : 10
The element added was : 10
The element added was : 3
The element added was : 9
Queue full.. Waiting to Add
The element removed was : 4
The element removed was : 0
The element removed was : 0
The element removed was : 4
The element removed was : 3
The element removed was : 1
The element removed was : 10
The element removed was : 10
The element removed was : 3
The element removed was : 9
Queue is empty...Waiting to remove
编辑:以下是更正后的代码:
private static ArrayList<Integer> list = new ArrayList<Integer>();
private static Object obj = new Object();
public static void producer() throws InterruptedException {
while (true) {
Thread.sleep(500);
if (list.size() == 10) {
System.out.println("Waiting to add");
synchronized (obj) {
obj.wait();
}
}
synchronized (obj) {
int value = new Random().nextInt(10);
list.add(value);
System.out.println("Added to list: " + value);
obj.notify();
}
}
}
public static void consumer() throws InterruptedException {
while (true) {
Thread.sleep(500);
if (list.size() == 0) {
System.out.println("Waiting to remove");
synchronized (obj) {
obj.wait();
}
}
synchronized (obj) {
int removed = list.remove(0);
System.out.println("Removed from list: " + removed);
obj.notify();
}
}
}
发布于 2014-12-22 01:04:05
不能在具有相同对象的同步块中运行两个线程。当一个方法正在运行时,另一个方法只有在另一个线程调用wait
方法时才能运行。
要解决这个问题,只需将add
和remove
放在同步块中即可。有关详细信息,请参阅this。
发布于 2017-02-25 20:48:54
生产者和消费者问题是多进程同步问题的经典例子。这描述了两个进程,生产者和消费者,它们共享公共资源buffer。生产者作业是生成数据并将其放入缓冲区,而消费者作业是使用生成的数据并从缓冲区中移除。
生产者必须确保在缓冲区已满时不添加任何元素,它应该调用wait()
,直到消费者消费了一些数据和notify
到生产者线程,消费者必须确保当项目已经为空时,它不应该尝试从缓冲区中移除项目,它应该调用wait()
,它只是等待生产者生成数据并将其添加到缓冲区中,并使用notify
或notifyAll
通知消费者。
这个问题可以使用BlockingQueue
接口来解决,它管理这个生产者和消费者自己的实现。
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
/*
* To change this license header, choose License Headers in Project `Properties`.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
/**
*
* @author sakshi
*/
public class ThreadProducer {
static List<Integer> list = new ArrayList<Integer>();
static class Producer implements Runnable {
List<Integer> list;
public Producer(List<Integer> list) {
this.list = list;
}
@Override
public void run() {
synchronized (list) {
for (int i = 0; i < 10; i++) {
if (list.size() >= 1) {
try {
System.out.println("producer is waiting ");
list.wait();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
System.out.println("produce=" + i);
list.add(i);
list.notifyAll();
try {
Thread.sleep(500);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
//To change body of generated methods, choose Tools | Templates.
}
}
static class Consumer implements Runnable {
List<Integer> list;
public Consumer(List<Integer> list) {
this.list = list;
}
@Override
public void run() {
synchronized (list) {
for (int i = 0; i < 10; i++) {
while (list.isEmpty()) {
System.out.println("Consumer is waiting");
try {
list.wait();
} catch (InterruptedException ex) {
ex.printStackTrace();;
}
}
int k = list.remove(0);
System.out.println("consume=" + k);
list.notifyAll();
try {
Thread.sleep(500);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}
}
public static void main(String[] args) {
Thread producer = new Thread(new Producer(list));
Thread consumer = new Thread(new Consumer(list));
producer.start();
consumer.start();
}
}
输出:
produce=0
producer is waiting
consume=0
Consumer is waiting
produce=1
producer is waiting
consume=1
Consumer is waiting
produce=2
producer is waiting
consume=2
Consumer is waiting
produce=3
producer is waiting
consume=3
Consumer is waiting
produce=4
producer is waiting
consume=4
Consumer is waiting
produce=5
producer is waiting
consume=5
Consumer is waiting
produce=6
producer is waiting
consume=6
Consumer is waiting
produce=7
producer is waiting
consume=7
Consumer is waiting
produce=8
producer is waiting
consume=8
Consumer is waiting
produce=9
consume=9
发布于 2018-09-23 04:38:25
类资源{
private final int capacity = 2;
public static int value = 0;
LinkedList < Integer > list;
Resources() {
list = new LinkedList < > ();
}
void consume() throws InterruptedException {
while (true) {
synchronized(this) {
while (list.size() == 0) {
wait();
}
int val = list.removeFirst();
System.out.println("Value consumed:" + val);
notify();
//Thread.sleep(1000);
}
}
}
void produce() throws InterruptedException {
while (true) {
synchronized(this) {
while (list.size() == capacity) {
wait();
}
System.out.println("Value produced:" + value);
list.add(value++);
notify();
Thread.sleep(1000);
}
}
}
}
类MyThread5扩展了线程{
Resources rs;
String name;
public String getNames() {
return name;
}
public MyThread5(Resources rs, String name) {
this.rs = rs;
this.name = name;
}
@Override
public void run() {
if (this.getNames().equals("Producer")) {
try {
this.rs.produce();
} catch (InterruptedException ex) {
Logger.getLogger(MyThread5.class.getName()).log(Level.SEVERE, null, ex);
}
} else {
try {
this.rs.consume();
} catch (InterruptedException ex) {
Logger.getLogger(MyThread5.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
}
公共类ProducerConsumerExample {
public static void main(String[] args) {
try {
Resources rs = new Resources();
MyThread5 m1 = new MyThread5(rs, "Producer");
MyThread5 m2 = new MyThread5(rs, "Consumer");
m1.start();
m2.start();
m1.join();
m2.join();
} catch (InterruptedException ex) {
Logger.getLogger(ProducerConsumerExample.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
https://stackoverflow.com/questions/27591043
复制相似问题