首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Python Celery:如何乱序连接任务结果?

Python Celery:如何乱序连接任务结果?
EN

Stack Overflow用户
提问于 2019-02-21 04:16:17
回答 1查看 310关注 0票数 0

我有一个简单的项目,在这个项目中,我创建了一大堆彼此不相关的工作,创建任务,将它们传递给Redis,并让许多工作人员分散在Docker Swarm中,通过长时间运行的任务队列进行处理。当工作进程完成时,他们将其完成的工作转储到NFS共享中,并将一个文本值发送回Celery客户机。

我在异步结果对象的结果集数组上使用celery.result.ResultSet的.join()函数。join()包含一个回调函数,它(目前)只是打印结果。

我的问题是join()阻塞,直到它按照给定的顺序接收每个asyncresult值。我的swarm由许多完全不同的主机组成,对我来说,重要的是让结果在完成时返回,而不是按顺序或一次完成。

有没有办法通过芹菜在任务完成时正确触发回调函数?我在网上看了很多例子,似乎我唯一的选择就是试试asyncio,但Python并不是我的强项。

创建tasks和ResultSet obj的函数:

代码语言:javascript
运行
复制
def populateQueue(encodeTasks):
r = ResultSet([])
taskHandles = {}

for task in encodeTasks:
    try:
        ret = encode.delay(task)
        r.add(ret)
        logging.debug("Task ID: " + str(ret.task_id))
        taskHandles[ret.task_id] = ret 
    except:
        logging.info("populateQueue fail: " + str(task.traceback))

logging.info("Tasks queued: " + str(len(taskHandles)))
return taskHandles, r

main()的一部分,它等待结果:

代码语言:javascript
运行
复制
        frameCountTotal = getFrameCount(targetFile)
        encodeTasks = buildCmdString(targetFile, frameCountTotal, clientCount)
        taskHandles, retSet = populateQueue(encodeTasks)

        logging.info("Waiting on tasks...")
        retSet.join(callback=testCallback)

提前感谢

EN

Stack Overflow用户

发布于 2019-02-26 08:35:25

找到了我自己问题的答案:

ResultSet有另一个名为join_native()的方法,我认为只要代理是几个已知产品之一(RabbitMQ、Redis等),它就会使用对代理的更具体的API调用。Celery的文档只是说,如果您满足代理要求,它会提供更好的性能。文档没有提到的是,它允许无序返回(至少在Redis上,还没有尝试过RMQ)。

票数 0
EN
查看全部 1 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54794568

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档