我已经创建了一个CloseableBlockingQueue扩展ArrayBlockingQueue:
private static class CloseableBlockingQueue<E> extends ArrayBlockingQueue<E> {
// Flag indicates closed state.
private volatile boolean closed = false;
public CloseableBlockingQueue(int queueLength) {
super(queueLength);
}
/***
* Shortcut to do nothing if closed.
*/
@Override
public boolean offer(E e) {
return closed ? true : super.offer(e);
}
/***
* Shortcut to do nothing if closed.
*/
@Override
public void put(E e) throws InterruptedException {
if (!closed) {
super.put(e);
}
}
/***
* Shortcut to do nothing if closed.
*/
@Override
public E poll() {
return closed ? null : super.poll();
}
/***
* Shortcut to do nothing if closed.
* @throws InterruptedException
*/
@Override
public E poll(long l, TimeUnit tu) throws InterruptedException {
return closed ? null : super.poll(l, tu);
}
/***
* Mark me as closed and clear the queue.
*/
void close() {
closed = true;
// There could be more threads blocking on my queue than there are slots
// in the queue. We therefore MUST loop.
do {
// so keep clearing
clear();
/* Let others in ... more specifically, any collectors blocked on the
* queue should now unblock and finish their put.
*
* Subsequent puts should shortcut but it is the waiting ones I need
* to clear.
*/
Thread.yield();
/* Debug code.
// Did yield achieve?
if (!super.isEmpty()) {
* This DOES report success.
log("! Thread.yield() successful");
}
*
*/
// Until all are done.
} while (!super.isEmpty());
}
/***
* isClosed
*
* @return true if closed.
*/
boolean isClosed() {
return closed;
}
}
我关心的是close方法,它试图使队列上阻塞的任何线程恢复正常运行。我使用Thread.yield()来尝试这一点,但是我看到了一些引用,表明这种技术可能并不总是有效的,因为不能保证在产生结果时会唤醒任何其他被阻塞的线程。
队列用于将多个线程的输出集中到单个流中。与队列中的插槽相比,很容易会有更多的线程给它喂食,所以队列很有可能已经满了,当它关闭时,有几个线程会阻塞它。
我欢迎你的想法。
添加了
由于汤姆以下的建议,我重新考虑到:
顺便说一句:由于线程集合主要用于添加对象,并且几乎立即删除了相同的对象,所以我从http://www.java2s.com/Code/Java/Collections-Data-Structure/ConcurrentDoublyLinkedList.htm中获取了Doug令人印象深刻的http://www.java2s.com/Code/Java/Collections-Data-Structure/ConcurrentDoublyLinkedList.htm的副本,并添加了几个方法,以允许我保留添加的节点。然后移除应该是O(1)而不是O(n)。
保罗
发布于 2011-10-26 02:57:43
我不认为don()会影响队列上阻塞的线程。
如果您能够跟踪等待的线程(如果您正在包装阻塞方法,那么应该很简单)。您可以在关闭时对它们调用中断()。
参见这个问题:ArrayBlockingQueue - How to "interrupt" a thread that is wating on .take() method
发布于 2011-10-26 02:09:53
使用wait/notifyAll()
,或者最好使用来自util.concurrent
包的同步原语之一,比如CountDownLatch
,而不是产生结果时检查循环。将一个对象放在队列的末尾,该对象在处理时触发通知。使调用线程(关闭方法的线程)等待此通知。它会一直睡到排完队为止。
发布于 2011-10-26 02:26:51
我会在队列里放一颗毒丸。例如空。当等待线程得到药丸时,它会把它放回队列中。
E pill = null;
void close() {
closed = true;
clear();
while(!offer(pill)); // will wake one thread.
}
public E poll() {
if (closed) return null;
E e = super.poll();
if (e == pill) add(e); // will wake one thread.
return e;
}
https://stackoverflow.com/questions/7901114
复制相似问题