Python task scheduler

Code Review user
Asked 2018-05-22 05:44:24
Answers: 1 · Views: 4K · Followers: 0 · Score: 5

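Tasks are submitted to the scheduler, which runs them at some later time. Tasks can be submitted in any order, and tasks that have not yet run can be cancelled.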

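I use a min-heap to decide which task runs next, and a threading.Condition to communicate between:

  • a "watcher" thread, which sleeps until the next task needs to run, or until its sleep has to be cut short (because a new, earlier task was added);
  • the main thread, which writes to the min-heap.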
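The watcher relies on the return value of threading.Condition.wait: it returns False when the timeout expires and True when another thread calls notify first. A minimal standalone sketch of both cases, where the notify_later helper and the timings are illustrative assumptions rather than part of the scheduler:

```python
import threading
import time

cv = threading.Condition(threading.Lock())

def notify_later(delay):
    """Illustrative helper: wake the waiter after `delay` seconds."""
    time.sleep(delay)
    with cv:  # blocks until the waiter has released the lock inside wait()
        cv.notify()

# Case 1: nobody notifies, so wait() times out and returns False.
with cv:
    timed_out_result = cv.wait(timeout=0.05)

# Case 2: another thread notifies first, so wait() returns True early.
t = threading.Thread(target=notify_later, args=(0.05,), daemon=True)
with cv:
    t.start()
    t0 = time.monotonic()
    notified_result = cv.wait(timeout=5)
    elapsed = time.monotonic() - t0
t.join()

print(timed_out_result, notified_result, elapsed < 5)
```

Starting the notifier while the lock is held means the notify cannot happen before wait() has released the lock, so the second wait really is cut short rather than missing the notification.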

Scheduling is O(log n), cancelling is O(n), and finding the earliest task is O(1).
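A minimal sketch of where those costs come from, using the standard heapq module on hypothetical (start, name) tuples, with start given in seconds instead of datetime objects for brevity:

```python
import heapq

# Hypothetical tasks as (start, name) tuples, submitted out of order.
heap = []
for task in [(3.0, "task-3"), (1.0, "task-1"), (2.5, "task-4")]:
    heapq.heappush(heap, task)  # O(log n) per push

# The earliest task is always at index 0: O(1) to look at.
earliest = heap[0]

def cancel(heap, name):
    """Cancel by name: linear scan plus re-heapify, so O(n) overall."""
    for i, (_, task_name) in enumerate(heap):
        if task_name == name:
            heap[i] = heap[-1]  # overwrite with the last entry...
            heap.pop()          # ...and drop the duplicate at the end
            heapq.heapify(heap) # restore the heap invariant: O(n)
            return True
    return False

cancel(heap, "task-4")
order = [heapq.heappop(heap)[1] for _ in range(len(heap))]
print(earliest, order)
```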

import heapq
import logging
import functools
import time
import threading
from datetime import datetime, timedelta
from collections import namedtuple


logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

Task = namedtuple('Task', ['start', 'name', 'fn'])


class Scheduler(object):
    '''Class that schedules functions to be run in a separate thread at some future
    time. Supports cancellation of functions that haven't yet started.
    '''
    def __init__(self):
        self._cv = threading.Condition(threading.Lock())
        self._minheap = []
        self._timeout = None
        self._start()

    def cancel(self, name):
        with self._cv:
            try:
                task = [task for task in self._minheap if task.name == name][0]
            except IndexError:
                return
            self._minheap.remove(task)
            heapq.heapify(self._minheap)
            self._cv.notify()
        logger.info('canceled {}'.format(task.name))

    def schedule(self, name, fn, start):
        task = Task(start, name, fn)
        logger.info('scheduling task: {}'.format(name))
        with self._cv:
            heapq.heappush(self._minheap, task)
            self._cv.notify()
        logger.info('scheduled task: {}'.format(name))

    def _get_next_timeout(self):
        if not self._minheap:
            return None
        return (self._minheap[0].start - datetime.now()).total_seconds()

    def _start(self):
        def run():
            while True:
                self._cv.acquire()
                logger.info('waiting with timeout: {}'.format(self._timeout))
                not_expired = self._cv.wait(timeout=self._timeout)
                if self._timeout is None:
                    logger.info('no timeout found; using min element')
                    self._timeout = self._get_next_timeout()
                    self._cv.release()
                elif not_expired:
                    logger.info('already waiting but woken up; comparing current with min element')
                    self._timeout = min(self._timeout, self._get_next_timeout())
                    self._cv.release()
                else:
                    logger.info('timed out; running next task')
                    next_task = heapq.heappop(self._minheap)
                    self._timeout = self._get_next_timeout()
                    self._cv.release()
                    threading.Thread(target=next_task.fn, name=next_task.name).start()

        threading.Thread(target=run, name='timer').start()


def main():
    logging.basicConfig(level=logging.INFO, format='%(threadName)-10s: %(message)s')

    start = datetime.now()

    def task():
        logger.info('running, elapsed: {}'.format((datetime.now() - start).total_seconds()))

    s = Scheduler()
    s.schedule('task-1', functools.partial(task), start + timedelta(seconds=1))
    s.schedule('task-2', functools.partial(task), start + timedelta(seconds=2))
    s.cancel('task-2')
    s.schedule('task-3', functools.partial(task), start + timedelta(seconds=3))
    # note that task-4 precedes task-3, but is registered after task-3
    s.schedule('task-4', functools.partial(task), start + timedelta(seconds=2.5))
    time.sleep(5)
    now = datetime.now()
    s.schedule('task-5', functools.partial(task), now + timedelta(seconds=5))
    s.schedule('task-6', functools.partial(task), now + timedelta(seconds=4))
    s.schedule('task-7', functools.partial(task), now + timedelta(seconds=3.5))


if __name__ == '__main__':
    main()

Output:

❗ ~/c/dsa [10265e2] (master⚡)
(n) p3 py/epi/19_7.py
timer     : waiting with timeout: None
MainThread: scheduling task: task-1
MainThread: scheduled task: task-1
MainThread: scheduling task: task-2
MainThread: scheduled task: task-2
MainThread: canceled task-2
MainThread: scheduling task: task-3
MainThread: scheduled task: task-3
MainThread: scheduling task: task-4
MainThread: scheduled task: task-4
timer     : no timeout found; using min element
timer     : waiting with timeout: 0.999214
timer     : timed out; running next task
task-1    : running, elapsed: 1.006024
timer     : waiting with timeout: 1.494409
timer     : timed out; running next task
task-4    : running, elapsed: 2.506384
timer     : waiting with timeout: 0.49432
timer     : timed out; running next task
task-3    : running, elapsed: 3.005836
timer     : waiting with timeout: None
MainThread: scheduling task: task-5
MainThread: scheduled task: task-5
timer     : no timeout found; using min element
MainThread: scheduling task: task-6
timer     : waiting with timeout: 4.999305
MainThread: scheduled task: task-6
timer     : already waiting but woken up; comparing current with min element
MainThread: scheduling task: task-7
timer     : waiting with timeout: 3.998729
MainThread: scheduled task: task-7
timer     : already waiting but woken up; comparing current with min element
timer     : waiting with timeout: 3.498098
timer     : timed out; running next task
task-7    : running, elapsed: 8.509112
timer     : waiting with timeout: 0.493943
timer     : timed out; running next task
task-6    : running, elapsed: 9.008533
timer     : waiting with timeout: 0.994441
timer     : timed out; running next task
task-5    : running, elapsed: 10.005569
timer     : waiting with timeout: None

Concurrency is hard, so I'd love to hear people's thoughts. Thanks!


1 answer

Code Review user

Accepted answer

Answered 2018-05-22 10:31:16

1. Review

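  1. The docstrings could be improved. It would be nice if the docstring for the Scheduler class (briefly) explained how to use it, and there should be docstrings for the schedule and cancel methods.
  2. When using the logging module, you usually don't need to format log messages yourself. Instead, pass the format string and the arguments, and let the logger do the formatting when it needs to (depending on the log level, it may never need to format a particular message). So instead of: logger.info('waiting with timeout: {}'.format(self._timeout)) write: logger.info('waiting with timeout: %s', self._timeout)
  3. If there are several tasks with the same name, the cancel method cancels the one nearest the start of the heap. This is not necessarily the one with the earliest start time (because the heap is arranged as a tree, not as a sorted list). In my opinion this behaviour is hard to understand and seems likely to lead to unreliable programs. I think it would be easier for users to write reliable programs if you adopted one of three approaches: (i) cancel all tasks with a matching name, so that the behaviour does not depend on the arrangement of the heap; or (ii) require all tasks in the queue to have unique names; or (iii) have the schedule method return some object representing the task, which can later be passed to cancel to identify exactly which task to cancel.
  4. The time taken to cancel is proportional to the number of tasks in the heap. This could be improved to (amortized) O(log n) by: (i) having the schedule method return an object representing the task, as described above; (ii) passing that task object to the cancel method; (iii) leaving cancelled tasks in the heap but marking them as cancelled; and (iv) discarding cancelled tasks when they are popped off the heap.
  5. The attributes of Task are start, name, fn, but the arguments to schedule are name, fn, start. This inconsistency could lead to confusion or mistakes. (You need start to come first so that tasks are compared by their start time, but there are other ways to achieve that.)
  6. If you try to schedule two tasks with the same start time and the same name, you get TypeError: '<' not supported between instances of 'function' and 'function', because the namedtuple comparison falls through to the fn fields.
  7. The name _minheap describes the data structure, but it is more important to describe the purpose (the data structure is an implementation detail). So I would use a name like _tasks.
  8. The _timeout attribute is only used in the run function, so it could be a local variable there rather than an attribute on the Scheduler object.
  9. The _get_next_timeout method is only used in the run function, so it could be a local function rather than a method on the Scheduler class. (It may only be called while holding the lock, so it isn't well suited to being a standalone method anyway.)
  10. The _start method is only called from __init__, so it could be inlined there.
  11. In my opinion the logic in run is wrong. The following sequence of events is possible: (i) the timer thread calls _get_next_timeout, which returns a timeout of 10 seconds; (ii) the timer thread releases the condition variable; (iii) another thread calls schedule with a task that should run in 5 seconds and notifies the condition variable (but nothing is waiting on it); (iv) the timer thread acquires the condition variable; (v) the timer thread waits for the old timeout of 10 seconds. It should instead have computed the new timeout of 5 seconds and waited for that. To fix this, the timeout needs to be computed after acquiring the condition variable and before waiting on it.
  12. Calling acquire and release on the condition variable directly seems risky, because an exception at the wrong moment would leave the lock held. Normally you would use the context manager interface to avoid this risk. I guess you didn't do that because you want to start the task without holding the lock, but that only takes a small rearrangement of the code.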
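A small check of point 2: with %-style arguments, the logger only turns its arguments into text if the record is actually emitted, whereas str.format runs before logging is even called. The Expensive class and the "lazy-demo" logger name are hypothetical; the class just counts how often it is rendered.

```python
import logging

class Expensive:
    """Hypothetical argument that records how often it is rendered."""
    def __init__(self):
        self.renders = 0

    def __str__(self):
        self.renders += 1
        return "expensive"

logger = logging.getLogger("lazy-demo")
logger.setLevel(logging.WARNING)  # INFO messages are disabled

arg = Expensive()
logger.info("value: %s", arg)          # deferred: never rendered
logger.info("value: {}".format(arg))   # eager: rendered even though discarded
print(arg.renders)  # 1
```

Using %s rather than %f also matters here: "%f" % None raises TypeError, and _timeout is None whenever the heap is empty.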
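Points 4 to 6 can be combined in a single-threaded sketch: heap entries are (start, seq, task) tuples, where seq is a tie-breaking counter so that functions are never compared, and cancelling only flags the task so that it is discarded when it reaches the top of the heap. This is the lazy-deletion pattern from the heapq documentation; TaskQueue, _Task and pop_due are illustrative names, not part of the original code, and start is a plain number rather than a datetime.

```python
import heapq
import itertools

class _Task:
    """Handle returned by TaskQueue.push (illustrative only)."""
    __slots__ = ("fn", "start", "cancelled")

    def __init__(self, fn, start):
        self.fn = fn
        self.start = start
        self.cancelled = False

    def cancel(self):
        # O(1): the entry stays in the heap and is only marked.
        self.cancelled = True

class TaskQueue:
    def __init__(self):
        self._heap = []
        self._seq = itertools.count()  # unique, increasing tie-breaker

    def push(self, fn, start):
        task = _Task(fn, start)
        # (start, seq) is always unique, so comparison never reaches task.
        heapq.heappush(self._heap, (start, next(self._seq), task))
        return task

    def pop_due(self, now):
        """Pop and return the earliest live task with start <= now, else None."""
        while self._heap:
            start, _, task = self._heap[0]
            if task.cancelled:
                heapq.heappop(self._heap)  # lazily discard cancelled entries
            elif start <= now:
                return heapq.heappop(self._heap)[2]
            else:
                return None
        return None

def noop():
    pass

q = TaskQueue()
a = q.push(noop, 1.0)
b = q.push(noop, 1.0)  # same start and same fn: no TypeError
c = q.push(noop, 2.0)
a.cancel()
due = q.pop_due(now=1.5)
print(due is b, q.pop_due(now=1.5), q.pop_due(now=2.0) is c)  # True None True
```

Each cancelled entry is popped at most once, which is where the amortized O(log n) bound comes from. In a threaded scheduler these pops would have to happen while holding the condition variable.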

2. Revised code

Here is a version with anonymous tasks; if you want your tasks to have names, it would be straightforward to add them.

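# Same imports and logger as the code in the question.
import functools
import heapq
import logging
import threading
from datetime import datetime

logger = logging.getLogger(__name__)


class Scheduler: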
    """A schedule of tasks to be run in background threads. Call the
    schedule method to schedule a task to run at a particular time.
    Call the task's cancel method to cancel it if it has not already
    started running.

    """

    @functools.total_ordering
    class _Task:
        "A scheduled task."

        def __init__(self, fn, start):
            "Create task that will run fn at or after the datetime start."
            self.fn = fn
            self.start = start
            self.cancelled = False

        def __lt__(self, other):
            # Tasks compare according to their start time (heapq only uses <).
            return self.start < other.start

        @property
        def timeout(self):
            "Return time remaining in seconds before task should start."
            return (self.start - datetime.now()).total_seconds()

        def cancel(self):
            "Cancel task if it has not already started running."
            self.cancelled = True
            logger.info("canceled %s", self)

    def __init__(self):
        cv = self._cv = threading.Condition(threading.Lock())
        tasks = self._tasks = []

        def run():
            while True:
                with cv:
                    while True:
                        timeout = None
                        while tasks and tasks[0].cancelled:
                            heapq.heappop(tasks)
                        if tasks:
                            timeout = tasks[0].timeout
                            if timeout <= 0:
                                task = heapq.heappop(tasks)
                                break
                        cv.wait(timeout=timeout)
                logger.info("starting task %s", task)
                threading.Thread(target=task.fn).start()

        threading.Thread(target=run, name='Scheduler').start()

    def schedule(self, fn, start):
        """Schedule a task that will run fn at or after start (which must be a
        datetime object) and return an object representing that task.

        """
        task = self._Task(fn, start)
        logger.info("scheduling task %s", task)
        with self._cv:
            heapq.heappush(self._tasks, task)
            self._cv.notify()
        logger.info("scheduled task %s", task)
        return task
Score: 4
Original page content provided by Code Review; translation support provided by Tencent Cloud Xiaowei's IT-domain engine.
Original link:

https://codereview.stackexchange.com/questions/194922
