首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >python celery:如何将任务附加到旧链中

python celery:如何将任务附加到旧链中
EN

Stack Overflow用户
提问于 2013-10-16 17:55:42
回答 1查看 3.1K关注 0票数 4

我在我的数据库中保存了对链的引用。

代码语言:javascript
复制
from tasks import t1, t2, t3
from celery import chain
res = chain(t1.s(123)|t2.s()|t3.s())()
res.get()

如何将其他任务附加到此特定链?

代码语言:javascript
复制
res.append(t2.s())

我的目标是确保链按照我在代码中指定的顺序执行。如果我的链中有一个任务失败,下面的任务就不会被执行。

我在特定的队列中使用了超大型任务。

EN

回答 1

Stack Overflow用户

发布于 2013-10-16 19:44:13

所有信息都包含在消息中。

消息可能正在传输中,可能在世界的另一端,也可能被中间处理器使用。由于这个原因,消息在发送之后不可能被修改。

请参阅http://docs.celeryproject.org/en/latest/userguide/tasks.html#state

我的目标是确保链按照我在代码中指定的顺序执行。如果我的链中有一个任务失败,下面的任务就不会被执行。

可以肯定的是,订单是作为消息的一部分发送的,如果任何任务失败,它将不会继续。

现在,如果您真的希望能够在运行时添加任务,那么您可以将信息存储在数据库中,让任务本身进行检查并调用新任务。不过,在执行此操作时会遇到一些挑战:

1)如果成功,链中的第一个任务将调用下一个任务,然后下一个任务将调用下一个任务,以此类推。

2)如果您在此流程中添加了一个任务,如果第一个任务已经执行,会发生什么情况?或者第二个,或者第三个?

因此,正如您可能猜测的那样,这将需要一些繁重的同步才能工作。

我想一个更简单的解决方案是创建一个等待一个任务完成的任务,然后应用一个回调:

代码语言:javascript
复制
from celery import subtask
from celery.result import from_serializable

@app.task(bind=True)
def after_task(self, result, callback, errback=None):
    result = from_serializable(result)
    if not result.ready():
        raise self.retry(countdown=1)
    if task.successful():
        subtask(callback).delay(result.get())
    else:
        if errback:
            subtask(errback)()


def add_to_chain(result, callback, errback=None):
    callback = callback.clone()     # do not modify caller
    new_result = callback.freeze()  # sets id for callback, returns AsyncResult
    new_result.parent = result
    after_task.delay(result.serializable(), callback, errback)
    return new_result

然后你可以像这样使用它:

代码语言:javascript
复制
from tasks import t1, t2, t3

res = (t1.s(123) | t2.s() | t3.s())()
res = add_to_chain(t2.s())

备注:

bind=True是即将到来的3.1版本中的新功能,对于旧版本,您必须删除self参数并使用current_task.retry (获取此from celery import current_task)。

Signature.freeze也是3.1中的新功能,在旧版本中可以使用:

代码语言:javascript
复制
from celery import uuid

def freeze(sig, _id=None):
    opts = sig.options
    try:
        tid = opts['task_id']
    except KeyError:
        tid = opts['task_id'] = _id or uuid()
    return sig.AsyncResult(tid)
票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/19400305

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档