在这个项目中,我尝试轮询一个长期运行的任务的task.state并更新它的运行状态。它在开发中工作,但是当我将项目移动到生产服务器上时,它就不能工作了。我一直在“等待”,甚至我可以看到任务从花开始。但是,当任务完成时,我仍然可以得到更新的结果,这是在task.state ==的“成功”时进行的。我在生产中使用python2.6、Django 1.6和芹菜3.1,结果是后端AMQP。
@csrf_exempt
def poll_state(request):
data = 'Fail'
if request.is_ajax():
if &
我有个密码:
def post(self, request, *args, **kwargs):
file = request.FILES["import_file"]
# create a tast with celery and save ID of the task
task_id = importing.delay(file).id
return Response({"task_id": task_id}, content_type="application/json")
当类型(文件)是Temporary
我正在建设一个任务的网站,需要相当长的时间来处理,并需要大量的计算。我编写了代码,以便任务能够提供部分输出(产生结果)。我使用芹菜工人在另一台机器上运行任务,并设置使用update_state()发送更新的任务。
我的celery_tasks.py文件是这样的
@celery.task(name="my_task")
def my_task():
final_output = []
for output_part in long_process():
my_task.update_state(state="PROGRESS", m
如何正确运行这种芹菜任务? @app.task
def add(x)
x + 1
def some_func():
result = 'result'
for i in range(10):
task_id = uuid()
add.apply_async((i,)), task_id=task_id)
return result 我需要所有的任务被顺序执行后,前一个完成。我尝试使用time.sleep(),但在本例中,返回结果要等到所有任务完成后才能返回。但我需要返回结果,并且所有10个任务都在后台顺序运行
芹菜-底线:我想通过使用任务id获得任务名(我没有任务对象)
假设我有以下代码:
res = chain(add.s(4,5), add.s(10)).delay()
cache.save_task_id(res.task_id)
然后在其他地方:
task_id = cache.get_task_ids()[0]
task_name = get_task_name_by_id(task_id) #how?
print(f'Some information about the task status of: {task_name}')
我知道,如果我有一个任务对象,我可以得到任
在我使用芹菜的Django项目中,我有一个芹菜任务,它将把文件上传到后台的数据库中。我使用轮询来跟踪上传进度,并显示一个上传进度条。下面是一些详细介绍上传过程的片段:
views.py:
from .tasks import upload_task
...
upload_task.delay(datapoints, user, description) # datapoints is a list of dictionaries, user and description are simple strings
tasks.py:
from taskman.celery import app,
我有多个dag使用芹菜执行程序,但我想使用Kubernetes Executor运行一个特定的dag。我无法推断出一个好的和可靠的方法来实现这一点。
我有一个声明要使用CeleryExecutor的airflow.cfg。我不想改变它,因为除了一个dags之外,所有的dags都需要它。
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
executor = CeleryExecutor
我的dag代码:
from da