前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >python测试开发django-197.django-celery-beat 定时任务

python测试开发django-197.django-celery-beat 定时任务

作者头像
上海-悠悠
发布2022-07-19 12:46:55
5690
发布2022-07-19 12:46:55
举报

前言

django-celery-beat 可以支持定时任务,把定时任务写到数据库。 接着前面这篇写python测试开发django-196.python3.8+django2+celery5.2.7环境准备 django-celery-beat 一般结合 django-celery-results一起使用

环境准备

运行系统:linux(centos/debian/ubuntu),不支持windows Python版本:3.8.5 Django : 2.2.2 celery: 5.2.7 django-celery-results==2.4.0 django-celery-beat==2.3.0

使用pip安装celery5.2.7版本

代码语言:javascript
复制
pip install celery==5.2.7

安装django-celery-results库:

代码语言:javascript
复制
pip install django-celery-results==2.4.0

安装django-celery-beat

代码语言:javascript
复制
pip install django-celery-beat==2.3.0

定时任务配置

Django 项目中settings.py:

代码语言:javascript
复制
INSTALLED_APPS = (
    ...,
    'django_celery_results',
    'django_celery_beat',
)

将django_celery_beat模块和django-celery-results相关配置,写到setting.py

代码语言:javascript
复制
# #  RabbitMQ配置BROKER_URL 和backend
CELERY_BROKER_URL = 'amqp://admin:123456@127.0.0.1:5672//'
# # RESULT_BACKEND 结果保存数据库
CELERY_RESULT_BACKEND = 'django-db'

# # SCHEDULER 定时任务保存数据库
# 将任务调度器设为DatabaseScheduler
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
# 为任务设置超时时间,单位秒。超时即中止,执行下个任务。
# CELERY_TASK_TIME_LIMIT = 5
# 为存储结果设置过期日期,默认1天过期。如果beat开启,Celery每天会自动清除。
# 设为0,存储结果永不过期
# CELERY_RESULT_EXPIRES = xx
CELERY_TASK_RESULT_EXPIRES = 60*60*24  # 后端存储的任务超过一天时,自动删除数据库中的任务数据,单位秒
CELERY_MAX_TASKS_PER_CHILD = 1000  # 每个worker执行1000次任务后,自动重启worker,防止任务占用太多内存导致内存泄漏

CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TIMEZONE = 'Asia/Shanghai'
DJANGO_CELERY_BEAT_TZ_AWARE = False

应用Django数据库迁移,创建相关的表

代码语言:javascript
复制
python manage.py migrate django_celery_results
python manage.py migrate django_celery_beat

django_celery_results生成3张表

django_celery_beat生成6张表

相关表的说明 django_celery_beat.models.ClockedSchedule # 此模型存放已经关闭的任务 django_celery_beat.models.CrontabSchedule  # cron的时间表  django_celery_beat.models.IntervalSchedule # 以特定间隔(例如,每5秒)运行的计划。 django_celery_beat.models.PeriodicTask # 此模型定义要运行的单个周期性任务。 django_celery_beat.models.PeriodicTasks # 此模型仅用作索引以跟踪计划何时更改 django_celery_beat.models.SolarSchedule # 定制任务

实例参考

在views视图中,主要用到CrontabSchedule 和PeriodicTask

代码语言:javascript
复制
from django_celery_beat.models import PeriodicTask, CrontabSchedule
import json

class CreateTaskView(APIView):
    """创建任务视图"""

    def post(self, request):
        """创建任务 接口传参,示例
        {
           "task_name": "任务名称",
           "arg1": "参数1",
           "arg2": "参数2",
           "task_cron": "*/2 * * * *"
        }
        """
        task_name = request.data.get("task_name")
        task_kwargs = {
            "task_name": task_name,
            "arg1": request.data.get("arg1"),
            "arg2": request.data.get("arg2"),
        }
        # 定时任务规则
        cron_value = request.data.get("task_cron")
        cro_list = str(cron_value).split(' ')
        if len(cro_list) != 5:
            return Response({"code": 3003, "msg": "task_cron 不合法"})
        cron_time = {
            'minute': cro_list[0],  # 每2分钟执行一次
            'hour': cro_list[1],
            'day_of_week': cro_list[2],
            'day_of_month': cro_list[3],
            'month_of_year': cro_list[4]
        }
        # 写入 schedule表
        schedule = CrontabSchedule.objects.create(**cron_time)
        # 任务和 schedule 关联
        task_obj = PeriodicTask.objects.filter(name=task_name)
        if task_obj:
            return JsonResponse({"code": 3000, "msg": "task name exist"})

        task_obj, created = PeriodicTask.objects.get_or_create(
            name=task_name,  # 名称保持唯一
            task="app.tasks.task_demo",  # 任务的注册路径
            crontab=schedule,
            enabled=True,  # 是否开启任务
            # args=json.dumps(task_args),
            kwargs=json.dumps(task_kwargs),
            # 任务过期时间,设置当前时间往后1天
            expires=datetime.datetime.now() + datetime.timedelta(days=1)
        )
        if created:
            return JsonResponse({"code": 0, "msg": "success"})
        else:
            return JsonResponse({"code": 111, "msg": "create failed"})

其中task="app.tasks.task_demo" 是任务的注册路径,比如django下有自己的app,在app下创建了一个tasks.py文件

代码语言:javascript
复制
# Create your tasks here
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

启动worker与beat

最后启动worker与beat

代码语言:javascript
复制
celery -A proj worker -l info
celery -A proj beat -l info

linux 后台启动使用 supervisord 后台启动celery 服务(worker/beat) 相关教程参考这篇https://www.cnblogs.com/yoyoketang/p/16458925.html

2022年第 11 期《python接口web自动化+测试开发》课程,6月5号开学

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-07-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 从零开始学自动化测试 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 环境准备
  • 定时任务配置
  • 实例参考
  • 启动worker与beat
相关产品与服务
数据传输服务
腾讯云数据传输服务(Data Transfer Service,DTS)可帮助用户在业务不停服的前提下轻松完成数据库迁移上云,利用实时同步通道轻松构建高可用的数据库多活架构,通过数据订阅来满足商业数据挖掘、业务异步解耦等场景需求。同时,DTS 还提供私有化独立输出版本 DTS-DBbridge,支持异构数据库和同构数据库之间迁移和同步,可以帮助企业实现完整数据库迁移(如 Oracle)。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档