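Using Python, I am trying to demonstrate how a producer/consumer multithreading scenario can lead to a deadlock when a consumer thread ends up waiting on an empty queue that will stay empty for the rest of the execution, right up until the program ends, and how to solve this while avoiding starvation or an abrupt "dirty interruption" of the program.
So I took the code from the threaded producer/consumer example using a queue in this nice RealPython article. Here is an excerpt of the original code: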
def consumer(queue, event):
    """Pretend we're saving a number in the database."""
    while not event.is_set() or not queue.empty():
        message = queue.get()
        logging.info(
            "Consumer storing message: %s (size=%d)", message, queue.qsize()
        )
    logging.info("Consumer received event. Exiting")

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    pipeline = queue.Queue(maxsize=10)
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)
        time.sleep(0.1)
        logging.info("Main: about to set event")
        event.set()
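I noticed that, however unlikely, the code as written can lead to the situation I described if the main thread sets the event and the producer finishes while the consumer is still waiting to get a message from the queue.
To solve the problem in the single-consumer case, a simple check that the queue is not empty before calling get is enough (for example: if (not q.empty()): message = q.get()). However, the problem remains in a multi-consumer scenario, because threads can be swapped right after the emptiness check: another consumer (the second) may get the message that leaves the queue empty, and when execution swaps back to the previous consumer (the first), it calls get on an empty queue and blocks forever.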
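The check-then-get race described above can be replayed deterministically in a single thread. This is only a minimal sketch, not the code from the article: the interleaving of two hypothetical consumers is written out by hand, and get_nowait() stands in for the blocking get() so that the example terminates instead of hanging.

```python
import queue

q = queue.Queue()
q.put("only message")

# Both (hypothetical) consumers check the queue before either one acts.
consumer1_saw_item = not q.empty()
consumer2_saw_item = not q.empty()

# Consumer 2 is scheduled first and takes the only message.
taken_by_consumer2 = q.get()

# Consumer 1 now acts on its stale check. A plain q.get() would block
# forever at this point; get_nowait() raises queue.Empty instead.
try:
    q.get_nowait()
    consumer1_would_block = False
except queue.Empty:
    consumer1_would_block = True

print(consumer1_saw_item, consumer2_saw_item, consumer1_would_block)
```

Both checks report a non-empty queue, yet only one of the two consumers can actually obtain the message.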
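I wanted a solution that would also work in a hypothetical multi-consumer scenario. So I modified the consumer code as follows, essentially adding a timeout to the queue get call and handling the exception: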
import concurrent.futures
import logging
import queue
import threading
import time

def consumer(q, event, n):
    while not event.is_set() or not q.empty():
        print("Consumer"+n+": Q-get")
        try:
            time.sleep(0.1) #(I don't really need this, I just want to force a consumer-thread swap at this precise point :=> showing that, just as expected, things will work well in a multi-consumer scenario also)
            message = q.get(True,1)  # block=True, timeout=1 second
        except queue.Empty:
            print("Consumer"+n+": looping on empty queue")
            time.sleep(0.1) #(I don't really need this at all... just hoping -unfortunately without success- _main_ to swap on ThreadPoolExecutor)
            continue
        logging.info("Consumer%s storing message: %s (size=%d)", n,message,q.qsize())
    print("Consumer"+n+": ended")
I also modified the main part so that it puts a message into the queue and spawns a second consumer instead of a producer.
if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,datefmt="%H:%M:%S")
    pipeline = queue.Queue(maxsize=10)
    event = threading.Event()
    pipeline.put("XxXxX")
    print("Let's start (ThreadPoolExecutor)")
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        #executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event, '1')
        executor.submit(consumer, pipeline, event, '2')
    print("(_main_ won't even get this far... Why?)") #!#
    time.sleep(2)
    logging.info("Main: about to set event")
    event.set()
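(Note that my goal here is to prevent the risk of a consumer deadlock and to show that the risk is actually eliminated. At this stage I don't need the producer, which is why the code does not spawn it.)
The problem now is that I don't understand why everything seems fine if the threads are spawned with threading.Thread(...).start(), for example: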
print("Let's start (simple Thread)")
for i in range(1, 3):
    threading.Thread(target=consumer, args=(pipeline, event, str(i))).start()
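Using ThreadPoolExecutor, on the other hand, seems to make the main thread never resume (it looks like it never even reaches the sleep call), so execution never ends and the consumers loop forever. Can you help me understand why there is this difference? Knowing this matters to me, and I think it will almost certainly help me understand whether it can be solved or whether I am forced not to use ThreadPoolExecutor. Thank you in advance for your valuable help!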
Posted on 2022-05-01 20:19:06
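The problem is that you placed event.set() outside the with block that manages the ThreadPoolExecutor. When the executor is used with a with statement, exiting the block calls .shutdown(wait=True). So you are waiting for the workers to finish, and they never will, because you haven't set the event yet.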
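To see that implicit wait in isolation, here is a small sketch. The slow_task helper and the 0.3 second delay are made up for illustration. Leaving the with block takes about as long as the slowest worker, because the executor's exit handler waits for submitted work to complete.

```python
import concurrent.futures
import time


def slow_task(delay):
    """Hypothetical worker that just sleeps for `delay` seconds."""
    time.sleep(delay)
    return delay


start = time.monotonic()
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_task, 0.3)
    # submit() returns right away; the task runs in a worker thread.
    submitted_after = time.monotonic() - start
# Control only reaches this line once slow_task has finished.
exited_after = time.monotonic() - start

print(f"submit returned after {submitted_after:.2f}s")
print(f"with block exited after {exited_after:.2f}s")
```

If the worker never returned, as with a consumer waiting for an event that is only set after the block, the line after the with statement would never run.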
If you want the pool to shut down as soon as it can, but don't want to wait for it immediately, you can do this:
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    try:
        #executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event, '1')
        executor.submit(consumer, pipeline, event, '2')
        executor.shutdown(wait=False)  # No new work can be submitted after this point, and
                                       # workers will opportunistically exit if no work left
        time.sleep(2)
        logging.info("Main: about to set event")
    finally:
        event.set()  # Set the event *before* we block awaiting shutdown
                     # Done in a finally block to ensure it's set even on exception
                     # so the with doesn't try to block in a case where
                     # it will never finish
# Blocks for full shutdown here; after this, the pool is definitely finished and cleaned up
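For reference, the pieces can be put together into one self-contained script. This is a sketch under assumptions rather than the exact code from the question: the run_demo function, the shared seen list, the three preloaded messages, and the shortened timeouts are all introduced here so that the example finishes quickly and its result can be inspected.

```python
import concurrent.futures
import queue
import threading
import time


def consumer(q, event, name, seen):
    """Drain q until the event is set and the queue is empty."""
    while not event.is_set() or not q.empty():
        try:
            # The timeout keeps a consumer from blocking forever on an empty queue.
            message = q.get(timeout=0.05)
        except queue.Empty:
            continue
        seen.append((name, message))  # list.append is atomic in CPython


def run_demo():
    pipeline = queue.Queue(maxsize=10)
    event = threading.Event()
    seen = []
    for i in range(3):
        pipeline.put(f"message-{i}")

    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        try:
            futures = [
                executor.submit(consumer, pipeline, event, str(n), seen)
                for n in (1, 2)
            ]
            time.sleep(0.2)  # stand-in for whatever the main thread does meanwhile
        finally:
            event.set()  # set *before* the with block waits for the workers
    # The pool has shut down here, so both consumers have returned.
    return seen, futures


seen, futures = run_demo()
print(sorted(message for _, message in seen))
```

Which consumer handles which message depends on scheduling, so only the set of processed messages is stable from run to run.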
https://stackoverflow.com/questions/72079931