将任务提交到调度程序,允许在以后的某个时间运行它们。任务可以按任何顺序提交,不运行的任务也可以取消。
我使用一个min堆来确定下一个任务的优先级,使用一个threading.Condition
在以下任务之间进行通信。
调度是O(log n)
,取消是O(n)
,最快的任务是O(1)
。
import heapq
import logging
import functools
import time
import threading
from datetime import datetime, timedelta
from collections import namedtuple
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
Task = namedtuple('Task', ['start', 'name', 'fn'])
class Scheduler(object):
'''Class that schedules functions to be run in a separate thread at some future
time. Supports cancellation of functions that haven't yet started.
'''
def __init__(self):
self._cv = threading.Condition(threading.Lock())
self._minheap = []
self._timeout = None
self._start()
def cancel(self, name):
with self._cv:
try:
task = [task for task in self._minheap if task.name == name][0]
except IndexError:
return
self._minheap.remove(task)
heapq.heapify(self._minheap)
self._cv.notify()
logger.info('canceled {}'.format(task.name))
def schedule(self, name, fn, start):
task = Task(start, name, fn)
logger.info('scheduling task: {}'.format(name))
with self._cv:
heapq.heappush(self._minheap, task)
self._cv.notify()
logger.info('scheduled task: {}'.format(name))
def _get_next_timeout(self):
if not self._minheap:
return None
return (self._minheap[0].start - datetime.now()).total_seconds()
def _start(self):
def run():
while True:
self._cv.acquire()
logger.info('waiting with timeout: {}'.format(self._timeout))
not_expired = self._cv.wait(timeout=self._timeout)
if self._timeout is None:
logger.info('no timeout found; using min element')
self._timeout = self._get_next_timeout()
self._cv.release()
elif not_expired:
logger.info('already waiting but woken up; comparing current with min element')
self._timeout = min(self._timeout, self._get_next_timeout())
self._cv.release()
else:
logger.info('timed out; running next task')
next_task = heapq.heappop(self._minheap)
self._timeout = self._get_next_timeout()
self._cv.release()
threading.Thread(target=next_task.fn, name=next_task.name).start()
threading.Thread(target=run, name='timer').start()
def main():
logging.basicConfig(level=logging.INFO, format='%(threadName)-10s: %(message)s')
start = datetime.now()
def task():
logger.info('running, elapsed: {}'.format((datetime.now() - start).total_seconds()))
s = Scheduler()
s.schedule('task-1', functools.partial(task), start + timedelta(seconds=1))
s.schedule('task-2', functools.partial(task), start + timedelta(seconds=2))
s.cancel('task-2')
s.schedule('task-3', functools.partial(task), start + timedelta(seconds=3))
# note that task-4 precedes task-3, but is registered after task-3
s.schedule('task-4', functools.partial(task), start + timedelta(seconds=2.5))
time.sleep(5)
now = datetime.now()
s.schedule('task-5', functools.partial(task), now + timedelta(seconds=5))
s.schedule('task-6', functools.partial(task), now + timedelta(seconds=4))
s.schedule('task-7', functools.partial(task), now + timedelta(seconds=3.5))
if __name__ == '__main__':
main()
输出:
❗ ~/c/dsa [10265e2] (master⚡)
(n) p3 py/epi/19_7.py
timer : waiting with timeout: None
MainThread: scheduling task: task-1
MainThread: scheduled task: task-1
MainThread: scheduling task: task-2
MainThread: scheduled task: task-2
MainThread: canceled task-2
MainThread: scheduling task: task-3
MainThread: scheduled task: task-3
MainThread: scheduling task: task-4
MainThread: scheduled task: task-4
timer : no timeout found; using min element
timer : waiting with timeout: 0.999214
timer : timed out; running next task
task-1 : running, elapsed: 1.006024
timer : waiting with timeout: 1.494409
timer : timed out; running next task
task-4 : running, elapsed: 2.506384
timer : waiting with timeout: 0.49432
timer : timed out; running next task
task-3 : running, elapsed: 3.005836
timer : waiting with timeout: None
MainThread: scheduling task: task-5
MainThread: scheduled task: task-5
timer : no timeout found; using min element
MainThread: scheduling task: task-6
timer : waiting with timeout: 4.999305
MainThread: scheduled task: task-6
timer : already waiting but woken up; comparing current with min element
MainThread: scheduling task: task-7
timer : waiting with timeout: 3.998729
MainThread: scheduled task: task-7
timer : already waiting but woken up; comparing current with min element
timer : waiting with timeout: 3.498098
timer : timed out; running next task
task-7 : running, elapsed: 8.509112
timer : waiting with timeout: 0.493943
timer : timed out; running next task
task-6 : running, elapsed: 9.008533
timer : waiting with timeout: 0.994441
timer : timed out; running next task
task-5 : running, elapsed: 10.005569
timer : waiting with timeout: None
并发性很难,所以我很想听听人们的想法。谢谢!
发布于 2018-05-22 02:31:16
Scheduler
类的docstring (简单地)解释了如何使用它,并且应该有用于schedule
和cancel
方法的docstring,那就太好了。logging
模块时,通常不需要自己格式化日志消息。相反,传递格式字符串和参数,并让记录器在需要时执行格式化(取决于日志级别,它可能永远不需要格式化特定的消息)。所以不是:logger.info(‘等待超时:{}'.format( self._timeout) )写:logger.info(“等待超时%f",self._timeout)cancel
方法将取消最接近堆开始的任务。这不一定是最快的启动时间(因为堆被安排为一棵树,而不是排序列表)。在我看来,这种行为很难理解,而且似乎会导致程序不可靠。我认为,如果您采用以下三种方法之一,用户编写可靠的程序就更容易了:(I)取消所有具有匹配名称的任务,使其行为不依赖于堆的排列;或者(Ii)要求队列中的所有任务都具有唯一的名称;或者(Iii)让schedule
方法返回表示任务的某个对象,以便以后可以传递给cancel
来唯一标识要取消的特定任务。schedule
方法返回一些表示上述任务的对象;(Ii)将该任务对象传递给cancel
方法;(Iii)将已取消的任务留在堆中,但将其标记为已取消的任务;(Iv)当已取消的任务从堆中弹出时,将其丢弃。Task
的属性是start
、name
、fn
,但是schedule
的参数是name
、fn
、start
。这种不一致可能导致混淆或错误。(您需要start
作为第一位,这样任务就可以根据它们的开始时间进行比较,但是还有其他方法可以实现这一点。)TypeError: '<' not supported between instances of 'function' and 'function'
。_minheap
描述了数据结构,但更重要的是描述目的(数据结构是实现细节)。所以我会用一个像_tasks
这样的名字。_timeout
属性仅在run
函数中使用,因此它可以是一个局部变量,而不是Scheduler
对象上的一个属性。_get_next_timeout
方法只在run
函数中使用,因此它可以是本地函数,而不是Scheduler
类上的方法。(只能在持有锁的情况下调用它,因此它不适合独立的方法。)_start
方法仅在__init__
方法中调用,因此可以在其中内联。run
中的逻辑是错误的。下面的事件序列是可能的:(I)定时器线程调用_get_next_timeout
,它返回10秒的超时。(Ii)定时器线程释放条件变量。(Iii)另一个线程使用应该在5秒内运行的任务调用schedule
,并通知条件变量(但条件变量没有等待)。(4)定时器线程获取条件变量。(V)定时器线程等待10秒的旧超时。但是它应该计算出5秒的新超时时间,然后等待它。要解决这个问题,需要计算从获取条件变量到等待它之间的超时。acquire
和release
似乎有风险,因为在错误的时间出现异常将使锁保持不变。通常,人们使用上下文管理器接口来避免这种风险。我想您之所以没有这样做,是因为您想在不持有锁的情况下启动任务。但是,只需要对代码进行少量的重新排列就可以完成这项工作。这是一个带有匿名任务的版本,但是如果您希望您的任务具有名称,那么添加它们将是非常简单的。
class Scheduler:
"""A schedule of tasks to be run in background threads. Call the
schedule method to schedule a task to run at a particular time.
Call the task's cancel method to cancel it if it has not already
started running.
"""
@functools.total_ordering
class _Task:
"A scheduled task."
def __init__(self, fn, start):
"Create task that will run fn at or after the datetime start."
self.fn = fn
self.start = start
self.cancelled = False
def __le__(self, other):
# Tasks compare according to their start time.
return self.start <= other.start
@property
def timeout(self):
"Return time remaining in seconds before task should start."
return (self.start - datetime.now()).total_seconds()
def cancel(self):
"Cancel task if it has not already started running."
self.cancelled = True
logger.info("canceled %s", self)
def __init__(self):
cv = self._cv = threading.Condition(threading.Lock())
tasks = self._tasks = []
def run():
while True:
with cv:
while True:
timeout = None
while tasks and tasks[0].cancelled:
heapq.heappop(tasks)
if tasks:
timeout = tasks[0].timeout
if timeout <= 0:
task = heapq.heappop(tasks)
break
cv.wait(timeout=timeout)
logger.info("starting task %s", task)
threading.Thread(target=task.fn).start()
threading.Thread(target=run, name='Scheduler').start()
def schedule(self, fn, start):
"""Schedule a task that will run fn at or after start (which must be a
datetime object) and return an object representing that task.
"""
task = self._Task(fn, start)
logger.info("scheduling task %s", task)
with self._cv:
heapq.heappush(self._tasks, task)
self._cv.notify()
logger.info("scheduled task %s", task)
return task
https://codereview.stackexchange.com/questions/194922
复制