前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python定时器APScheduler

Python定时器APScheduler

作者头像
沈宥
发布2022-05-10 13:55:39
1.1K0
发布2022-05-10 13:55:39
举报
文章被收录于专栏:从头开始学习测试开发

简介:APScheduler是python的一个定时任务调度框架,能实现类似linux下crontab类型的任务,使用起来比较方便。它提供基于固定时间间隔、日期以及crontab配置类似的任务调度。

一、调度方法

安装: pip install apscheduler

1、BackgroundScheduler调度器

调用start函数后会阻塞当前线程。当调度器是你应用中唯一要运行的东西时使用

2、BlockingScheduler调度器

调用start后主线程不会阻塞。当你不运行任何其他框架时使用,并希望调度器在你应用的后台执行

二、举个例子

代码语言:javascript
复制
from apscheduler.schedulers.background import BackgroundScheduler

aps = BackgroundScheduler()

def RunMonitor():
    # 第一个参数为目标函数,第二个为内置的一个名称,seconds为执行的间隔
    aps.add_job(RunCaseEnv, 'interval', seconds=40 * 60, args=['monitor'])

    # 启动线程任务
    aps.start()

def RunCaseEnv(env):
    print('*****************************',env)

def StopMonitor():
    ## 结束进程
    aps.shutdown(wait=False)
    aps.remove_all_jobs()

以BackgroundScheduler调度为例 1、新增一个定时任务:每隔40分钟执行一次RunCaseEnv函数,传参为args=['monitor'] 但由于是BackgroundScheduler调度方式,所以每次定时任务并不会立即执行,而是等到40分钟后开始执行

2、结束定时任务 默认情况下调度器会等待所有正在运行的作业完成后,关闭所有的调度器和作业存储。如果不想等待,可以使用wait=False

3、暂停和重启定时任务 暂停任务:

代码语言:javascript
复制
apscheduler.job.Job.pause()
apscheduler.schedulers.base.BaseScheduler.pause_job()

恢复任务:

代码语言:javascript
复制
apscheduler.job.Job.resume()
apscheduler.schedulers.base.BaseScheduler.resume_job()

三、django_apscheduler

在使用Django框架开发web项目时,通过前端页面灵活设置定时活动的框架,使用方法与APScheduler相同 安装: pip install django-apscheduler

先在settings.py文件的INSTALLED_APPS中加入django-apscheduler应用

迁移数据库表 python manage.py migrate

views.py文件中添加开启监控的方法

代码语言:javascript
复制
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore, register_job

scheduler = BackgroundScheduler()
scheduler.add_jobstore(DjangoJobStore(), 'default')


# 创建任务
def send_email_adjec(request):
    '''
    :param request:
    :return: 手动发送邮件
    '''

    # result = sendEmail.caseOverview(True)
    # sendEmail.SendDingTalk(result['sum'], result['fail'])

    # 创建任务
    # jobtest是要定时执行的任务
    scheduler.add_job(jobtest, 'interval', seconds=10, args='s')

    scheduler.start()

    return HttpResponseRedirect('/ZDH/android/')

# 具体要执行的代码
def jobtest(s):
    print('jobtest')
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-01-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 从头开始学习测试开发 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、调度方法
    • 1、BackgroundScheduler调度器
      • 2、BlockingScheduler调度器
      • 二、举个例子
      • 三、django_apscheduler
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档