前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >python celery 模块

python celery 模块

作者头像
py3study
发布2020-01-09 15:48:50
1.1K0
发布2020-01-09 15:48:50
举报
文章被收录于专栏:python3

Celery是基于Python开发的一个分布式任务队列框架,支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度

Celery是典型的生产生-消费者模式,主要由三部分组成:broker(消息队列)、workers(消费者:处理任务)、backend(存储结果) 1.编写任务代码task.py from celery import Celery

app = Celery('tasks',broker='amqp://guest@localhost//', backend='redis://localhost:6379/0')

@app.task def add(x, y): return x + y

当函数使用”@app.task”修饰后,即为可被Celery调度的任务 2.启动workers 命令 celery worker -A tasks --loglevel=info --concurrency=5 3.调用任务

result=add.delay(2, 5) result.ready() result.get(timeout=1)

4.配置文件 单个参数配置: app.conf.CELERY_BROKER_URL = 'amqp://guest@localhost//' app.conf.CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' 多个参数配置: app.conf.update( CELERY_BROKER_URL = 'amqp://guest@localhost//', CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' )

从配置文件中获取:

先把配置存入配置文件中'celeryconfig.py'

BROKER_URL='amqp://guest@localhost//' CELERY_RESULT_BACKEND='redis://localhost:6379/0'

导入到celery 对象中app.config_from_object('celeryconfig') 我们之前调用任务使用了”delay()”方法,它其实是对”apply_async()”方法的封装, 使得你只要传入任务所需的参数即可 关于序列化 Celery默认序列化方式是”json”,指定序列化 app = Celery('tasks', broker='...', task_serializer='yaml')

app.conf.update( CELERY_TASK_SERIALIZER='pickle', CELERY_RESULT_SERIALIZER='json', )

@app.task def add(x, y): ...

add.apply_async((2, 5), serializer='json')


django + celery 实现任务的异步处理 1.Django Web中从一个http请求发起,到获得响应返回html页面的流程大致如下:http请求发起 -- http handling(request解析) -- url mapping(url正则匹配找到对应的View) -- 在View中进行逻辑的处理、数据计算(包括调用Model类进行数据库的增删改查)--将数据推送到template,返回对应的template/response 同步请求:所有逻辑处理、数据计算任务在View中处理完毕后返回response。在View处理任务时用户处于等待状态,直到页面返回结果 异步请求:View中先返回response,再在后台处理任务。用户无需等待,可以继续浏览网站。当任务处理完成时,我们可以再告知用户 2.建立消息队列 消息队列可以使用RabbitMQ、Redis 等 3.安装django-celery pip install celery django-celery 4.配置settings.py import djcelery djcelery.setup_loader() BROKER_URL = 'django://' # 使用django做broker CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' # 定时任务. CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend' # 需要跟踪任务的状态时保存结果和状态 CELERY_ENABLE_UTC = False # 不用UTC. CELERY_TIMEZONE = 'Asia/Shanghai' # 指定上海时区 CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml'] # 允许的格式 CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_IGNORE_RESULT = True

INSTALLED_APPS = [ 'djcelery',# 新增 'kombu.transport.django', # 新增kombu.transport.django则是基于Django的broker ]

其中,当djcelery.setup_loader()运行时,Celery便会去查看INSTALLD_APPS下包含的所有app目录中的tasks.py文件,找到标记为task的方法,将它们注册为celery task 5.在项目 mysite 下新建celery.py from future import absolute_import import os from celery import Celery os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings') from django.conf import settings # noqa app = Celery('mysite') app.config_from_object('django.conf:settings') app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

@app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request))

6.在应用celery_project下新建tasks.py

from future import absolute_import from celery import shared_task import time

@shared_task(track_started=True) def add(x, y): time.sleep(30) return x + y

在tasks.py中我们就可以编码实现我们需要执行的任务逻辑,在开始处import task,然后在要执行的任务方法开头用上装饰器@task。需要注意的是,与一般的.py中实现celery不同,tasks.py必须建在各app的根目录下,且不能随意命名 6.生产任务 在需要执行该任务的View中,通过test.delay的方式来创建任务,并送入消息队列 def produce(): a =1 b =2 r = test.delay(a,b) 7.启动work #先启动服务器 python manage.py runserver #再启动worker celery worker -A mysite -c 4 --loglevel=info

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019/08/27 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis®
腾讯云数据库 Redis®(TencentDB for Redis®)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档