前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >解密python实现定时任务的8种方式

解密python实现定时任务的8种方式

作者头像
雷子
发布2023-12-20 15:08:34
6750
发布2023-12-20 15:08:34
举报
文章被收录于专栏:雷子说测试开发

定时任务是编程中常见的需求,它可以按照预定的时间表执行特定的任务或操作。在Python中,有多种方法可以实现定时任务。

常见的Python定时任务解决方案,包括使用标准库、第三方库和操作系统工具。

方案一:

使用time.sleep()

time.sleep()是Python标准库中的函数,它可以帮助你暂停程序的执行一段指定的时间。

通过组合time.sleep()和循环,可以实现简单的定时任务。

代码语言:javascript
复制
import time

def task():
    print("定时任务执行中...")

while True:
    task()
    time.sleep(60)  # 休眠1min

结果:

代码语言:javascript
复制
定时任务执行中...
定时任务执行中...
定时任务执行中...

方案二:使用sched模块

Python的sched模块提供了一个事件调度器,可以在指定的时间执行函数。这是一个更灵活的解决方案,可以安排多个任务。

代码语言:javascript
复制
import time
import sched
s = sched.scheduler(time.time, time.sleep)
def task(sc):
    print("定时任务执行中...")
s.enter(3600, 1, task, (s,))
s.run()

结果:

sched 对象主要方法:

enter(delay, priority, action, argument),安排一个事件来延迟 delay 个时间单位。

cancel(event):从队列中删除事件。如果事件不是当前队列中的事件,则该方法将跑出一个 ValueError。

run():运行所有预定的事件。这个函数将等待(使用传递给构造函数的 delayfunc() 函数),然后执行事件,直到不再有预定的事件。

方案三:使用APScheduler库

APScheduler是一个功能强大的Python库,可用于调度各种类型的任务,包括定时任务。它支持多种调度方式,如间隔调度、定时调度等。

代码语言:javascript
复制
from apscheduler.schedulers.blocking import BlockingScheduler

def task():
    print("定时任务执行中...")

scheduler = BlockingScheduler()
scheduler.add_job(task, 'interval', hours=1)
scheduler.start()

方案四:使用Celery

Celery是一个强大的分布式任务队列框架,可以用于管理和调度定时任务。它支持任务的异步执行和分布式部署。

代码语言:javascript
复制
from celery import Celery

app = Celery('myapp', broker='pyamqp://guest@localhost//')

@app.task
def my_task():
    print("定时任务执行中...")

my_task.apply_async(countdown=3600)

方案五:使用操作系统工具

在Linux和Unix系统上,可以使用cron任务调度器来执行定时任务。通过编写一个包含Python命令的脚本,并将其添加到cron作业中,可以在指定的时间执行Python脚本。

创建一个Python脚本(例如my_task.py):

代码语言:javascript
复制
# my_task.py
def my_task():
    print("定时任务执行中...")

if __name__ == "__main__":
    my_task()

然后,将脚本添加到cron作业

在cron配置文件中添加:

代码语言:javascript
复制
0 * * * * /usr/bin/python /path/to/my_task.py

方案六:使用第三方库schedule

schedule是一个Python库,它提供了一种非常简单的方式来执行定时任务。它的语法直观易懂,适用于各种定时任务需求。

代码语言:javascript
复制
import schedule
import time
def mtask():
    print("定时任务执行中...")
schedule.every(1).hour.do(mtask)
while True:
    schedule.run_pending()
    time.sleep(1)

方案七:利用 threading.Timer 实现定时任务

threading 模块中的 Timer 是一个非阻塞函数,比 sleep 稍好一点,timer 最基本理解就是定时器,我们可以启动多个定时任务,这些定时器任务是异步执行,所以不存在等待顺序执行问题。

Tmer(interval, function, args=[ ], kwargs={ })

interval: 指定的时间

function: 要执行的方法

args/kwargs: 方法的参数

代码语言:javascript
复制
import datetime
from threading import  Timer
def do():
    now = datetime.datetime.now()
    print(now)
    loop()
def loop():
    t = Timer(5, do)
    t.start()
if __name__ == "__main__":
     loop()

方案八、使用 Timeloop 库运行定时任务

是一个库,可用于运行多周期任务。这是一个简单的库,它使用 decorator 模式在线程中运行标记函数

代码语言:javascript
复制
import time
from timeloop import Timeloop
from datetime import timedelta
tl=Timeloop()
@tl.job(interval=timedelta(seconds=2))
def sample_job_every_2s():
    print("2s job current time : {}".format(time.ctime()))
if __name__=="__main__":
    tl.start(block=True)

八个定时任务的实现方式就实现完成了!

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-12-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 雷子说测试开发 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档