前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >python:定时任务模块schedul

python:定时任务模块schedul

作者头像
py3study
发布2020-01-22 12:13:53
1.2K0
发布2020-01-22 12:13:53
举报
文章被收录于专栏:python3python3

1.安装

代码语言:javascript
复制
pip install schedule

2.文档

代码语言:javascript
复制
https://schedule.readthedocs.io/en/stable/faq.html#how-to-execute-jobs-in-parallel

3.官网使用demo

代码语言:javascript
复制
import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

4.我的schedule使用demo

代码语言:javascript
复制
import sys
import time
import schedule
import os
import logging
if not os.path.exists('/var/log/video_download/'):
    os.makedirs('/var/log/video_download')
log = logging.getLogger()
log.setLevel(logging.DEBUG)
fmt = logging.Formatter("%(asctime)s %(pathname)s %(filename)s %(funcName)s %(lineno)s %(levelname)s - %(message)s",
                        "%Y-%m-%d %H:%M:%S")
stream_handler = logging.FileHandler(
    '/var/log/video_download/debug-%s.log' % (time.strftime('%Y-%m-%d', time.localtime(time.time()))))
stream_handler.setLevel(logging.DEBUG)
stream_handler.setFormatter(fmt)
log.addHandler(stream_handler)
def handler():
    print("this is handler")
def main():
    if len(sys.argv) != 2:
        print('python video_data.py hour')
        sys.exit()
    param = sys.argv[1]
    if param == 'hour':
        log.debug("enter main")
        schedule.every().day.at("00:00").do(handler)
        schedule.every().hour.do(handler)
        while True:
            schedule.run_pending()
            time.sleep(1)
    else:
        print("python video_data.py hour")
        sys.exit()


if __name__ == "__main__":
    main()

5.拓展:

  并行执行任务

  (1)默认情况下,schedule按顺序执行所有作业。这背后的原因是很难找到一个让每个人都开心的并行执行模型

代码语言:javascript
复制
import threading
import time
import schedule


def job():
    print("I'm running on thread %s" % threading.current_thread())


def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()


schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)


while 1:
    schedule.run_pending()
    time.sleep(1)

  (2)如果需要控制线程数,就需要用queue

代码语言:javascript
复制
import Queue
import time
import threading
import schedule


def job():
    print("I'm working")


def worker_main():
    while 1:
        job_func = jobqueue.get()
        job_func()
        jobqueue.task_done()

jobqueue = Queue.Queue()

schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)

worker_thread = threading.Thread(target=worker_main)
worker_thread.start()

while 1:
    schedule.run_pending()
    time.sleep(1)

  (3)抛出异常

代码语言:javascript
复制
import functools

def catch_exceptions(cancel_on_failure=False):
    def catch_exceptions_decorator(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            try:
                return job_func(*args, **kwargs)
            except:
                import traceback
                print(traceback.format_exc())
                if cancel_on_failure:
                    return schedule.CancelJob
        return wrapper
    return catch_exceptions_decorator

@catch_exceptions(cancel_on_failure=True)
def bad_task():
    return 1 / 0

schedule.every(5).minutes.do(bad_task)

  (4)只运行一次

代码语言:javascript
复制
def job_that_executes_once():
    # Do some work ...
    return schedule.CancelJob

schedule.every().day.at('22:30').do(job_that_executes_once)

  (5)一次取消多个任务

代码语言:javascript
复制
def greet(name):
    print('Hello {}'.format(name))

schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')

schedule.clear('daily-tasks')

  (6)在任务中加入日志功能

代码语言:javascript
复制
import functools
import time

import schedule


# This decorator can be applied to
def with_logging(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        print('LOG: Running job "%s"' % func.__name__)
        result = func(*args, **kwargs)
        print('LOG: Job "%s" completed' % func.__name__)
        return result
    return wrapper

@with_logging
def job():
    print('Hello, World.')

schedule.every(3).seconds.do(job)

while 1:
    schedule.run_pending()
    time.sleep(1)

  (7)随机开展工作

代码语言:javascript
复制
def my_job():
    # This job will execute every 5 to 10 seconds.
    print('Foo')

schedule.every(5).to(10).seconds.do(my_job)

  (8)传参给作业函数

代码语言:javascript
复制
def greet(name):
    print('Hello', name)

schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-05-15 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档