首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >阻塞队列和多线程消费者,如何知道何时停止

阻塞队列和多线程消费者,如何知道何时停止
EN

Stack Overflow用户
提问于 2012-01-24 00:05:58
回答 5查看 60K关注 0票数 69

我有一个单线程生成器,它创建一些任务对象,然后将这些对象添加到ArrayBlockingQueue (具有固定大小)中。

我还启动了一个多线程使用者。这是作为固定线程池(Executors.newFixedThreadPool(threadCount);)构建的。然后,我向这个threadPool提交了一些ConsumerWorker意图,每个ConsumerWorker都有一个对上面提到的ArrayBlockingQueue实例的引用。

每个这样的Worker将在队列上执行一个take()并处理任务。

我的问题是,当没有更多的工作要做时,让员工知道最好的方法是什么。换句话说,我如何告诉worker生产者已经完成了向队列的添加,并且从这一点开始,每个worker在看到队列为空时应该停止。

我现在得到的是一个设置,其中我的Producer使用一个回调进行初始化,该回调在他完成它的任务(向队列中添加内容)时触发。我还保存了我创建并提交给ThreadPool的所有ConsumerWorkers的列表。当生产者回调告诉我生产者已经完成时,我可以将这一点告诉每个工人。此时,它们应该继续检查队列是否为空,当队列为空时,它们应该停止,从而允许我优雅地shutDown ExecutorService线程池。大概是这样的

代码语言:javascript
复制
public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning || !inputQueue.isEmpty()) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
    this.isRunning = isRunning;
}

}

这里的问题是,我有一个明显的竞争条件,有时生产者会结束,发信号通知它,ConsumerWorkers会在消耗队列中的所有东西之前停止。

我的问题是什么是最好的方式来同步这一切,使其正常工作?我是否应该同步整个部分,检查生产者是否正在运行,队列是否为空,并在一个块中(在队列对象上)从队列中获取一些东西?我应该只同步ConsumerWorker实例上isRunning布尔值的更新吗?还有其他建议吗?

更新一下,下面是我最终使用的工作实现:

代码语言:javascript
复制
public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;

private final static Produced POISON = new Produced(-1); 

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(true) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Produced queueElement = inputQueue.take();
            Thread.sleep(new Random().nextInt(100));
            if(queueElement==POISON) {
                break;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Consumer "+Thread.currentThread().getName()+" END");
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
    try {
        inputQueue.put(POISON);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

}

这在很大程度上受到了JohnVint下面的答案的启发,只做了一些小的修改。

根据@vendhan的评论进行===更新。

感谢您的关注。你是对的,这个问题中的第一个代码片段有(在其他问题中) while(isRunning || !inputQueue.isEmpty())没有实际意义的问题。

在我实际的最终实现中,我做了一些更接近您的建议的事情,将"||“(或)替换为"&&”(和),因为每个worker (使用者)现在只检查他从列表中获得的元素是否是毒丸,如果是,就停止(所以理论上我们可以说worker必须在运行,队列不能为空)。

EN

回答 5

Stack Overflow用户

回答已采纳

发布于 2012-01-24 00:10:46

您应该继续从队列中执行take()。你可以用毒丸来告诉工人停止工作。例如:

代码语言:javascript
复制
private final Object POISON_PILL = new Object();

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            if(queueElement == POISON_PILL) {
                 inputQueue.add(POISON_PILL);//notify other threads to stop
                 return;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void finish() {
    //you can also clear here if you wanted
    isRunning = false;
    inputQueue.add(POISON_PILL);
}
票数 87
EN

Stack Overflow用户

发布于 2012-01-24 00:15:22

我会给工人们发送一个特殊的工作包,表示他们应该关闭:

代码语言:javascript
复制
public class ConsumerWorker implements Runnable{

private static final Produced DONE = new Produced();

private BlockingQueue<Produced> inputQueue;

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    for (;;) {
        try {
            Produced item = inputQueue.take();
            if (item == DONE) {
                inputQueue.add(item); // keep in the queue so all workers stop
                break;
            }
            // process `item`
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

}

要停止工作进程,只需将ConsumerWorker.DONE添加到队列中。

票数 14
EN

Stack Overflow用户

发布于 2017-11-15 14:40:14

我们不能使用CountDownLatch来实现吗?这里的大小是生产者中的记录数。每个消费者都会在处理完一条记录后进行countDown。当所有任务都完成时,它会跨越awaits()方法。然后停止你所有的消费者。因为所有记录都已处理。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/8974638

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档