前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Celery 4 初体验及踩坑

Celery 4 初体验及踩坑

原创
作者头像
Ewdager
修改2020-07-30 14:32:16
1.2K0
修改2020-07-30 14:32:16
举报
文章被收录于专栏:Gvoidy备份小站Gvoidy备份小站

Celery是基于分布式消息传递的开源异步任务队列或作业队列。虽然它支持调度,但其重点是实时操作。现在4版本已经步入稳定,而国内互联网的几乎都是3版本的教程。所以这里记录下4版本下的踩坑及外文解决方案的翻译记录。

win环境运行celery 4 worke

Celery 是一个资金最少的项目,因此我们不支持 Microsoft Windows。请不要提出与该平台相关的任何问题。

官方在4版本移除了win平台支持,但是经过查阅,只要使用将并发模式-P改为gevent或者eventlet即可正常启动,但并不知道会有什么影响,毕竟官方已经不提供支持了,该启动方法仅适用于本地调试。

附上worker启动脚本

代码语言:txt
复制
# celery_worker_start.bat

@echo off

chcp 65001

CLS

echo 正在启动 python 虚拟环境

CALL venv\Scripts\activate.bat

echo 正在启动 celery

celery -A multi_analysis_tasks.celery_app worker -P gevent -l info

flask + celery 启动蓝图循环引用问题

项目结构

代码语言:txt
复制
def register_plugin(application):
    from app.models.base import db
    db.init_app(application)
    with application.app_context():
        db.create_all()


def create_app():
    application = Flask(__name__)
    CORS(application, supports_credentials=True)  # 设置允许跨域
    application.config.from_object('app.config.setting')
    application.config.from_object("app.config.secure")
    register_blueprint(application)
    register_plugin(application)
    return application

app = create_app()
celery_app = make_celery(app)

if __name__ == "__main__":

    app.run(host="0.0.0.0", port=5000, debug=False)

>>> ImportError: cannot import name 'create_blueprint_v1'

解决方案

celery worker 入口文件和 flask 启动的入口文件分开,worker 的入口文件不需要初始化蓝图,即可解决循环引用的问题。

在 celery work 中加入 flask 上下文

注意: celery worker 运行的必须是已经推入flask context的 celery 对象,后续推入的context是无效的。

代码语言:txt
复制
from celery import Celery
from app.config.setting import CELERY_TASKS_INCLUDE


def make_celery(app):
    celery = Celery(__name__, include=CELERY_TASKS_INCLUDE)
    celery.config_from_object('app.config.celery_conf')

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

成员函数实现 celery task 异步调用

由于支持方法太多bug且没有人修,celery官方在4版本,移除了celery 3 中的celery.contrib.methods方法

I wrote celery.contrib.task_methods as an experiment, but turns out there were some serious bugs there, for example task retries wouldn't work at all, and since nobody were stepping up to fix it I removed it.

所以尽量将需要异步调用的方法移动到类以外,如果实在是没有办法移动,可以将celery.contrib.methods放到项目内,然后实现调用。

代码语言:txt
复制
from celery import current_app

__all__ = ['task_method', 'task']


class task_method(object):

    def __init__(self, task, *args, **kwargs):
        self.task = task

    def __get__(self, obj, type=None):
        if obj is None:
            return self.task
        task = self.task.__class__()
        task.__self__ = obj
        return task


def task(*args, **kwargs):
    return current_app.task(*args, **dict(kwargs, filter=task_method))

使用方法:

代码语言:txt
复制
from celery import current_app
from celery.contrib.methods import task_method

class A:
@current_app.task(filter=task_method, name='A.foo')
def foo(self, bar):
    ...

参考

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • win环境运行celery 4 worke
  • flask + celery 启动蓝图循环引用问题
    • 项目结构
      • 解决方案
      • 在 celery work 中加入 flask 上下文
      • 成员函数实现 celery task 异步调用
      • 参考
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档