前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Celery实践二】在Flask项目中使用Celery

【Celery实践二】在Flask项目中使用Celery

作者头像
TesterHome小助手
发布2022-01-11 10:42:27
1.2K0
发布2022-01-11 10:42:27
举报
文章被收录于专栏:用户9355284的专栏

背景

上篇我们介绍了Celery的环境搭建以及基础入门,这篇主要分享如何在Python+Flask项目中使用。

步骤

1、新建flask项目,目录结构如下

图片
图片

Common目录下存放model层做数据库关系映射以及公共方法

Config目录下存放项目配置以及celery配置

Controllers目录下存放业务控制方法以及注册路由

Tasks目录下存放异步任务方法

具体代码如下:

  • Celery_settings.py
代码语言:javascript
复制
# celery配置CELERY_TIMEZONE = 'Asia/Shanghai'  # 时区CELERY_ENABLE_UTC = False  # 禁用UTC,配合CELERY_TIMEZONE使用BROKER_URL = "amqp://yyyyy:xxxxxxxxxx@192.168.a.bb:5672/"  # broker地址CELERY_RESULT_BACKEND = "yyyyy://:xxxxxxxxxx@192.168.3.53:6379/0"  # result地址CELERY_ROUTES = {    'run_api_job_delay': {'queue':'job1'},    'run_ui_job_delay': {'queue':'job2'},}# 不同任务队列配置
  • Settings.py
代码语言:javascript
复制
#公用配置DEBUG = TrueSQLALCHEMY_ECHO = FalseDB_HOST="192.168.a.bb"DB_USER="root"DB_PASSWORD="xxxxxxxxxx"SQLALCHEMY_DATABASE_URI="mysql+pymysql://"+DB_USER+":"+DB_PASSWORD+"@"+DB_HOST+":3306/rpa"SQLALCHEMY_TRACK_MODIFICATIONS = TrueSECRET_KEY = "xxxxxxxxx"CORS_ALLOW_CREDENTIALS = TrueCORS_ORIGIN_ALLOW_ALL = TrueCSRF_ENABLED = True
  • Run_job.py
代码语言:javascript
复制
from flask importBlueprintfrom flask import jsonifyfrom flask_restful import reqparsefrom tasks.tasks import run_job_delay
runJob_page = Blueprint("runJob_page", __name__)
# 执行/调试场景测试@runJob_page.route('/run_job', methods=['POST'])#指定路由def run_job():    parser = reqparse.RequestParser()    parser.add_argument('job_id',type=int)    args = parser.parse_args()    job_id = args.get('job_id')    _save_run(job_id)    return jsonify({'msg':"ok", "remark": "任务开始执行"})    def _save_run(job_id):    run_job_delay.delay(job_id)
  • Tasks.py
代码语言:javascript
复制
from application importceleryfrom celery.utils.log import get_task_logger
logger = get_task_logger(__name__)#日志输出@celery.task(name='run_api_job_delay')def run_api_job_delay(job_id):    print('执行异步任务')
  • Application.py
代码语言:javascript
复制
from flask import Flaskfrom flask_sqlalchemy import SQLAlchemyfrom flask_cors import *from celery import Celeryfrom config import celery_settingfrom flask_httpauth import HTTPBasicAuth
app = Flask(__name__)#实例化应用对象celery = Celery(app.name)# 创建celery实例celery.config_from_object(celery_setting)#读取celery配置CORS(app, supports_credentials=True)app.config.from_pyfile("config\\settings.py")db = SQLAlchemy(app)auth = HTTPBasicAuth()
  • manager.py
代码语言:javascript
复制
from application importapp, managerfrom flask_script import Commandfrom www import *from gevent import pywsgi
# create_table@Commanddef create_all():    from application import db    db.create_all()    manager.add_command("create_all", create_all)
if __name__ == "__main__":    # 测试    app.run(host='0.0.0.0', debug=True,threaded=True, port=8888)    # 生产    # server =pywsgi.WSGIServer(('192.168.a.bb', 5000), app)    # server.serve_forever()

2、创建worker项目

配置项和server项目相同

Controllers/runJob.py

代码语言:javascript
复制
celery =Celery('worker', broker=settings.BROKER_URL, backend=settings.RESULT_BACKEND)#实例化对象
@celery.task(name='run_job_delay')def run_job_delay(job_id):   Run_job(job_Id)

3、启动server项目

代码语言:javascript
复制
python manager.py

4、启动worker项目

Q参数可以指定监听队列

代码语言:javascript
复制
celery worker -A worker -l info -P eventlet -Q job1 

5、工作流简述

    请求run_job接口,通过url映射到对应view函数;view函数执行业务处理后推送异步方法到指定队列;worker监听指定队列中消息并消费,将结果保存;

    如果平台是综合多种类型的自动化任务并且需要指定worker消费的话,流转应该是下图这样。

    例如worker1部署接口自动化执行服务,worker2部署UI自动化执行服务。

图片
图片

最后

整体来讲Celery使用上手难度 ★★☆☆☆,容易出问题的地方一般在启动时:worker 以及 -A 后边路径,下篇分享如何使用Celery实现动态定时任务的配置。

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系转载前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档