前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python实现定时任务的几种方法

Python实现定时任务的几种方法

作者头像
Steve Wang
发布2021-12-20 18:46:31
3.3K0
发布2021-12-20 18:46:31
举报
文章被收录于专栏:从流域到海域从流域到海域

死循环内sleep (轮询,阻塞)

在一个死循环内放入执行的任务,sleep一段时间之后再执行:

代码语言:javascript
复制
import time

def func():
    print("func executed")

def timer():
    while True:
        func()
        time.sleep(5)

timer()

# 每隔5s执行一次func函数

该方法能够执行固定间隔时间的任务,时间间隔由time.sleep()的睡眠时间指定。

在循环内加入时间判断的逻辑也可以执行定时任务,比如:

代码语言:javascript
复制
import datetime
date = datetime.datetime.now().strftime("%Y-%m-%d %H:%m")
if date == '2021-12-19 17:12':
    func()

但由于轮询的原因,能指定的时间粒度会比较粗。

因为sleep()是阻塞函数,该方法始终有一个缺点是等待的时间内什么都不能做。

threading模块的Timer

threading模块提供了一个定时器触发的函数Timer是一个非阻塞函数,但只能等待固定的时间,然后执行一次任务。

代码语言:javascript
复制
import datetime
from threading import Timer

def func(para):
    print(para)
    print("func exexuted")

def timer(delay):
    hit = "timer called func"
    t = Timer(delay, func, (hit,))  # 三个参数分别是:延迟时间 调用函数 (传入调用函数的参数(必须是tuple))
    t.start()

timer(3.0)

实现非阻塞是通过另起一个线程实现的,这并不令人惊讶。

sche模块

Python内置了专用于调度的模块sche,它提供延时调度的机制,每次定时执行任务都必须写入一个调度。

代码语言:javascript
复制
import sched
scheduler = sched.scheduler(time.time, time.sleep) # 两个参数分别为 返回时间戳的函数,默认就是time.time 和 定时未到达之前执行何种函数阻塞,默认就是time.sleep

def func(para):
    print(para)
    print("func executed")

def timer(delay):
    hit = "scheduled func"
    scheduler.enter(delay, 0, func, (hit,)) # 四个参数分别为 延迟时间 优先级(如果调度包含多个任务,需指定优先级) 调用函数 传入参数
    scheduler.run()

timer(3)

主线程依然会阻塞,而且调度只生效一次,如果想再次执行,必须再次调用enter方法。

基于APScheduler实现定时任务

Python这个语言的优势就在于有丰富的第三方库,既然原生实现有这样那样的缺点,我们可以借助第三方库来实现定时任务。

APScheduler,即Advanced Python Scheduler的缩写,是一个简单易用的python定时框架。

APScheduler是一个强大的Python定时框架,它提供了基于日期、固定时间间隔、以及类Linux crontab类型的定时任务,并支持持久化和以daemon方式执行任务。

第三方框架需要安装:

代码语言:javascript
复制
pip install apscheduler

APScheduler包含四个组件:触发器(trigger)作业存储(job store)执行器(executor)和调度器(scheduler)

触发器(trigger)

触发器包含调度逻辑,每一个设定好的定时任务都有自己的触发器,用于决定下一次作业执行的时间。除了初始配置之外,触发器完全是无状态的。

APScheduler提供三种内置的触发器,这三种触发器也决定了定时任务的三种出发方式:

  • date: 特定时间点触发
  • interval: 间隔固定时间触发
  • cron: 以linux cron方式周期性触发。该方式最为强大,也能够实现前两种触发方式

作业存储(job store)

存储被调度的作业,默认的存储方式是存储在内存中(APScheduler默认使用MemoryJobStore),也支持保存在各种数据库中。一个作业的数据将在持久化时被序列化,并且被加载时被反序列化。调度器不能共享同一个作业的存储。

执行器(executor)

实际处理作业的运行,通常在作业中提交一个指定的函数到一个线程池或者进程池内运行。作业完成时,执行器将会通知调度器。 常用的executor有两种:

  • ProcessPoolExecutor
  • ThreadPoolExecutor

调度器(scheduler)

一般我们在一个应用内只使用一个调度器,开发者通常不会直接处理作业存储、调度器和触发器,调度器提供了能配置这些的接口。配置作业存储和执行器可以在调度器中完成,它添加、修改和移除作业等常见操作。

调度器的配置

APSchedule提供了配置字典和参数关键字两种配置调度器的方式,使用的时候既可以先创建调度器再添加和配置作业,也可以先以字典形式指定配置,然后再创建调度器。除此之外之外,还可以通过装饰器的方式直接将一个函数变成调度的作业。

下面提供几个不同的实例,来说明不同的调度器配置方法。

先创建调度器再添加和配置作业

BlockingScheduler和BackgroundScheduler

APScheduler支持两种调度器,一种是阻塞主线程的BlockingScheduler,另一种是在后台运行的BackgroundScheduler。 BlockingScheduler适用于定时任务是唯一要执行的任务,BackgroundScheduler适用于定时任务不是唯一的任务,主线程还有其他任务要执行,因此将定时任务作为后台任务来执行。

代码语言:javascript
复制
from apscheduler.schedulers.blocking import BlockingScheduler  # 阻塞方法

def job(para):
	print(parm)
    print("job executed")

scheduler = BlockingScheduler()
scheduler.add_job(job, ['定时器任务'], 'interval', seconds=5)  # 第一个参数指定要执行的函数 第二个参数指定传入的参数 第二个参数指定调度方式 也可以是'date'或者'cron' 第三个参数是'interval'间隔调度模式下间隔的时间 也可以指定hours或者minutes
scheduler.start()
代码语言:javascript
复制
from apscheduler.schedulers.background import BackgroundScheduler  # 非阻塞方法

def job(para):
	print(parm)
    print("job executed")

scheduler = BackgroundScheduler()
scheduler.add_job(job, ['定时器任务'], 'interval', seconds=5)
scheduler.start()

使用字典方式配置调度器

其实就是以字典的形式指定参数,看一下例子就明白了。

代码语言:javascript
复制
from pymongo import MongoClient
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
# MongoDB 参数
host = '127.0.0.1'
port = 27017
client = MongoClient(host, port)
# 输出时间
def job():
    print("jon executed")

# 存储方式配置
jobstores = {
    'mongo': MongoDBJobStore(collection='job', database='test', client=client),
    'default': MemoryJobStore()
}
# 执行器配置
executors = {
    'default': ThreadPoolExecutor(10),
    'processpool': ProcessPoolExecutor(3)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
scheduler.add_job(job, 'interval', seconds=5, jobstore='mongo')
scheduler.start()

装饰器方式配置调度器

代码语言:javascript
复制
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
# 直接在被调度函数前加装饰器
@scheduler.scheduled_job('interval', id='job_id', seconds=5)
def job_function():
    print("Hello World")

scheduler.start()

调度方式

interval方式

间隔调度,即每隔一段固定的时间就执行一次。

上面的两个例子即为间隔调度的方式,除此之外,间隔调度还可以使用的参数包括:

  • weeks (int) – number of weeks to wait
  • days (int) – number of days to wait
  • hours (int) – number of hours to wait
  • minutes (int) – number of minutes to wait
  • seconds (int) – number of seconds to wait
  • start_date (datetime|str) – starting point for the interval calculation
  • end_date (datetime|str) – latest possible date/time to trigger on
  • timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations

start_dateend_date还可以指定任务开始日期和结束日期,timezone参数虽然博文的例子都没有使用,但是建议使用,例子如下:

代码语言:javascript
复制
from apscheduler.schedulers.background import BackgroundScheduler  # 非阻塞方法

def job(para):
	print(parm)
    print("job executed")

scheduler = BackgroundScheduler()
scheduler.add_job(job, ['定时器任务'], 'interval', hours=5, timezone='asia/shanghai')
scheduler.start()

除了interval调度方式,还有date方式和cron方式。

date方式

最基本的方式,作业只会执行一次

date方式接收两个参数:

  • run_date指定运行时间,可以精确到秒
代码语言:javascript
复制
from datetime import date
from apscheduler.schedulers.blocking import BlockingScheduler

def job(para):
    print(para)
    
scheduler = BlockingScheduler()
# The job will be executed on December 19th, 2021
scheduler.add_job(job, 'date', run_date=date(2021, 12, 19), args=['定时器任务'])
scheduler.add_job(job, 'date', run_date=datetime(2021, 12, 19, 18, 39, 5), args=['定时器任务']) # 年月日 时分秒
scheduler.add_job(job, 'date', run_date='2021-12-19 18:39:05', args=['定时器任务']) # 上面的另一种形式

# The 'date' trigger and datetime.now() as run_date are implicit
# 上面的英文的意思是date触发器是默认的触发器,此时此刻是默认指定的运行时间 所以可以省略 指定的任务将会立即执行
scheduler.add_job(job, args=['text'])
scheduler.start()
cron方式*(最重要)

cron方式类似于Linux的crontab命令,最为强大,也能够实现前两种调度方式。

Linux的crontab命令使用时需要指定6个参数:

代码语言:javascript
复制
crontab t1 t2 t3 t4 t5 job

其中 t1 是表示分钟,t2 表示小时,t3 表示一个月份中的第几日,t4 表示月份,t5 表示一个星期中的第几天。job表示要执行的程序。 当t1为时表示每分钟都要执行 program,t2 为时表示每小时都要执行程序,其余类推。 当t1为a-b时表示从第 a 分钟到第 b 分钟这段时间内要执行,t2 为 a-b 时表示从第 a 到第 b 小时都要执行,其余类推。 当t1为*/n时表示每 n 分钟个时间间隔执行一次,t2为 */n 表示每 n 小时个时间间隔执行一次,其余类推。 当t1为a, b, c,… 时表示第 a, b, c,… 分钟要执行,t2为 a, b, c,…时表示第 a, b, c…个小时要执行,其余类推。

代码语言:javascript
复制
*    *    *    *    *
-    -    -    -    -
|    |    |    |    |
|    |    |    |    +----- 星期中星期几 (0 - 6) (星期天 为0)
|    |    |    +---------- 月份 (1 - 12) 
|    |    +--------------- 一个月中的第几天 (1 - 31)
|    +-------------------- 小时 (0 - 23)
+------------------------- 分钟 (0 - 59)

完整说明可以参见:Linux crontab 命令

APScheduler则进行了简化,使用如下参数指定cron类型的任务:

  • year (int|str) – 4-digit year
  • month (int|str) – month (1-12)
  • day (int|str) – day of the (1-31) – week (int|str) – ISO week (1-53)
  • day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
  • hour (int|str) – hour (0-23)
  • minute (int|str) – minute (0-59)
  • second (int|str) – second (0-59)
  • start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)
  • end_date (datetime|str) – latest possible date/time to trigger on (inclusive)
  • timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone)

注意,为了和date做区分,时间参数全部不加s。

Expression

Field

Description

*

any

Fire on every value

*/a

any

Fire every a values, starting from the minimum

a-b

any

Fire on any value within the a-b range (a must be smaller than b)

a-b/c

any

Fire every c values within the a-b range

xth y

day

Fire on the x -th occurrence of weekday y within the month

last x

day

Fire on the last occurrence of weekday x within the month

last

day

Fire on the last day within the month

x,y,z

any

Fire on any matching expression; can combine any number of any of the above expressions

一个使用实例:

代码语言:javascript
复制
from apscheduler.schedulers.blocking import BlockingScheduler

def job():
    print("Hello World")
    
# BlockingScheduler
scheduler = BlockingScheduler()
# Schedules job_function to be run on the third Friday
# of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00
scheduler.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# Runs from Monday to Friday at 5:30 (am) until 2021-12-30 00:00:00
scheduler.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2021-12-30')
scheduler.start()

job操作

添加job
  • add_job()
  • scheduled_job()

第二种方法只适用于应用运行期间不会改变的 job,而第一种方法返回一个apscheduler.job.Job 的实例,可以更改或者移除job。

移除job
  • job.remove() 使用 add_job() 返回的实例移除作业
  • remove_job() remove_job使用 jobID移除作业
代码语言:javascript
复制
job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()
# use id
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')
暂停和恢复job
代码语言:javascript
复制
# 暂停
apscheduler.job.Job.pause()
apscheduler.schedulers.base.BaseScheduler.pause_job()
# 恢复
apscheduler.job.Job.resume()
apscheduler.schedulers.base.BaseScheduler.resume_job()
获取 job 列表
  • 使用get_jobs() 获得可调度 job 列表,它会返回所有的job实例。
  • 使用print_jobs() 来输出所有格式化的job列表。
修改 job

除了 jobID 之外 job 的所有属性都可以修改,可以使用apscheduler.job.Job.modify()或者modify_job()修改一个job的属性:

代码语言:javascript
复制
job.modify(max_instances=6, name='Alternated name')
modify_job('my_job_id', trigger='cron', minute='*/5')
关闭job

默认情况下调度器会等待所有的 job 完成后,关闭所有的调度器和作业存储。将wait选项设置为False可以立即关闭。

代码语言:javascript
复制
scheduler.shutdown()
scheduler.shutdown(wait=False)

scheduler事件

scheduler可以添加事件监听器,在被监听事件发生时触发。

代码语言:javascript
复制
def my_listener(event):
    if event.exception:
        print('The job crashed :(')
    else:
        print('The job worked :)')
# 添加监听器
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

参考文献

Python 实现定时任务 Python定时库APScheduler原理及用法

(部分参考文献已在文中列出)

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2021-12-19 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 死循环内sleep (轮询,阻塞)
  • threading模块的Timer
  • sche模块
  • 基于APScheduler实现定时任务
    • 触发器(trigger)
      • 作业存储(job store)
        • 执行器(executor)
          • 调度器(scheduler)
          • 调度器的配置
            • 先创建调度器再添加和配置作业
              • BlockingScheduler和BackgroundScheduler
            • 使用字典方式配置调度器
              • 装饰器方式配置调度器
                • 调度方式
                  • job操作
                    • scheduler事件
                    • 参考文献
                相关产品与服务
                云数据库 MongoDB
                腾讯云数据库 MongoDB(TencentDB for MongoDB)是腾讯云基于全球广受欢迎的 MongoDB 打造的高性能 NoSQL 数据库,100%完全兼容 MongoDB 协议,支持跨文档事务,提供稳定丰富的监控管理,弹性可扩展、自动容灾,适用于文档型数据库场景,您无需自建灾备体系及控制管理系统。
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档