我有一个简单的项目,在这个项目中,我创建了一大堆彼此不相关的工作,创建任务,将它们传递给Redis,并让许多工作人员分散在Docker Swarm中,通过长时间运行的任务队列进行处理。当工作进程完成时,他们将其完成的工作转储到NFS共享中,并将一个文本值发送回Celery客户机。
我在异步结果对象的结果集数组上使用celery.result.ResultSet的.join()函数。join()包含一个回调函数,它(目前)只是打印结果。
我的问题是join()阻塞,直到它按照给定的顺序接收每个asyncresult值。我的swarm由许多完全不同的主机组成,对我来说,重要的是让结果在完成时返回,而不是按顺序或一次完成。
有没有办法通过芹菜在任务完成时正确触发回调函数?我在网上看了很多例子,似乎我唯一的选择就是试试asyncio,但Python并不是我的强项。
创建tasks和ResultSet obj的函数:
def populateQueue(encodeTasks):
r = ResultSet([])
taskHandles = {}
for task in encodeTasks:
try:
ret = encode.delay(task)
r.add(ret)
logging.debug("Task ID: " + str(ret.task_id))
taskHandles[ret.task_id] = ret
except:
logging.info("populateQueue fail: " + str(task.traceback))
logging.info("Tasks queued: " + str(len(taskHandles)))
return taskHandles, r
main()的一部分,它等待结果:
frameCountTotal = getFrameCount(targetFile)
encodeTasks = buildCmdString(targetFile, frameCountTotal, clientCount)
taskHandles, retSet = populateQueue(encodeTasks)
logging.info("Waiting on tasks...")
retSet.join(callback=testCallback)
提前感谢
发布于 2019-02-26 08:35:25
找到了我自己问题的答案:
ResultSet有另一个名为join_native()的方法,我认为只要代理是几个已知产品之一(RabbitMQ、Redis等),它就会使用对代理的更具体的API调用。Celery的文档只是说,如果您满足代理要求,它会提供更好的性能。文档没有提到的是,它允许无序返回(至少在Redis上,还没有尝试过RMQ)。
https://stackoverflow.com/questions/54794568
复制相似问题