前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python项目50-Celery框架

Python项目50-Celery框架

作者头像
DriverZeng
发布2022-09-26 13:52:40
3770
发布2022-09-26 13:52:40
举报
文章被收录于专栏:Linux云计算及前后端开发

-曾老湿, 江湖人称曾老大。


-多年互联网运维工作经验,曾负责过大规模集群架构自动化运维管理工作。 -擅长Web集群架构与自动化运维,曾负责国内某大型金融公司运维工作。 -devops项目经理兼DBA。 -开发过一套自动化运维平台(功能如下): 1)整合了各个公有云API,自主创建云主机。 2)ELK自动化收集日志功能。 3)Saltstack自动化运维统一配置管理工具。 4)Git、Jenkins自动化代码上线及自动化测试平台。 5)堡垒机,连接Linux、Windows平台及日志审计。 6)SQL执行及审批流程。 7)慢查询日志分析web界面。


Celery框架介绍


什么是Celery?

代码语言:javascript
复制
"""
1、celery框架自带socket,所以自身是一个独立运行的服务
2、启动celery服务,是来执行服务中的任务的,服务中带一个执行任务的对象,会执行准备就绪的任务,将执行任务的结果保存起来
3、celery框架由三部分组成:存放要执行的任务broker,执行任务的对象worker,存放任务结果的backend
4、安装的celery主体模块,默认只提供worker,要结合其他技术提供broker和backend(两个存储的单位)
"""

官方

Celery 官网:http://www.celeryproject.org/

Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html

Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/


Celery架构

Celery的架构由三部分组成,消息中间件(message broker)、任务执行单元(worker)和 任务执行结果存储(backend - task result store)组成。

消息中间件

Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等。

任务执行单元

Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

任务结果存储

Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等。

Celery使用场景


Celery架构

异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

定时任务:定时执行某件事情,比如每天数据统计

Celery安装配置以及基础用法


安装

代码语言:javascript
复制
(luffy) bash-3.2$ pip install celery

目录结构

代码语言:javascript
复制
project
    ├── celery_task     # celery包
    │   ├── __init__.py # 包文件
    │   ├── celery.py   # celery连接和配置相关文件,且名字必须交celery.py
    │   └── tasks.py    # 所有任务函数
    ├── add_task.py     # 添加任务
    └── get_result.py   # 获取结果

创建目录

luffyapi/scripts/celery框架/简单使用/celery_task/celery.py

celery框架是目录 简单使用时目录 celery_task 是包,包中必须创建有个celery.py文件

然后创建任务,任务文件可以随意.

celery_task/task1.py

代码语言:javascript
复制
from .celery import app


# 一个任务就是一个函数,任务的执行结果就是函数的返回值
@app.task
def add(n1, n2):
    print('运算数', n1, n2)
    print('运算结果:%s' % (n1 + n2))
    return n1 + n2

celery_task/task2.py

代码语言:javascript
复制
from .celery import app


@app.task
def low(n1, n2):
    print('减法:%s' % (n1 - n2))
    return n1 - n2

celery_task/celery.py

代码语言:javascript
复制
from celery import Celery

# 通过Celery功能产生一个Celery应用
broker = 'redis://10.0.0.51:6379/6'     #任务仓库
backend = 'redis://10.0.0.51:6379/8'    #结果仓库
include = ['celery_task.task1', 'celery_task.task2']    #任务,完成需求的函数所在文件
app = Celery(broker=broker, backend=backend, include=include)

Celery启动

代码语言:javascript
复制
# 非windows
# 命令:celery worker -A celery_task -l info
# windows:
# pip3 install eventlet
# celery worker -A celery_task -l info -P eventlet
代码语言:javascript
复制
cd /Users/driverzeng/Desktop/luffy/luffyapi/scripts/celery框架/简单使用

(luffy) bash-3.2$ celery worker -A celery_task -l info

Celery添加任务


手动添加立即任务

celery框架/简单使用/添加celery任务的脚本.py

代码语言:javascript
复制
from celery_task import task1, task2

## 使用模块中的函数,和celery没有任何关系
res = task1.add(10, 15)
print(res)
res2 = task2.low(10, 15)
print(res2)

上面的操作和celery一点关系都没有,redis一点数据都不会写入

如何与celery建立关系呢?

代码语言:javascript
复制
from celery_task import task1, task2

## 使用模块中的函数,和celery没有任何关系
# res = task1.add(10, 15)
# print(res)
# res2 = task2.low(10, 15)
# print(res2)

# 调用celery框架的方法,完成任务的添加
## 手动添加立即任务,调用delay就相当于将add交给celery进行调用,delay的参数与add的保持一致
res = task1.add.delay(100, 150)
print(res)


手动添加延迟任务

celery框架/简单使用/添加celery任务的脚本.py

代码语言:javascript
复制
from celery_task import task1, task2

## 使用模块中的函数,和celery没有任何关系
# res = task1.add(10, 15)
# print(res)
# res2 = task2.low(10, 15)
# print(res2)

# 调用celery框架的方法,完成任务的添加
## 手动添加立即任务,调用delay就相当于将add交给celery进行调用,delay的参数与add的保持一致
# res = task1.add.delay(100, 150)
# print(res)


## 手动添加延迟任务
from datetime import datetime, timedelta


def eta_second(second):
    ctime = datetime.now()
    utc_time = datetime.utcfromtimestamp(ctime.timestamp())
    time_delay = timedelta(seconds=second)
    return utc_time + time_delay


## args就是执行add函数所需参数,eta就是延迟执行的时间
res = task1.add.apply_async(args=(200, 50), eta=eta_second(10))
print(res)

Celery获取任务


获取任务脚本

celery框架/简单使用/获取任务结果的脚本.py

代码语言:javascript
复制
from celery_task.celery import app

from celery.result import AsyncResult

id = 'e4439b36-d200-4551-8307-899018ebaffb'
if __name__ == '__main__':
    async = AsyncResult(id=id, app=app)
    if async.successful():
        result = async.get()
        print(result)
    elif async.failed():
        print('任务失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')

Celery高级使用


自动添加任务

celery框架/高级使用/celery_task/celery.py

代码语言:javascript
复制
from celery import Celery

# 通过Celery功能产生一个Celery应用
broker = 'redis://10.0.0.51:6379/7'
backend = 'redis://10.0.0.51:6379/9'
include = ['celery_task.tasks']
app = Celery(broker=broker, backend=backend, include=include)

## 启动worker:celery worker -A celery_task -l info

## 启动beat:celery  beat -A celery_task -l info
## beat也是一个socket,启动后会根据配置文件,自动添加任务(定时任务)

## app的配置文件
from datetime import timedelta

app.conf.beat_schedule = {
    # 任务名可以随意写
    'jump_task': {
        ## 指定任务源
        'task': 'celery_task.tasks.jump',
        ## 延迟时间:每三秒一次
        'schedule': timedelta(seconds=3),
        ## 任务函数传递的参数
        'args': (300, 150),
    }
}

celery框架/高级使用/celery_task/tasks.py

代码语言:javascript
复制
from .celery import app


# 一个任务就是一个函数,任务的执行结果就是函数的返回值
@app.task
def jump(n1, n2):
    print('积:%s' % (n1 * n2))
    return n1 * n2


@app.task
def full(n1, n2):
    print('商:%s' % (n1 // n2))
    return n1 // n2

先启动worker,再启动beat

代码语言:javascript
复制
## 进入task的上级目录
cd /Users/driverzeng/Desktop/luffy/luffyapi/scripts/celery框架/高级使用

## 启动worker
(luffy) bash-3.2$ celery worker -A celery_task -l info

## 启动beat
(luffy) bash-3.2$ celery beat -A celery_task -l info

可以看到3秒会执行一次任务


数据库相关需要配置时区

tasks.py

代码语言:javascript
复制
from .celery import app


# 一个任务就是一个函数,任务的执行结果就是函数的返回值
@app.task
def jump(n1, n2):
    print('积:%s' % (n1 * n2))
    return n1 * n2


@app.task
def full(n1, n2):
    print('商:%s' % (n1 // n2))
    return n1 // n2

celery.py

代码语言:javascript
复制
from celery import Celery

# 通过Celery功能产生一个Celery应用
broker = 'redis://10.0.0.51:6379/7'
backend = 'redis://10.0.0.51:6379/9'
include = ['celery_task.tasks']
app = Celery(broker=broker, backend=backend, include=include)

## 启动worker:celery worker -A celery_task -l info

## 启动beat:celery  beat -A celery_task -l info
## beat也是一个socket,启动后会根据配置文件,自动添加任务(定时任务)

## app的配置文件
# 时区
app.conf.timezone = 'Asia/Shanghai'

# 是否使用UTC
app.conf.enable_utc = False

from datetime import timedelta

app.conf.beat_schedule = {
    # 任务名可以随意写
    'jump_task': {
        ## 指定任务源
        'task': 'celery_task.tasks.jump',
        ## 延迟时间
        'schedule': timedelta(seconds=3),
        ## 任务函数传递的参数
        'args': (300, 150),
    }
}

运维最熟悉的crontab定时任务

celery.py

代码语言:javascript
复制
from celery import Celery

# 通过Celery功能产生一个Celery应用
broker = 'redis://10.0.0.51:6379/7'
backend = 'redis://10.0.0.51:6379/9'
include = ['celery_task.tasks']
app = Celery(broker=broker, backend=backend, include=include)

## 启动worker:celery worker -A celery_task -l info

## 启动beat:celery  beat -A celery_task -l info
## beat也是一个socket,启动后会根据配置文件,自动添加任务(定时任务)

## app的配置文件
# 时区
app.conf.timezone = 'Asia/Shanghai'

# 是否使用UTC
app.conf.enable_utc = False

from datetime import timedelta
from celery.schedules import crontab

app.conf.beat_schedule = {
    # 任务名可以随意写
    'jump_task': {
        ## 指定任务源
        'task': 'celery_task.tasks.jump',
        ## 延迟时间: 每3秒一次
        'schedule': timedelta(seconds=3),
        ## 任务函数传递的参数
        'args': (300, 150),
    },
    'full_task': {
        ## 指定任务源
        'task': 'celery_task.tasks.full',
        ## 每周一早上8点
        'schedule': crontab(hour=14,minute=34, day_of_week=0),
        ## 任务函数传递的参数
        'args': (30, 15),
    }
}

tasks.py

代码语言:javascript
复制
from .celery import app


# 一个任务就是一个函数,任务的执行结果就是函数的返回值
@app.task
def jump(n1, n2):
    print('积:%s' % (n1 * n2))
    return n1 * n2


@app.task
def full(n1, n2):
    print('商:%s' % (n1 // n2))
    return n1 // n2

Celery实战-更新接口缓存


修改celery_task

首先将celery_task目录移动到项目根目录下

luffyapi/celery_task/tasks.py

代码语言:javascript
复制
from .celery import app
from home.models import Banner
from home.serializers import BannerModelSerializer
from django.core.cache import cache


# 一个任务就是一个函数,任务的执行结果就是函数的返回值
@app.task
def update_banner_list():
    # 获取最新内容
    banner_query = Banner.objects.filter(is_delete=False, is_show=True).order_by('-orders')
    # 序列化
    banner_data = BannerModelSerializer(banner_query, many=True).data
    for banner in banner_data:
        banner['image'] = 'http://127.0.0.1:8000' + banner['image']
    # 更新缓存
    cache.set('banner_list', banner_data)
    return True

luffyapi/celery_task/celery.py

代码语言:javascript
复制
import os,django

os.environ.setdefault("DJANGO_SETTINGS_MODULE",'luffyapi.settings.dev')
django.setup()

from celery import Celery

# 通过Celery功能产生一个Celery应用
broker = 'redis://10.0.0.51:6379/7'
backend = 'redis://10.0.0.51:6379/9'
include = ['celery_task.tasks']
app = Celery(broker=broker, backend=backend, include=include)

## 启动worker:celery worker -A celery_task -l info

## 启动beat:celery  beat -A celery_task -l info
## beat也是一个socket,启动后会根据配置文件,自动添加任务(定时任务)

## app的配置文件
# 时区
app.conf.timezone = 'Asia/Shanghai'

# 是否使用UTC
app.conf.enable_utc = False

from datetime import timedelta
from celery.schedules import crontab

app.conf.beat_schedule = {
    # 任务名可以随意写
    'update_banner_list_task': {
        ## 指定任务源
        'task': 'celery_task.tasks.update_banner_list',
        ## 延迟时间: 每3秒一次
        'schedule': timedelta(seconds=10),
        ## 任务函数传递的参数
        # 'args': (300, 150),
    },
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019-04-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Celery框架介绍
  • Celery使用场景
  • Celery安装配置以及基础用法
  • Celery添加任务
  • Celery获取任务
  • Celery高级使用
  • Celery实战-更新接口缓存
相关产品与服务
消息队列 TDMQ
消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档