我用芹菜3.1.x做两个任务。第一个任务(TaskOne)是在芹菜通过celeryd_after_setup信号启动时排队的:
@celeryd_after_setup.connect
def celeryd_after_setup(*args, **kwargs):
TaskOne().apply_async(countdown=5)运行TaskOne时,它会进行一些计算,然后对TaskTwo进行排队。想象一下以下工作流:
所以我们在队列中有两个TaskTwo。这对我的工作流来说是个问题,因为我只希望队列中有一个TaskTwo,并且避免第二个被排队。
我的问题是:我怎样才能做到这一点?
有了celery.app.control.Inspect.scheduled() (文档),我就可以得到一个任务的列表,它隐藏在列表和dicts的组合中。这也许是一种方式,但通过这一结果并不是正确的。还有更好的办法吗?
发布于 2015-05-28 15:58:08
在考虑了几个选项之后,我选择了使用app.control.inspect。这不是一个很好的解决方案,但它有效:
# fetch all scheduled tasks
scheduled_tasks = inspect().scheduled()
# iterate the scheduled task values, see http://docs.celeryproject.org/en/latest/userguide/workers.html?highlight=revoke#dump-of-scheduled-eta-tasks
for task_values in iter(scheduled_tasks.values()):
# task_values is a list of dicts
for task in task_values:
if task['request']['name'] == '{}.{}'.format(TaskTwo.__module__, TaskTwo.__name__):
logger.info('TaskTwo is already scheduled, skipping additional run')
return发布于 2015-05-28 06:09:25
一个易于实现的解决方案是将-清洗开关添加到worker命令中。它将清除队列,并且工作人员将在没有计划作业的情况下启动。
但要小心:这是一种全球性的、不可挽回的行动。当有其他您所依赖的计划作业时,这不是您的解决方案。
https://stackoverflow.com/questions/30491998
复制相似问题