假设有1个生产者P和2个消费者C1和C2。并且有2个队列Q1和Q2,它们都具有特定的容量。
P将生成项目,并将其交替放入Q1和Q2中。商品是为特定消费者生产的,不能被其他消费者消费。如何在java语言中实现以下功能:在启动3个线程后,如果Q1为空,线程C1将被阻塞,直到Q1中出现异常时才会通知它。Q2也是如此。当Q1和Q2都已满时,P将被阻塞,直到当Q1或Q2未满时被通知。
我在考虑使用BlockingQueue,当消费者的队列为空时,它会阻塞消费者。但问题是,当其中一个队列已满时,生产者将被阻塞。Java中有没有什么数据结构可以用来解决这个问题?
更新
我自己已经有了一个解决方案,但我不确定它是否有效。我们仍然可以有2个BlockingQueues。当消费者从它的队列中取出物品时,它使用BlockingQueue.take(),所以当队列中没有物品时,它将被阻塞。当生产者将项目添加到任一队列时,它将使用BlockingQueue.offer()。这样它就永远不会被这个操作阻塞,如果队列满了,它就会得到'false‘。此外,我们保留一个AtomicInteger来指示未满的队列的数量。每当生产者P想要将一项放入队列时,如果它得到假返回,我们将AtomicInteger减1。当它达到0时,生产者调用AtomicInteger.wait()。每当消费者从其队列中取出一项时,它还会检查AtomicInteger。当它为0时,消费者将其加1并调用AtomicInteger.notify()。
请让我知道这个解决方案是否有意义。
非常感谢!
发布于 2013-05-31 22:29:13
你有没有考虑过Striped Executor Service。这将允许你解决你的问题,和将你的消费者放在一个池中,这将会更有效率。
发布于 2013-05-31 22:23:24
您可以使用框架中的主题。
在activemq http://activemq.apache.org/how-does-a-queue-compare-to-a-topic.html中
在hornetq exact example for JMS Topic in HornetQ中
发布于 2013-05-31 22:42:21
无论您选择哪种数据结构/messagig服务器,您都可以使用其中的任何一种来耗尽资源。内存或磁盘空间总是有限制的。
所以实际上生产者被停止并不是件坏事。
如果您的队列正在填满,您应该尝试恢复balance:您可以添加更多消费者。您可以提高消费者的性能。如果这是不可能的,一些东西应该真的扼杀制作人。这是一种避免内存不足错误或设备上没有剩余空间的方法。
最后,数据中心有责任监控队列。如果队列的填充度达到限制,例如> 80%,他们应该通知您。
更新
如果生产者无法发送所有队列,因为其中一个队列已满,则由生产者进行缓冲,但缓冲是队列应该做的事情。
https://stackoverflow.com/questions/16859389
复制相似问题