首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >问答首页 >Python任务调度程序

Python任务调度程序
EN

Code Review用户
提问于 2018-05-21 21:44:24
回答 1查看 4K关注 0票数 5

将任务提交到调度程序,允许在以后的某个时间运行它们。任务可以按任何顺序提交,不运行的任务也可以取消。

我使用一个min堆来确定下一个任务的优先级,使用一个threading.Condition在以下任务之间进行通信。

  • 一个“观察者”线程,在需要运行下一个任务之前睡觉,或者它的睡眠时间需要缩短(添加了新的“更早”任务)。
  • 主线程,它将写入min堆。

调度是O(log n),取消是O(n),最快的任务是O(1)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
import heapq
import logging
import functools
import time
import threading
from datetime import datetime, timedelta
from collections import namedtuple


logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

Task = namedtuple('Task', ['start', 'name', 'fn'])


class Scheduler(object):
    '''Class that schedules functions to be run in a separate thread at some future
    time. Supports cancellation of functions that haven't yet started.
    '''
    def __init__(self):
        self._cv = threading.Condition(threading.Lock())
        self._minheap = []
        self._timeout = None
        self._start()

    def cancel(self, name):
        with self._cv:
            try:
                task = [task for task in self._minheap if task.name == name][0]
            except IndexError:
                return
            self._minheap.remove(task)
            heapq.heapify(self._minheap)
            self._cv.notify()
        logger.info('canceled {}'.format(task.name))

    def schedule(self, name, fn, start):
        task = Task(start, name, fn)
        logger.info('scheduling task: {}'.format(name))
        with self._cv:
            heapq.heappush(self._minheap, task)
            self._cv.notify()
        logger.info('scheduled task: {}'.format(name))

    def _get_next_timeout(self):
        if not self._minheap:
            return None
        return (self._minheap[0].start - datetime.now()).total_seconds()

    def _start(self):
        def run():
            while True:
                self._cv.acquire()
                logger.info('waiting with timeout: {}'.format(self._timeout))
                not_expired = self._cv.wait(timeout=self._timeout)
                if self._timeout is None:
                    logger.info('no timeout found; using min element')
                    self._timeout = self._get_next_timeout()
                    self._cv.release()
                elif not_expired:
                    logger.info('already waiting but woken up; comparing current with min element')
                    self._timeout = min(self._timeout, self._get_next_timeout())
                    self._cv.release()
                else:
                    logger.info('timed out; running next task')
                    next_task = heapq.heappop(self._minheap)
                    self._timeout = self._get_next_timeout()
                    self._cv.release()
                    threading.Thread(target=next_task.fn, name=next_task.name).start()

        threading.Thread(target=run, name='timer').start()


def main():
    logging.basicConfig(level=logging.INFO, format='%(threadName)-10s: %(message)s')

    start = datetime.now()

    def task():
        logger.info('running, elapsed: {}'.format((datetime.now() - start).total_seconds()))

    s = Scheduler()
    s.schedule('task-1', functools.partial(task), start + timedelta(seconds=1))
    s.schedule('task-2', functools.partial(task), start + timedelta(seconds=2))
    s.cancel('task-2')
    s.schedule('task-3', functools.partial(task), start + timedelta(seconds=3))
    # note that task-4 precedes task-3, but is registered after task-3
    s.schedule('task-4', functools.partial(task), start + timedelta(seconds=2.5))
    time.sleep(5)
    now = datetime.now()
    s.schedule('task-5', functools.partial(task), now + timedelta(seconds=5))
    s.schedule('task-6', functools.partial(task), now + timedelta(seconds=4))
    s.schedule('task-7', functools.partial(task), now + timedelta(seconds=3.5))


if __name__ == '__main__':
    main()

输出:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
~/c/dsa [10265e2] (master⚡)
(n) p3 py/epi/19_7.py
timer     : waiting with timeout: None
MainThread: scheduling task: task-1
MainThread: scheduled task: task-1
MainThread: scheduling task: task-2
MainThread: scheduled task: task-2
MainThread: canceled task-2
MainThread: scheduling task: task-3
MainThread: scheduled task: task-3
MainThread: scheduling task: task-4
MainThread: scheduled task: task-4
timer     : no timeout found; using min element
timer     : waiting with timeout: 0.999214
timer     : timed out; running next task
task-1    : running, elapsed: 1.006024
timer     : waiting with timeout: 1.494409
timer     : timed out; running next task
task-4    : running, elapsed: 2.506384
timer     : waiting with timeout: 0.49432
timer     : timed out; running next task
task-3    : running, elapsed: 3.005836
timer     : waiting with timeout: None
MainThread: scheduling task: task-5
MainThread: scheduled task: task-5
timer     : no timeout found; using min element
MainThread: scheduling task: task-6
timer     : waiting with timeout: 4.999305
MainThread: scheduled task: task-6
timer     : already waiting but woken up; comparing current with min element
MainThread: scheduling task: task-7
timer     : waiting with timeout: 3.998729
MainThread: scheduled task: task-7
timer     : already waiting but woken up; comparing current with min element
timer     : waiting with timeout: 3.498098
timer     : timed out; running next task
task-7    : running, elapsed: 8.509112
timer     : waiting with timeout: 0.493943
timer     : timed out; running next task
task-6    : running, elapsed: 9.008533
timer     : waiting with timeout: 0.994441
timer     : timed out; running next task
task-5    : running, elapsed: 10.005569
timer     : waiting with timeout: None

并发性很难,所以我很想听听人们的想法。谢谢!

EN

回答 1

Code Review用户

回答已采纳

发布于 2018-05-22 02:31:16

1.回顾

  1. 可以改进文档字符串。如果Scheduler类的docstring (简单地)解释了如何使用它,并且应该有用于schedulecancel方法的docstring,那就太好了。
  2. 当使用logging模块时,通常不需要自己格式化日志消息。相反,传递格式字符串和参数,并让记录器在需要时执行格式化(取决于日志级别,它可能永远不需要格式化特定的消息)。所以不是:logger.info(‘等待超时:{}'.format( self._timeout) )写:logger.info(“等待超时%f",self._timeout)
  3. 如果有多个具有相同名称的任务,则cancel方法将取消最接近堆开始的任务。这不一定是最快的启动时间(因为堆被安排为一棵树,而不是排序列表)。在我看来,这种行为很难理解,而且似乎会导致程序不可靠。我认为,如果您采用以下三种方法之一,用户编写可靠的程序就更容易了:(I)取消所有具有匹配名称的任务,使其行为不依赖于堆的排列;或者(Ii)要求队列中的所有任务都具有唯一的名称;或者(Iii)让schedule方法返回表示任务的某个对象,以便以后可以传递给cancel来唯一标识要取消的特定任务。
  4. 取消所需的时间与堆中的任务数成正比。这可以改进为(摊销) \$O(\log n)\$ (I)让schedule方法返回一些表示上述任务的对象;(Ii)将该任务对象传递给cancel方法;(Iii)将已取消的任务留在堆中,但将其标记为已取消的任务;(Iv)当已取消的任务从堆中弹出时,将其丢弃。
  5. Task的属性是startnamefn,但是schedule的参数是namefnstart。这种不一致可能导致混淆或错误。(您需要start作为第一位,这样任务就可以根据它们的开始时间进行比较,但是还有其他方法可以实现这一点。)
  6. 如果您尝试用相同的启动时间和相同的名称调度两个任务,那么您将得到TypeError: '<' not supported between instances of 'function' and 'function'
  7. 名称_minheap描述了数据结构,但更重要的是描述目的(数据结构是实现细节)。所以我会用一个像_tasks这样的名字。
  8. _timeout属性仅在run函数中使用,因此它可以是一个局部变量,而不是Scheduler对象上的一个属性。
  9. _get_next_timeout方法只在run函数中使用,因此它可以是本地函数,而不是Scheduler类上的方法。(只能在持有锁的情况下调用它,因此它不适合独立的方法。)
  10. _start方法仅在__init__方法中调用,因此可以在其中内联。
  11. 在我看来,run中的逻辑是错误的。下面的事件序列是可能的:(I)定时器线程调用_get_next_timeout,它返回10秒的超时。(Ii)定时器线程释放条件变量。(Iii)另一个线程使用应该在5秒内运行的任务调用schedule,并通知条件变量(但条件变量没有等待)。(4)定时器线程获取条件变量。(V)定时器线程等待10秒的旧超时。但是它应该计算出5秒的新超时时间,然后等待它。要解决这个问题,需要计算从获取条件变量到等待它之间的超时。
  12. 在条件变量上使用acquirerelease似乎有风险,因为在错误的时间出现异常将使锁保持不变。通常,人们使用上下文管理器接口来避免这种风险。我想您之所以没有这样做,是因为您想在不持有锁的情况下启动任务。但是,只需要对代码进行少量的重新排列就可以完成这项工作。

2.修改后的代码

这是一个带有匿名任务的版本,但是如果您希望您的任务具有名称,那么添加它们将是非常简单的。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
class Scheduler:
    """A schedule of tasks to be run in background threads. Call the
    schedule method to schedule a task to run at a particular time.
    Call the task's cancel method to cancel it if it has not already
    started running.

    """

    @functools.total_ordering
    class _Task:
        "A scheduled task."

        def __init__(self, fn, start):
            "Create task that will run fn at or after the datetime start."
            self.fn = fn
            self.start = start
            self.cancelled = False

        def __le__(self, other):
            # Tasks compare according to their start time.
            return self.start <= other.start

        @property
        def timeout(self):
            "Return time remaining in seconds before task should start."
            return (self.start - datetime.now()).total_seconds()

        def cancel(self):
            "Cancel task if it has not already started running."
            self.cancelled = True
            logger.info("canceled %s", self)

    def __init__(self):
        cv = self._cv = threading.Condition(threading.Lock())
        tasks = self._tasks = []

        def run():
            while True:
                with cv:
                    while True:
                        timeout = None
                        while tasks and tasks[0].cancelled:
                            heapq.heappop(tasks)
                        if tasks:
                            timeout = tasks[0].timeout
                            if timeout <= 0:
                                task = heapq.heappop(tasks)
                                break
                        cv.wait(timeout=timeout)
                logger.info("starting task %s", task)
                threading.Thread(target=task.fn).start()

        threading.Thread(target=run, name='Scheduler').start()

    def schedule(self, fn, start):
        """Schedule a task that will run fn at or after start (which must be a
        datetime object) and return an object representing that task.

        """
        task = self._Task(fn, start)
        logger.info("scheduling task %s", task)
        with self._cv:
            heapq.heappush(self._tasks, task)
            self._cv.notify()
        logger.info("scheduled task %s", task)
        return task
票数 4
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/194922

复制
相关文章
Node.js 连接 MongoDB--插入数据
本章节我们将为大家介绍如何使用 Node.js 来连接 MongoDB,并对数据库进行操作。
陈不成i
2021/07/13
2K0
使用insert () 在MongoDB中插入数组
“insert”命令也可以一次将多个文档插入到集合中。下面我们操作如何一次插入多个文档。
MongoDB中文社区
2020/02/19
7.7K0
使用insert () 在MongoDB中插入数组
mongoDB 文档插入
db.collection.insertOne(obj, ) 插入单个文档到一个集合(3.2版本有效),可选参数为w, wtimeout db.collection.insertMany( [objects], ) 插入多个文档到一个集合(3.2版本有效),可选参数为w, wtimeout db.collection.insert(obj) 传统的插入方式
Leshami
2018/08/13
9820
MongoDB 插入文档
BSON 是一种类似 JSON 的二进制形式的存储格式,是 Binary JSON 的简称。
用户4988376
2021/08/13
1.2K0
房上的猫:数组插入算法等难点专开
一:查找算法 public class Aini { public static void main(String[] args) { // 数组查找算法 // 查找学生是否存在 // 导入扫描仪 Scanner bdqn = new Scanner(System.in); System.out.println("请输入你要查找的学生:"); String name = bdqn.next();
房上的猫
2018/03/14
7140
Node.js中的MongoDB
show dbs: 显示当前所有的数据库 use 数据库名 ":进入到指定数据库中 db :显示当前所在的数据库 show collections:显示数据库中的所有集合
不愿意做鱼的小鲸鱼
2022/08/24
5.3K0
Node.js中的MongoDB
MongoDB(7)- 文档插入操作
插入方法 db.collection.insertOne() 插入单条文档到集合中 db.collection.insertMany() 插入多条文档到集合中 db.collection.insert() 插入单条或多条文档到集合中 insertOne() 语法格式 db.collection.insertOne( <document>, { writeConcern: <document> } ) 只能传一个文档,不能是数组 insertMany() 语法格式 db.col
小菠萝测试笔记
2021/06/09
9980
nodejs连接MongoDB插入数据
昨天介绍了一下MongoDB在shell下的正删改查,今天来讲一下在nodejs中如何连接数据库以及数据的插入!
十月梦想
2018/08/29
1.7K0
node.js + mongodb 原
想写博客很长时间了,因为一直身患懒癌,所以一直拖到了现在。markdown的语法也是刚刚学,试验一下效果 好了不说了,直接上干货了。 ------------------------------------------------------------------------------------------------
笔阁
2018/09/04
2K0
node.js + mongodb 
                                                                            原
MongoDB 存储过程
MongoDB支持存储过程的使用,它的存储过程是用javascript实现的,被存在于system.js表中,可以接收和输出参数,返回执行存储过程的状态值,也可以嵌套调用。
郭顺发
2022/05/26
1.2K0
MongoDB 存储过程
MongoDB 数组在mongodb 中存在的意义
在MOGNODB 的文档设计和存储中,存在两个部分 1 嵌套 2 数组,所以如果想设计好一个MONGODB 在理解业务,读写比例,查询方式后,就需要介入到更深层次的理解嵌套的查询方式,嵌套多层后的性能问题, 数组其实比嵌套带来更多的问题,所以今天我们的从数组开始。
AustinDatabases
2022/04/05
4.3K0
MongoDB  数组在mongodb 中存在的意义
数组插入排序
插入排序是一个相对复杂一点的排序算法,但是效率要比我们以前接触过的排序算法快一些,他的思想是将数组分为两组数据(第一次分的时候就是数组第一个元素为一组,后面的所有元素为一组),然后从后面一组数据中抽取第一个元素与前面一组数据依次做对比,按需求将大的或者小的值插入到前面的一组数据中,最终后面一组数据全部插入完毕后,前面一组数据就是有序状态了。
我与梦想有个约会
2023/10/20
1250
数组插入排序
js 数组插入删除[通俗易懂]
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
全栈程序员站长
2022/11/14
6.8K0
MongoDB 数组查询
MongoDB shell version: 2.0.0 connecting to: test
阳光岛主
2019/02/19
2.4K0
MongoDB 数组查询
三、小结 a、数组查询有精确和模糊之分,精确匹配需要指定数据元素的全部值 b、数组查询可以通过下标的方式进行查询 c、数组内嵌套文档可以通过.成员的方式进行查询 d、数组至少一个元素满足所有指定的匹配条件可以使用$elemMatch e、数组查询中返回元素的子集可以通过$slice以及占位符来实现f、占位符来实现 f、all满足所有指定的匹配条件,不考虑多出的元素以及元素顺序问题
Leshami
2018/08/13
6.8K0
MongoDB数据的插入、查询、更新和删除
在MongoDB中,我们可以使用CRUD(Create、Read、Update、Delete)操作来插入、查询、更新和删除数据。这些操作都是通过MongoDB shell或编程语言驱动程序(如Python、Java、Node.js等)来执行的。
玖叁叁
2023/04/13
2.5K0
mongodb 相关的查找,插入删除等操作
http://blog.csdn.net/mcpang/article/details/7833805
bear_fish
2018/09/20
1.4K0
图解B+树的插入过程
B+ 树在现代数据库中很常见,如果我们了解它,在工作中可能对性能优化会有更好的帮助!
业余草
2019/07/11
7.3K0
图解B+树的插入过程
MongoDB(11)- 查询数组
如果希望找到的是包含 red、blank 两个元素的数组,可以使用 $all 操作符
小菠萝测试笔记
2021/06/09
2.4K0
Python操作MongoDB数组
1. Python操作字符串数组插入元素。 # pip install pymongo==3.12.3 # 数组示例 strArr = ['a','b','c'] from pymongo import MongoClient mongoClient = MongoClient(DB_IP, username='mongo', password='password',
编程随想曲
2022/12/01
4430

相似问题

scanr是如何工作的?Haskell

23

haskell代码是如何工作的?

12

Haskell异常处理是如何工作的?

32

Haskell尾部递归是如何工作的?

52

F#是如何实现让rec的?

13
添加站长 进交流群

领取专属 10元无门槛券

AI混元助手 在线答疑

扫码加入开发者社群
关注 腾讯云开发者公众号

洞察 腾讯核心技术

剖析业界实践案例

扫码关注腾讯云开发者公众号
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文