我正尝试在我的Django应用程序中集成测试一个并发的芹菜任务。我想让任务在pytest集成测试期间在一个worker上并发运行,但我在执行这项工作时遇到了问题。 假设我有以下基本的芹菜任务: @shared_task
def sleep_task(secs):
print(f"Sleeping for {secs} seconds...")
for i in range(secs):
time.sleep(i)
print(f"\t{i + 1}")
return "DONE" 在脚本中
我有一个使用芹菜的烧瓶应用程序,当应用程序在本地运行时,异步处理工作得很好。但是,当我尝试测试(pytest)使用芹菜任务的路由时,我会得到以下错误:
app/bp_dir/routes.py:12: in <module>
from app import db, celery_tasks
app/celery_tasks.py:13: in <module>
@celery.task()
E AttributeError: 'NoneType' object has no attribute 'task'
当我在测试
我试图通过在pytest套件上运行.delay命令来测试任务:
def test_selected_rows_are_blocked(celery_worker):
id, select_only = 0, ['a', 'b']
tasks.update_table.delay(id, select_only)
# some code here that should be called as soon as task gets started
因此,任务可以运行,但是后面的代码可以同步运行。celery_worker工具使用我
__init__.py
from .celery import app as celery_app
__all__ = ['celery_app']
celery.py
from __future__ import absolute_import
import os
from celery import Celery
from celery.schedules import crontab
from django.conf import settings
# set the default Django settings module for the 'celery
我是刚接触celery,可能做错了什么,但我已经花了很多时间来弄清楚如何正确地配置celery。
因此,在我的环境中,我有两个远程服务器;一个是主服务器(它有公共IP地址,大多数东西,比如数据库服务器、rabbitmq服务器和运行我的web应用程序的web服务器都在那里),另一个用于特定的任务,我想使用celery从主服务器异步调用这些任务。
我计划使用RabbitMQ作为代理和结果后端。Celery配置非常基础:
CELERY_IMPORTS = ("main.tasks", )
BROKER_HOST = "Public IP of my main server&
我跟着达格在气流中奔跑,
当执行以上dag时,它将串行运行以下顺序之一。
A -> B -> C1 -> C2 -> D1 -> D2
A -> B -> C2 -> C1 -> D2 -> D1
但我的要求是同时并行运行C1和C2任务。我airflow.cfg的一部分
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
executor = Cele
我正在使用芹菜处理Django应用程序中的异步任务。大多数任务都很短,几秒钟内就能完成,但我有一项任务可能需要几个小时。
由于服务器上的处理限制,芹菜被配置为一次只运行两个任务。这意味着,如果有人启动两个长期运行的任务,它会有效地封锁所有其他芹菜加工站点长达几个小时,这是非常糟糕的。
是否有任何方法配置芹菜,使其一次只处理一种任务?类似于:
@task(max_running_instances=1)
def my_really_long_task():
for i in range(1000000000):
time.sleep(6000)
注意,我不想取消my_re