我正在寻找一个Python类(最好是标准语言的一部分,而不是第三方库)来管理异步的“广播式”消息。
我将有一个线程将消息放到队列中( 'putMessageOnQueue‘方法不能阻塞),然后是多个其他线程,它们都将等待消息,可能已经调用了一些阻塞的'waitForMessage’函数。当一条消息被放入队列时,我希望每个等待的线程都能获得它自己的消息副本。
我看过内置的Queue类,但我不认为这是合适的,因为消费消息似乎涉及从队列中删除它们,所以每个消息只有一个客户端线程可以看到。
这看起来应该是一个常见的用例,有人能推荐一个解决方案吗?
发布于 2013-05-31 22:43:09
我认为最典型的方法是为每个线程使用单独的消息队列,并将消息推送到先前注册了有兴趣接收此类消息的每个队列中。
像这样的东西应该可以工作,但它是未经测试的代码...
from time import sleep
from threading import Thread
from Queue import Queue
class DispatcherThread(Thread):
def __init__(self, *args, **kwargs):
super(DispatcherThread, self).__init__(*args, **kwargs)
self.interested_threads = []
def run(self):
while 1:
if some_condition:
self.dispatch_message(some_message)
else:
sleep(0.1)
def register_interest(self, thread):
self.interested_threads.append(thread)
def dispatch_message(self, message):
for thread in self.interested_threads:
thread.put_message(message)
class WorkerThread(Thread):
def __init__(self, *args, **kwargs):
super(WorkerThread, self).__init__(*args, **kwargs)
self.queue = Queue()
def run(self):
# Tell the dispatcher thread we want messages
dispatcher_thread.register_interest(self)
while 1:
# Wait for next message
message = self.queue.get()
# Process message
# ...
def put_message(self, message):
self.queue.put(message)
dispatcher_thread = DispatcherThread()
dispatcher_thread.start()
worker_threads = []
for i in range(10):
worker_thread = WorkerThread()
worker_thread.start()
worker_threads.append(worker_thread)
dispatcher_thread.join()发布于 2015-09-08 19:41:19
我认为这是一个更直接的示例(取自Python Lib中的队列示例)
from threading import Thread
from Queue import Queue
num_worker_threads = 2
def worker():
while True:
item = q.get()
do_work(item)
q.task_done()
q = Queue()
for i in range(num_worker_threads):
t = Thread(target=worker)
t.daemon = True
t.start()
for item in source():
q.put(item)
q.join() # block until all tasks are donehttps://stackoverflow.com/questions/16857883
复制相似问题