异步任务队列Celery在Django中的应用
01 Django简介
关于Django的介绍,之前在2018年9月17号的文章中已经讲过了,大家有兴趣可以翻翻之前的文章,这里再简单介绍下:
Django是一个开放源代码的Web应用框架,由Python写成,它采用了MVC的框架模式,即模型(Model)M,视图(View)V和控制器(Controler)C。它最初是被开发来用于管理一些以新闻内容为主的网站的。在Django中,控制器接受用户输入的部分由框架自行处理,所以 Django 里更关注的是模型(Model)、模板(Template)和视图(Views),这里详细解释下MTV里面包含的具体内容:
模型(Model):定义数据库相关的内容,一般放在models.py文件中。
视图(View):定义HTML等静态网页文件相关,也就是那些html、css、js等前端的东西。
控制器(Controller):定义业务逻辑相关,就是你的主要代码。
Django的工作流程大致如下:
Django Web中从一个http请求发起,到获得响应返回html页面的流程大致如下:
1.http请求发起
2.http handling(request解析)
3.url mapping(url正则匹配找到对应的View)
4.在View中进行逻辑的处理、数据计算(包括调用Model类进行数据库的增删改查)
5.将数据推送到template,返回对应的template/response
0
2
Celery简介
在搞清楚celery是什么玩意儿之前,我们需要首先搞懂两个概念,一个是同步请求,一个是异步请求.
所谓同步请求,就是所有逻辑处理都是在view中处理完毕后返回response,在view处理任务时,用户处于等待状态,举个栗子:我们点击一个页面,然后这个页面直接返回按钮点击的效果。
所谓异步请求,就是view中先返回一个response,再在后台处理相关任务,用户无需等待,可以继续浏览网站,当任务处理完成时,我们再告知用户。
而celery就是处理异步任务队列的一个分布式框架,支持使用任务队列的方式在分布的机器上执行任务调度。
Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。
Celery的架构组成如下图:
可以看到,Celery 主要包含以下几个模块:
03
Django中的Celery实现
上面那些都是纸上谈兵,接下来我们将进行一波实战演练,这个过程分为如下几个阶段:
1.建立消息队列(Broker)
官方给出的消息队列可选方案有redis,RabbitMQ以及其他消息中间件,这里我们使用redis作为消息中间件,具体安装步骤如下:
$ sudo pip install redis
然后进行简单的配置,只需要设置 Redis 数据库的位置,具体的配置位置后面会讲到,我们只需要知道URL的值要设置为:
BROKER_URL = 'redis://localhost:6379/0'
其中的localhost可以直接改为你的本地IP地址。
2.安装django-celery
安装django-celery的方法比较简单,直接运行下面的命令即可:
pip install celery
pip install django-celery
3.配置Django中的settings.py文件
每一个Django工程下面,都有一个settings文件,为了在Django中配置celery,必须对这个文件进行一定的配置,我这里配置的结果如下:
import os
import djcelery
#from celery import platforms
# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
''' celery config '''
djcelery.setup_loader() #启动函数
BROKER_URL = 'redis://192.168.56.102:6379/3'# 使用Redis做broker
CELERY_RESULT_BACKEND = 'djcelery.backends.database.DatabaseBackend'# 需要跟踪任务的状态时保存结果和状态
CELERY_TASK_SERIALIZER = 'json'#任务序列化格式
CELERY_RESULT_SERIALIZER = 'pickle'#结果序列化格式
CELERY_ACCEPT_CONTENT = ['json', 'pickle']# 允许的格式
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'#定时任务
CELERY_TASK_RESULT_EXPIRES = * * #任务结果持久化保留天数
CELERYD_MAX_TASKS_PER_CHILD = #最大子任务数量
CELERY_TRACK_STARTED = True
CELERY_TIMEZONE='Asia/Shanghai' #时区
#platforms.C_FORCE_ROOT = True
REDSI_KWARGS_LPUSH = {"host":'192.168.56.102','port':,'db':}
REDSI_LPUSH_POOL = None
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True
ALLOWED_HOSTS = ['*']
# Application definition
#定义的app
INSTALLED_APPS = (
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'OpsManage',
'rest_framework',
'djcelery',# 这里增加了djcelery 也就是为了在django admin里面可一直接配置和查看celery
'Restful',
'wiki',
'rest_framework.authtoken',
)
对于每一项参数,里面都有少量的解释,最后32行的installed_apps是指目前已经安装的app,配置好了这些参数,当djcelery.setup_loader()运行时,Celery便会去查看INSTALLD_APPS下包含的所有app目录中的tasks.py文件,找到标记为task的方法,将它们注册为celery task。BROKER_URL和CELERY_RESULT_BACKEND分别指代你的Broker的代理地址以及Backend(result store)数据存储地址。在Django中如果没有设置backend,会使用其默认的后台数据库用来存储数据。
4.在app的根目录下,简历task.py文件
在tasks.py中我们就可以编码实现我们需要执行的任务逻辑,在开始处import task,然后在要执行的任务方法开头用上装饰器@task。需要注意的是,与一般的.py中实现celery不同,tasks.py必须建在各app的根目录下,且不能随意命名。这里给出我的task.py的目录:
在这个tasks.py中写入我们想要实现的异步任务调度的方法,如下:
from celery import task
@shared_task
def add(x, y):
return x + y
上面描述的是最简单的task.py任务调度方法,这里给了一个add方法,返回两个数字的和,需要注意第一行,引入celery task
5.配置相关URL
在上述目录结构图中的url文件中,配置相关的url,让它调用一个方法,我配置的方法如下:
url(r'^test_celery/', task_manage.test_celery),
配置的是test_celery作为URL,task_manage中的test_celery方法调用我们刚才写好的add和product方法.
6.方法调用
配置好了URL之后,需要在方法文件中引入刚才创建的task方法,我这里引用的方法文件是task_manage文件,在其中写入以下内容:
from OpsManage.tasks import add
from OpsManage.tasks import product
def test_celery(request):
add.delay(,)
return JsonResponse({'msg': 'succeed' , "code": , 'data': 'you are so smart!!!'})
7.启动celery worker
启动worker的命令如下:
export C_FORCE_ROOT="true"
celery worker -A OpsManage -l debug
第一句的意思是强制在root用户下启动,celery默认的是在非root用户下启动,如果使用root,将会产生告警,由于我本地测试环境只有一个root用户,所以直接在root下面进行的.
8.flower工具配置
为了更加清楚的看懂搞个任务的执行状况以及各个worker的健康状态,并对这些状态进行监控,celery提供了一个工具flower,它将以上信息利用可视化的方法进行展现,它的配置方法如下:
1. 安装flower:
pip install flower
2. 启动flower(默认会启动一个webserver,端口为5555):
python manage.py celery flower
配置好的效果图如下(访问本地IP:5555端口即可):
可以看到,它包含了一些字段,这些字段的目前还没有值,是因为我们还没有启动我们的异步调度任务。
9.异步调度任务接入
异步调度任务接入也比较简单,我们访问以下我们刚才第5步配置的URL,就相当于调用了task_manage中的test_celery方法,而这个方法调用了我们的异步任务add和product,结果如下:
访问test_celery网页结果:
查看flower中的结果:
1.任务丢入任务队列,但是还未执行
2.任务丢入任务队列,已经执行
3.点击绿色链接,查看执行结果,可以看到,已经求出了9+9的和是18
今天只是初步让大家了解一下celery在Django中的配置和使用方法,后续还将详细描述一些更深层次的应用。