我正在使用芹菜构建一个用于后台任务的flask应用程序。我的应用程序使用在docker容器中运行的本地堆栈为我的消息代理在本地模拟SQS。我已经让flask和celery在本地运行,以便正确地使用localstack,我可以看到flask接收一个请求,向SQS队列添加一条消息,然后celery获得该任务并执行它。
我尝试将flask和celery与localstack一起停靠,我的所有服务都按预期运行,除了celery worker,它不执行队列中的任务。我可以在本地启动一个芹菜工人,它将读取队列并执行任务,但docker芹菜工人不会拉出任何任务。
在flask容器中运行celery work
我们有一个大型应用程序,它使用django作为ORM,使用芹菜作为运行基础设施的任务。我们运行由事件(用户驱动的或自动的)触发的复杂管道,这些事件如下所示:
def pipeline_a:
# all lines are synchronous, so second line must happen after first is finished successfully
first_res = a1()
all_results = in_parallel.do(a2, a3, a4)
a5(first_res, all_results)
我们希望运行a1,a2,.在不同的机器上(每个任务可能
我用的是芹菜3.X和RabbitMQ后端。有时需要重新启动芹菜(将新的源代码更新推到服务器上)。但是有一个具有大循环的任务,并在循环中尝试/捕获;完成任务可能需要几个小时。如果我将停止它并在以后重新启动它,那么没有什么关键的事情会发生。
问题:问题是每次我停止工作(通过sudo service celeryd停止)之后,我必须手动地(通过-9)杀死任务;任务忽略了来自worker的SIGTERM。我读过“扔芹菜文档”& Stackoverflow,但我找不到有效的解决方案。有什么办法解决这个问题吗?
我试图使用芹菜定期查询外部api,并使用新数据更新Django项目中的数据库。芹菜正确地安排任务并发送给芹菜工人,但是它从不执行任何事情。
下面是我的celery.py文件,它与我的settings.py处于相同的级别:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
from celery.schedules import crontab
os.environ.setdefault(