首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >需要线程安全的异步消息队列

需要线程安全的异步消息队列
EN

Stack Overflow用户
提问于 2013-05-31 20:59:54
回答 2查看 9.2K关注 0票数 9

我正在寻找一个Python类(最好是标准语言的一部分,而不是第三方库)来管理异步的“广播式”消息。

我将有一个线程将消息放到队列中( 'putMessageOnQueue‘方法不能阻塞),然后是多个其他线程,它们都将等待消息,可能已经调用了一些阻塞的'waitForMessage’函数。当一条消息被放入队列时,我希望每个等待的线程都能获得它自己的消息副本。

我看过内置的Queue类,但我不认为这是合适的,因为消费消息似乎涉及从队列中删除它们,所以每个消息只有一个客户端线程可以看到。

这看起来应该是一个常见的用例,有人能推荐一个解决方案吗?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2013-05-31 22:43:09

我认为最典型的方法是为每个线程使用单独的消息队列,并将消息推送到先前注册了有兴趣接收此类消息的每个队列中。

像这样的东西应该可以工作,但它是未经测试的代码...

代码语言:javascript
运行
复制
from time import sleep
from threading import Thread
from Queue import Queue

class DispatcherThread(Thread):

   def __init__(self, *args, **kwargs):
       super(DispatcherThread, self).__init__(*args, **kwargs)
       self.interested_threads = []

   def run(self):
       while 1:
           if some_condition:
               self.dispatch_message(some_message)
           else:
               sleep(0.1)

   def register_interest(self, thread):
       self.interested_threads.append(thread)

   def dispatch_message(self, message):
       for thread in self.interested_threads:
           thread.put_message(message)



class WorkerThread(Thread):

   def __init__(self, *args, **kwargs):
       super(WorkerThread, self).__init__(*args, **kwargs)
       self.queue = Queue()


   def run(self):

       # Tell the dispatcher thread we want messages
       dispatcher_thread.register_interest(self)

       while 1:
           # Wait for next message
           message = self.queue.get()

           # Process message
           # ...

   def put_message(self, message):
       self.queue.put(message)


dispatcher_thread = DispatcherThread()
dispatcher_thread.start()

worker_threads = []
for i in range(10):
    worker_thread = WorkerThread()
    worker_thread.start()
    worker_threads.append(worker_thread)

dispatcher_thread.join()
票数 8
EN

Stack Overflow用户

发布于 2015-09-08 19:41:19

我认为这是一个更直接的示例(取自Python Lib中的队列示例)

代码语言:javascript
运行
复制
from threading import Thread
from Queue import Queue


num_worker_threads = 2

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/16857883

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档