前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flask 学习-58.基于 Celery 的后台任务

Flask 学习-58.基于 Celery 的后台任务

作者头像
上海-悠悠
发布2022-09-13 15:45:47
1.3K0
发布2022-09-13 15:45:47
举报
文章被收录于专栏:从零开始学自动化测试

前言

如果应用有一个长时间运行的任务,如处理上传数据或者发送电子邮件,而你不想在 请求中等待任务结束,那么可以使用任务队列发送必须的数据给另一个进程。 这样就 可以在后台运行任务,立即返回请求。

Celery 环境

Celery 是一个独立的 Python 包。flask 结合 celery 使用不需要安装额外的包,使用 pip 安装:

代码语言:javascript
复制
> pip install celery

Celery是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具。它是一个任务队列,专注于实时处理,同时还支持任务调度。 可以使用的场景如:

  • 异步发邮件,这个时候 只需要提交任务给celery 就可以了.之后 由worker 进行发邮件的操作 .
  • 跑批接口的任务,需要耗时比较长,这个时候 也可以做成异步任务 .
  • 定时调度任务等

Celery 简介

Celery 扮演生产者和消费者的角色,先了解一下什么是生产者消费者模式。 该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据,如下图所示:

接下来需要弄清楚几个问题,谁生产数据(Task),谁是中间件(Broker),谁来消费数据(Worker),消费完之后运行结果(backend)在哪?

看下图就很清楚了

celery 的5个角色

  • Task 就是任务,有异步任务(Async Task)和定时任务(Celery Beat)
  • Broker 中间人,接收生产者发来的消息即Task,将任务存入队列。任务的消费者是Worker。Celery 本身不提供队列服务,推荐用Redis或RabbitMQ实现队列服务。
  • Worker 执行任务的单元,它实时监控消息队列,如果有任务就获取任务并执行它。
  • Beat 定时任务调度器,根据配置定时将任务发送给Broker。
  • Backend 用于存储任务的执行结果。

celery 详细文档参考前面这篇https://www.cnblogs.com/yoyoketang/p/15423517.html

Redis 安装

Celery 本身不提供队列服务,推荐用Redis或RabbitMQ实现队列服务。那么需要先安装Redis之类的中间件

代码语言:javascript
复制
docker pull redis:latest
docker run -itd --name redis-test -p 6379:6379 redis

上面是没有设置密码的,设置密码用下面这句

代码语言:javascript
复制
docker run -itd --name myredis   -p 6379:6379 redis --requirepass "123456" --restart=always --appendonly yes

pip 安装相关依赖包

代码语言:javascript
复制
pip install celery==5.2.7
pip install redis==2.10.6
pip install eventlet==0.33.1

flask + celery 基本配置

你首先需要有一个 Celery 实例,这个实例称为 celery 应用。其地位就相当于 Flask 中 Flask 一样。这个实例被用作所有 Celery 相关事务的 入口,如创建任务和管理工人,因此它必须可以被其他模块导入。

例如,你可以把它放在一个 tasks 模块中。这样不需要重新配置,你就可以使用 tasks 的子类,增加 Flask 应用情境的支持,并钩接 Flask 的配置。

只要如下这样就可以在 Falsk 中使用 Celery 了:

代码语言:javascript
复制
from celery import Celery

def make_celery(app):
    celery = Celery(
        app.import_name,
        backend=app.config['CELERY_RESULT_BACKEND'],
        broker=app.config['CELERY_BROKER_URL']
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

这个函数创建了一个新的 Celery 对象,使用了应用配置中的 broker ,并从 Flask 配置中更新了 Celery 的其余配置。然后创建了一个任务子类,在一个应用情境中包 装了任务执行。

一个示例任务

让我们来写一个任务,该任务把两个数字相加并返回结果。我们配置 Celery 的 broker ,后端使用 Redis 。使用上文的工厂创建一个 celery 应用,并用它定 义任务。

代码语言:javascript
复制
from flask import Flask

flask_app = Flask(__name__)
flask_app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379',
    CELERY_RESULT_BACKEND='redis://localhost:6379'
)
celery = make_celery(flask_app)

@celery.task()
def add_together(a, b):
    return a + b

这个任务现在可以在后台调用了:

代码语言:javascript
复制
result = add_together.delay(23, 42)
result.wait()  # 65

运行 Celery worker

至此,如果你已经按上文一步一步执行,你会失望地发现你的 .wait() 不会真正 返回。这是因为还需要运行一个 Celery worker来接收和执行任务。:

代码语言:javascript
复制
$ celery -A your_application.celery worker

把 your_application 字符串替换为你创建 celery 对像的应用包或模块。 现在worker 已经在运行中,一旦任务结束, wait 就会返回结果。

完整示例

代码语言:javascript
复制
from flask import Flask
from celery import Celery

# 基本配置
broker_url = 'redis://localhost:6379'
result_backend = 'redis://localhost:6379'

app = Flask(__name__)
celery_app = Celery(app.import_name,
                    broker=broker_url,
                    backend=result_backend)

@celery_app.task(name='demo/add')
def add(x, y):
    return x + y

@app.route('/add')
def index():
    results = add.delay(10, 20)
    print('---------10 + 20 ------------')
    return {"msg": "success", "result": results.wait()}

if __name__ == '__main__':
    app.run(debug=True)

先启动flask服务

代码语言:javascript
复制
flask run

启动celery worker服务

代码语言:javascript
复制
>celery -A app.celery_app worker -l info

需注意的是,celery 5.x的版本在windows上运行,还需要安装一个eventlet

代码语言:javascript
复制
pip install eventlet

最后这样启动celery worker 服务

代码语言:javascript
复制
celery -A app.celery_app worker -P eventlet -l info

启动后看到的日志

代码语言:javascript
复制
>celery -A app.celery_app worker -P eventlet -l info

-- ******* ---- Windows-10-10.0.17134-SP0 2022-09-08 10:36:58
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         app:0x259edced0d0

--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . demo/add

看到[tesks]有任务名称emo/add 说明添加成功 访问接口就可以看到运行结果

2022年第 12期《python接口web自动化+测试开发》课程,9月17号开学!

本期上课时间:2022年9月17号 - 2022年12月17号,周六周日上午9:00-11:00

报名费:报名费3000一人(周期3个月)

联系微信/QQ:283340479

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-09-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 从零开始学自动化测试 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • Celery 环境
  • Celery 简介
  • Redis 安装
  • flask + celery 基本配置
  • 一个示例任务
  • 完整示例
    • 报名费:报名费3000一人(周期3个月)
      • 联系微信/QQ:283340479
      相关产品与服务
      云数据库 Redis®
      腾讯云数据库 Redis®(TencentDB for Redis®)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档