首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >当阻塞队列为空时杀死消费者

当阻塞队列为空时杀死消费者
EN

Stack Overflow用户
提问于 2020-12-01 09:07:29
回答 2查看 644关注 0票数 0

我正在阅读关于阻塞队列、executoreserivce和生产者-消费者范例的文章。我希望有一个不断变化的生产者数量和消费者的数量。每个生产者将附加到队列中,使用者将使用这些消息并处理它们。我的问题是-生产者如何知道消费者已经完成了,并且没有更多的消息将进入队列?我想在我的主线上加一个计数器。当生产者启动时,我将增加计数器,当每个生产者结束时,他们将减少int。我的消费者将能够知道计数器,当它达到0,并且队列中没有更多的元素时,他们就会死掉。

在同步工作方面的另一个一般性问题--主线程应该读取队列的内容并为每条消息添加执行程序吗?还是让线程知道这个逻辑并自行决定何时死亡是最好的做法吗?

当系统启动时,我会收到一个数字,该数字决定了将启动多少个生产商。每个生产者都会在队列中生成一组随机的数字。消费者将把这些数字打印到日志中。我现在面临的问题是,一旦我知道最后一位制作人输入了最后一个号码,我仍然不明白如何让消费者知道不会有更多的数字进入,他们应该关闭。

消费者是如何知道生产者什么时候做的?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-12-01 10:16:23

解决这个问题的一个很好的解决方案是使用PoisonPill模式。下面是它如何工作的一个例子。在这种情况下,你只需要知道生产者的数量。

编辑:我更新了代码,以便在最后一个使用者完成工作时清除队列。

代码语言:javascript
运行
复制
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class PoisonPillsTests {

    interface Message {

    }

    interface PoisonPill extends Message {
        PoisonPill INSTANCE = new PoisonPill() {
        };
    }

    static class TextMessage implements Message {

        private final String text;

        public TextMessage(String text) {
            this.text = text;
        }

        public String getText() {
            return text;
        }

        @Override
        public String toString() {
            return text;
        }
    }

    static class Producer implements Runnable {

        private final String producerName;
        private final AtomicInteger producersCount;
        private final BlockingQueue<Message> messageBlockingQueue;

        public Producer(String producerName, BlockingQueue<Message> messageBlockingQueue, AtomicInteger producersCount) {
            this.producerName = producerName;
            this.messageBlockingQueue = messageBlockingQueue;
            this.producersCount = producersCount;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < 100; i++) {
                    messageBlockingQueue.put(new TextMessage("Producer " + producerName + " message " + i));
                }
                if (producersCount.decrementAndGet() <= 0) {
                    //we need this producersCount so that the producers to produce a single poison pill
                    messageBlockingQueue.put(PoisonPill.INSTANCE);
                }
            } catch (InterruptedException e) {
                throw new RuntimeException("Producer interrupted", e);
            }
        }
    }

    static class Consumer implements Runnable {

        private final AtomicInteger consumersCount;
        private final AtomicInteger consumedMessages;
        private final BlockingQueue<Message> messageBlockingQueue;

        public Consumer(BlockingQueue<Message> messageBlockingQueue, AtomicInteger consumersCount, AtomicInteger consumedMessages) {
            this.messageBlockingQueue = messageBlockingQueue;
            this.consumersCount = consumersCount;
            this.consumedMessages = consumedMessages;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Message message = null;
                    message = messageBlockingQueue.take();

                    if (message instanceof PoisonPill) {
                        //we put back the poison pill so that to be consumed by the next consumer
                        messageBlockingQueue.put(message);
                        break;
                    } else {
                        consumedMessages.incrementAndGet();
                        System.out.println("Consumer got message " + message);
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException("Consumer interrupted", e);
            } finally {
                if (consumersCount.decrementAndGet() <= 0) {
                    System.out.println("Last consumer, clearing the queue");
                    messageBlockingQueue.clear();
                }
            }
        }
    }

    public static void main(String[] args) {

        final AtomicInteger producerCount = new AtomicInteger(4);
        final AtomicInteger consumersCount = new AtomicInteger(2);
        final AtomicInteger consumedMessages = new AtomicInteger();
        BlockingQueue<Message> messageBlockingQueue = new LinkedBlockingQueue<>();


        List<CompletableFuture<Void>> tasks = new ArrayList<>();
        for (int i = 0; i < producerCount.get(); i++) {
            tasks.add(CompletableFuture.runAsync(new Producer("" + (i + 1), messageBlockingQueue, producerCount)));
        }

        for (int i = 0; i < consumersCount.get(); i++) {
            tasks.add(CompletableFuture.runAsync(new Consumer(messageBlockingQueue, consumersCount, consumedMessages)));
        }

        CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();

        System.out.println("Consumed " + consumedMessages + " messages");

    }
}
票数 1
EN

Stack Overflow用户

发布于 2020-12-01 09:35:01

当生产者完成,最后一个可以中断所有的消费者和(可能的)生产者。

每当阻塞调用(无论是put()还是take())通过thread.interrupt()被另一个线程交织时,都会抛出thread.interrupt(),其中thread是调用该方法的线程。当最后一个生成器完成时,它可以中断所有其他线程,这将导致抛出InterruptedException的所有阻塞方法,从而允许您终止相应的线程。

代码语言:javascript
运行
复制
final BlockingQueue<T> queue = ...;
final List<Thread> threads = new ArrayList<>();

threads.add(new Producer1());
threads.add(new Producer2());
threads.add(new Consumer1());
threads.add(new Consumer2());
threads.forEach(Thread::start);

// Done by the last producer, or any other thread
threads.forEach(Thread::interrupt);

class Producer extends Thread {
    @Override
    public void run() {
        for (int i = 0; i < X; i++) {
            T element;
            // Produce element
            try {
                queue.put(element);
            } catch (InterruptedException e) {
                break; // Optional, only if other producers may still be running and
                       // you want to stop them, or interruption is performed by
                       // a completely different thread
            }
        }
    }
}

class Consumer extends Thread {
    @Override
    public void run() {
        while (true) {
            T element;
            try {
                element = queue.take();
            } catch (InterruptedException e) {
                break;
            }
            // Consume element
        }
    }
}
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65087532

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档