我很难理解多进程队列是如何在python上工作的,以及如何实现它。假设我有两个从共享文件访问数据的python模块,我们称这两个模块为写入器和读取器。我的计划是让读取器和写入器都将请求放入两个单独的多处理队列中,然后让第三个进程在循环中弹出这些请求并执行。
我的主要问题是我真的不知道如何正确地实现multiprocessing.queue,你不能真正为每个进程实例化对象,因为它们将是单独的队列,你如何确保所有进程都与一个共享队列(或者在本例中是队列)相关?
发布于 2019-07-22 14:07:57
下面是multiprocessing.Queue
和multiprocessing.Process
的一个非常简单的用法,它允许调用者将一个“事件”加上参数发送到一个单独的进程,该进程将事件分派到该进程上的"do_“方法。(Python 3.4+)
import multiprocessing as mp
import collections
Msg = collections.namedtuple('Msg', ['event', 'args'])
class BaseProcess(mp.Process):
"""A process backed by an internal queue for simple one-way message passing.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.queue = mp.Queue()
def send(self, event, *args):
"""Puts the event and args as a `Msg` on the queue
"""
msg = Msg(event, args)
self.queue.put(msg)
def dispatch(self, msg):
event, args = msg
handler = getattr(self, "do_%s" % event, None)
if not handler:
raise NotImplementedError("Process has no handler for [%s]" % event)
handler(*args)
def run(self):
while True:
msg = self.queue.get()
self.dispatch(msg)
用法:
class MyProcess(BaseProcess):
def do_helloworld(self, arg1, arg2):
print(arg1, arg2)
if __name__ == "__main__":
process = MyProcess()
process.start()
process.send('helloworld', 'hello', 'world')
send
发生在父进程中,do_*
发生在子进程中。
我省略了任何明显会中断run循环并退出子进程的异常处理。您还可以通过覆盖run
来控制阻塞或其他任何东西来自定义它。
这真的只在只有一个工作进程的情况下才有用,但我认为这是对这个问题的一个相关的回答,它演示了一个更多面向对象的常见场景。
发布于 2020-08-04 01:20:51
我在尝试建立一种使用队列传递大型pandas数据帧进行多处理的方法时,查看了堆栈溢出和web上的多个答案。在我看来,每一个答案都是在重复同样的解决方案,而没有考虑到在设置这样的计算时肯定会遇到的大量边缘情况。问题是同时有很多事情在起作用。任务数量、工作进程数量、每个任务的持续时间以及任务执行过程中可能出现的异常。所有这些都使得同步变得棘手,而且大多数答案都没有解决如何进行同步。这是我花了几个小时的时间写的,希望这篇文章足够通用,让大多数人觉得它有用。
在任何编码示例之前的一些想法。由于queue.Empty
或queue.qsize()
或任何其它类似方法对于流控制是不可靠的,所以类似的任何代码
while True:
try:
task = pending_queue.get_nowait()
except queue.Empty:
break
都是假的。即使毫秒后队列中出现另一个任务,这也会杀死工作进程。工人将不会恢复,一段时间后,所有的工人将消失,因为他们随机发现队列暂时为空。最终结果是,主多处理函数(在进程上使用join()的函数)将在所有任务都未完成的情况下返回。好的。如果您有数千个任务,并且缺少一些任务,那么祝您顺利完成调试。
另一个问题是标记值的使用。许多人建议在队列中添加一个标记值来标记队列的末尾。但是把它标记给谁呢?如果有N个工作进程,假设N是可用的给予或接受核心的数量,那么单个标记值将仅向一个工作进程标记队列的结束。当没有剩余的工作时,所有其他的工人都会坐着等待更多的工作。我见过的典型例子是
while True:
task = pending_queue.get()
if task == SOME_SENTINEL_VALUE:
break
一个工作者将获得前哨数值,而其余的工作者将无限期等待。我遇到的任何帖子都没有提到,您需要至少向队列提交与您的工作进程一样多的前端值,以便所有人都能获得它。
另一个问题是在任务执行期间的异常处理。同样,这些问题应该被捕获和管理。此外,如果您有一个completed_tasks
队列,那么在决定作业完成之前,您应该以确定性的方式独立计算队列中有多少项。同样,依赖队列大小肯定会失败,并返回意想不到的结果。
在下面的示例中,par_proc()
函数将接收一个任务列表,其中包括应用于执行这些任务的函数以及任何命名的参数和值。
import multiprocessing as mp
import dill as pickle
import queue
import time
import psutil
SENTINEL = None
def do_work(tasks_pending, tasks_completed):
# Get the current worker's name
worker_name = mp.current_process().name
while True:
try:
task = tasks_pending.get_nowait()
except queue.Empty:
print(worker_name + ' found an empty queue. Sleeping for a while before checking again...')
time.sleep(0.01)
else:
try:
if task == SENTINEL:
print(worker_name + ' no more work left to be done. Exiting...')
break
print(worker_name + ' received some work... ')
time_start = time.perf_counter()
work_func = pickle.loads(task['func'])
result = work_func(**task['task'])
tasks_completed.put({work_func.__name__: result})
time_end = time.perf_counter() - time_start
print(worker_name + ' done in {} seconds'.format(round(time_end, 5)))
except Exception as e:
print(worker_name + ' task failed. ' + str(e))
tasks_completed.put({work_func.__name__: None})
def par_proc(job_list, num_cpus=None):
# Get the number of cores
if not num_cpus:
num_cpus = psutil.cpu_count(logical=False)
print('* Parallel processing')
print('* Running on {} cores'.format(num_cpus))
# Set-up the queues for sending and receiving data to/from the workers
tasks_pending = mp.Queue()
tasks_completed = mp.Queue()
# Gather processes and results here
processes = []
results = []
# Count tasks
num_tasks = 0
# Add the tasks to the queue
for job in job_list:
for task in job['tasks']:
expanded_job = {}
num_tasks = num_tasks + 1
expanded_job.update({'func': pickle.dumps(job['func'])})
expanded_job.update({'task': task})
tasks_pending.put(expanded_job)
# Use as many workers as there are cores (usually chokes the system so better use less)
num_workers = num_cpus
# We need as many sentinels as there are worker processes so that ALL processes exit when there is no more
# work left to be done.
for c in range(num_workers):
tasks_pending.put(SENTINEL)
print('* Number of tasks: {}'.format(num_tasks))
# Set-up and start the workers
for c in range(num_workers):
p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed))
p.name = 'worker' + str(c)
processes.append(p)
p.start()
# Gather the results
completed_tasks_counter = 0
while completed_tasks_counter < num_tasks:
results.append(tasks_completed.get())
completed_tasks_counter = completed_tasks_counter + 1
for p in processes:
p.join()
return results
下面是运行上述代码的测试
def test_parallel_processing():
def heavy_duty1(arg1, arg2, arg3):
return arg1 + arg2 + arg3
def heavy_duty2(arg1, arg2, arg3):
return arg1 * arg2 * arg3
task_list = [
{'func': heavy_duty1, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
{'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
]
results = par_proc(task_list)
job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())])
job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())])
assert job1 == 15
assert job2 == 21
另外还有一个例外
def test_parallel_processing_exceptions():
def heavy_duty1_raises(arg1, arg2, arg3):
raise ValueError('Exception raised')
return arg1 + arg2 + arg3
def heavy_duty2(arg1, arg2, arg3):
return arg1 * arg2 * arg3
task_list = [
{'func': heavy_duty1_raises, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
{'func': heavy_duty2, 'tasks': [{'arg1': 1, 'arg2': 2, 'arg3': 3}, {'arg1': 1, 'arg2': 3, 'arg3': 5}]},
]
results = par_proc(task_list)
job1 = sum([y for x in results if 'heavy_duty1' in x.keys() for y in list(x.values())])
job2 = sum([y for x in results if 'heavy_duty2' in x.keys() for y in list(x.values())])
assert not job1
assert job2 == 21
希望这能对你有所帮助。
发布于 2018-02-03 09:11:45
在"from queue import Queue
“中没有名为queue
的模块,而应该使用multiprocessing
。因此,它应该看起来像"from multiprocessing import Queue
“。
https://stackoverflow.com/questions/11515944
复制相似问题