我正在阅读关于阻塞队列、executoreserivce和生产者-消费者范例的文章。我希望有一个不断变化的生产者数量和消费者的数量。每个生产者将附加到队列中,使用者将使用这些消息并处理它们。我的问题是-生产者如何知道消费者已经完成了,并且没有更多的消息将进入队列?我想在我的主线上加一个计数器。当生产者启动时,我将增加计数器,当每个生产者结束时,他们将减少int。我的消费者将能够知道计数器,当它达到0,并且队列中没有更多的元素时,他们就会死掉。
在同步工作方面的另一个一般性问题--主线程应该读取队列的内容并为每条消息添加执行程序吗?还是让线程知道这个逻辑并自行决定何时死亡是最好的做法吗?
当系统启动时,我会收到一个数字,该数字决定了将启动多少个生产商。每个生产者都会在队列中生成一组随机的数字。消费者将把这些数字打印到日志中。我现在面临的问题是,一旦我知道最后一位制作人输入了最后一个号码,我仍然不明白如何让消费者知道不会有更多的数字进入,他们应该关闭。
消费者是如何知道生产者什么时候做的?
发布于 2020-12-01 10:16:23
解决这个问题的一个很好的解决方案是使用PoisonPill模式。下面是它如何工作的一个例子。在这种情况下,你只需要知道生产者的数量。
编辑:我更新了代码,以便在最后一个使用者完成工作时清除队列。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class PoisonPillsTests {
interface Message {
}
interface PoisonPill extends Message {
PoisonPill INSTANCE = new PoisonPill() {
};
}
static class TextMessage implements Message {
private final String text;
public TextMessage(String text) {
this.text = text;
}
public String getText() {
return text;
}
@Override
public String toString() {
return text;
}
}
static class Producer implements Runnable {
private final String producerName;
private final AtomicInteger producersCount;
private final BlockingQueue<Message> messageBlockingQueue;
public Producer(String producerName, BlockingQueue<Message> messageBlockingQueue, AtomicInteger producersCount) {
this.producerName = producerName;
this.messageBlockingQueue = messageBlockingQueue;
this.producersCount = producersCount;
}
@Override
public void run() {
try {
for (int i = 0; i < 100; i++) {
messageBlockingQueue.put(new TextMessage("Producer " + producerName + " message " + i));
}
if (producersCount.decrementAndGet() <= 0) {
//we need this producersCount so that the producers to produce a single poison pill
messageBlockingQueue.put(PoisonPill.INSTANCE);
}
} catch (InterruptedException e) {
throw new RuntimeException("Producer interrupted", e);
}
}
}
static class Consumer implements Runnable {
private final AtomicInteger consumersCount;
private final AtomicInteger consumedMessages;
private final BlockingQueue<Message> messageBlockingQueue;
public Consumer(BlockingQueue<Message> messageBlockingQueue, AtomicInteger consumersCount, AtomicInteger consumedMessages) {
this.messageBlockingQueue = messageBlockingQueue;
this.consumersCount = consumersCount;
this.consumedMessages = consumedMessages;
}
@Override
public void run() {
try {
while (true) {
Message message = null;
message = messageBlockingQueue.take();
if (message instanceof PoisonPill) {
//we put back the poison pill so that to be consumed by the next consumer
messageBlockingQueue.put(message);
break;
} else {
consumedMessages.incrementAndGet();
System.out.println("Consumer got message " + message);
}
}
} catch (InterruptedException e) {
throw new RuntimeException("Consumer interrupted", e);
} finally {
if (consumersCount.decrementAndGet() <= 0) {
System.out.println("Last consumer, clearing the queue");
messageBlockingQueue.clear();
}
}
}
}
public static void main(String[] args) {
final AtomicInteger producerCount = new AtomicInteger(4);
final AtomicInteger consumersCount = new AtomicInteger(2);
final AtomicInteger consumedMessages = new AtomicInteger();
BlockingQueue<Message> messageBlockingQueue = new LinkedBlockingQueue<>();
List<CompletableFuture<Void>> tasks = new ArrayList<>();
for (int i = 0; i < producerCount.get(); i++) {
tasks.add(CompletableFuture.runAsync(new Producer("" + (i + 1), messageBlockingQueue, producerCount)));
}
for (int i = 0; i < consumersCount.get(); i++) {
tasks.add(CompletableFuture.runAsync(new Consumer(messageBlockingQueue, consumersCount, consumedMessages)));
}
CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
System.out.println("Consumed " + consumedMessages + " messages");
}
}
发布于 2020-12-01 09:35:01
当生产者完成,最后一个可以中断所有的消费者和(可能的)生产者。
每当阻塞调用(无论是put()
还是take()
)通过thread.interrupt()
被另一个线程交织时,都会抛出thread.interrupt()
,其中thread
是调用该方法的线程。当最后一个生成器完成时,它可以中断所有其他线程,这将导致抛出InterruptedException
的所有阻塞方法,从而允许您终止相应的线程。
final BlockingQueue<T> queue = ...;
final List<Thread> threads = new ArrayList<>();
threads.add(new Producer1());
threads.add(new Producer2());
threads.add(new Consumer1());
threads.add(new Consumer2());
threads.forEach(Thread::start);
// Done by the last producer, or any other thread
threads.forEach(Thread::interrupt);
class Producer extends Thread {
@Override
public void run() {
for (int i = 0; i < X; i++) {
T element;
// Produce element
try {
queue.put(element);
} catch (InterruptedException e) {
break; // Optional, only if other producers may still be running and
// you want to stop them, or interruption is performed by
// a completely different thread
}
}
}
}
class Consumer extends Thread {
@Override
public void run() {
while (true) {
T element;
try {
element = queue.take();
} catch (InterruptedException e) {
break;
}
// Consume element
}
}
}
https://stackoverflow.com/questions/65087532
复制相似问题