我有三个芹菜任务,分别运行在三个不同的服务器上。
我想设置一个工作流,如果发送推送通知失败,我应该尝试发送sms。如果发送短信失败,我应该发送电子邮件。
如果这3项任务及其代码库位于同一台服务器上,我将跟踪链式任务的示例并执行如下操作
from celery import chain
from tasks import send_push_notification, send_sms, send_email
import json
# some paylaod
payload = json.dumps({})
res = chain(
send_push_notification.subtask(payload),
send_sms.subtask(payload),
send_email.subtask(payload)
)()但是任务被保存在3台不同的服务器上!
我试过了
# 1
from celery import chain
from my_celery_app import app
res = chain(
app.send_task('tasks.send_push_notification', payload),
app.send_task('tasks.send_sms', payload),
app.send_task('tasks.send_email', payload)
)()
# Which fails because I am chaining tasks not subtasks和
# 2
from celery import chain, subtask
res = chain(
subtask('tasks.send_push_notification', payload),
subtask('tasks.send_sms', payload),
subtask('tasks.send_email', payload)
)()
# fails because I am not adding the tasks on the broker这是如何做到的呢?
更新:--我可以使用link而不是chain来完成它。
from celery import subtask
res = app.send_task(
'tasks.send_push_notification', (payload, ),
link=subtask(
'tasks.send_sms', (payload, ),
link=subtask(
'tasks.send_email', (payload, ),
)
)
)这里有很多鸟巢。而且,由于我实际上需要创建一个数据库驱动的工作流,因此以这种方式创建它将非常复杂。
发布于 2016-03-01 10:57:39
为什么不在你的任务中处理它,
def push_notification_task(payload):
if not send_push_notification(payload):
sms_notification_task.delay(payload)
def sms_notification_task(payload):
if not send_sms_notification(payload):
email_notification_task.delay(payload)
def email_notification_task(payload):
send_email_notification(payload)此外,chain将按照给定的顺序执行所有任务,而只有在第一次失败时才希望下一个任务运行。
https://stackoverflow.com/questions/35720951
复制相似问题