首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >制作计时器: threading.Event.wait -Python3.6的超时不准确

制作计时器: threading.Event.wait -Python3.6的超时不准确
EN

Stack Overflow用户
提问于 2018-02-26 08:48:30
回答 3查看 1.5K关注 0票数 4

首先,我对Python很陌生,不熟悉它的功能。我一直在用MATLAB。

PC简介规范: Windows 10,Intel i7

我正在尝试创建一个timer类,用于像MATLAB这样的函数的周期性执行,这显然是从Java timer中借用的。MATLAB定时器有大约1毫秒的分辨率,我从未见过它在任何情况下都超过2毫秒。事实上,对于我的项目来说,它是足够准确的。

最近,我计划转向Python,因为MATLAB的并行计算和web访问功能很差。然而,不幸的是,与MATLAB相比,Python的标准包提供了一些低级别的定时器(threading.Timer),而我不得不创建自己的timer类。首先,我提到了QnA 用Python执行周期性动作[重复]。迈克尔安德森提出的解决方案给出了一个简单的漂移校正的想法。他用time.sleep()来保持这段时间。该方法具有较高的精度,有时比MATLAB计时器具有更好的精度。大约0.5毫秒分辨率但是,在time.sleep()中捕获计时器时,不能中断计时器(暂停或恢复)。但我有时不得不立即停止,不管它是否在睡觉()。

我发现的问题的一个解决方案是在线程包中使用事件类。请参阅threading.timer -每'n‘秒重复一次函数。使用Event.wait()的超时特性,我可以在执行之间设置一个时间间隔,并使用它来保持这个时间段。也就是说,通常会清除事件,以便等待(超时)可以像time.sleep(interval)那样工作,并且我可以在需要时通过设置事件立即退出wait()。

那时一切似乎都很好,但是Event.wait()中有一个关键问题。时间延迟在1~ 15 ms之间变化过大。我认为它来自于Event.wait()的开销。

我编写了一个示例代码,显示了time.sleep()和Event.wait()之间的准确性比较。这将总计1000次迭代,包括1毫秒、睡眠()和等待(),以查看累积的时间错误。预期结果约为1.000。

代码语言:javascript
运行
复制
import time
from threading import Event

time.sleep(3)  # to relax

# time.sleep()
tspan = 1
N = 1000
t1 = time.perf_counter()
for _ in range(N):
    time.sleep(tspan/N)
t2 = time.perf_counter()

print(t2-t1)

time.sleep(3)  # to relax

# Event.wait()    
tspan = 1
event = Event()
t1 = time.perf_counter()
for _ in range(N):
    event.wait(tspan/N)
t2 = time.perf_counter()

print(t2-t1)

结果:

代码语言:javascript
运行
复制
1.1379848184879964
15.614547161211096

结果表明,time.sleep()具有更好的精度。但是,我不能像前面提到的那样完全依赖time.sleep()。

总而言之,

  • time.sleep():准确但不可中断
  • threading.Event.wait():不准确但可中断

我目前正在考虑一个折衷方案:就像在这个例子中一样,让一个小time.sleep()循环(0.5ms间隔),并使用if-语句退出循环,并在需要时中断。据我所知,这个方法在Python2.x Python time.sleep() vs event.wait()中使用。

这是一个冗长的导言,但我的问题可以概括如下。

  1. 我可以强迫线程进程通过外部信号或事件从time.sleep()中断吗?(这似乎是最有效的。??)
  2. 使Event.wait()更精确或减少开销时间。
  3. 除了睡眠()和Event.wait()方法之外,还有什么更好的方法来提高计时精度。

非常感谢。

EN

回答 3

Stack Overflow用户

发布于 2020-07-22 18:53:48

我在Event.wait()上遇到了同样的时间问题。我想出的解决方案是创建一个模仿threading.Event的类。在内部,它使用time.sleep()循环和繁忙循环的组合,以极大地提高精度。睡眠循环在单独的线程中运行,因此主线程中的阻塞wait()调用仍然可以立即中断。当调用set()方法时,休眠线程应该在稍后结束。另外,为了将CPU利用率降到最低,我确保繁忙循环的运行时间不会超过3毫秒。

下面是我的自定义Event类以及一个计时演示(演示的打印执行时间以纳秒为单位):

代码语言:javascript
运行
复制
import time
import _thread
import datetime


class Event:
    __slots__ = (
        "_flag", "_lock", "_nl",
        "_pc", "_waiters"
    )

    _lock_type = _thread.LockType
    _timedelta = datetime.timedelta
    _perf_counter = time.perf_counter
    _new_lock = _thread.allocate_lock

    class _switch:
        __slots__ = ("_on",)

        def __call__(self, on: bool = None):
            if on is None:
                return self._on

            self._on = on

        def __bool__(self):
            return self._on

        def __init__(self):
            self._on = False

    def clear(self):
        with self._lock:
            self._flag(False)

    def is_set(self) -> bool:
        return self._flag()

    def set(self):
        with self._lock:
            self._flag(True)
            waiters = self._waiters

            for waiter in waiters:
                waiter.release()

            waiters.clear()

    def wait(
        self,
        timeout: float = None
    ) -> bool:
        with self._lock:
            return self._wait(self._pc(), timeout)

    def _new_waiter(self) -> _lock_type:
        waiter = self._nl()
        waiter.acquire()
        self._waiters.append(waiter)
        return waiter

    def _wait(
        self,
        start: float,
        timeout: float,
        td=_timedelta,
        pc=_perf_counter,
        end: _timedelta = None,
        waiter: _lock_type = None,
        new_thread=_thread.start_new_thread,
        thread_delay=_timedelta(milliseconds=3)
    ) -> bool:
        flag = self._flag

        if flag:
            return True
        elif timeout is None:
            waiter = self._new_waiter()
        elif timeout <= 0:
            return False
        else:
            delay = td(seconds=timeout)
            end = td(seconds=start) + delay

            if delay > thread_delay:
                mark = end - thread_delay
                waiter = self._new_waiter()
                new_thread(
                    self._wait_thread,
                    (flag, mark, waiter)
                )

        lock = self._lock
        lock.release()

        try:
            if waiter:
                waiter.acquire()

            if end:
                while (
                    not flag and
                    td(seconds=pc()) < end
                ):
                    pass

        finally:
            lock.acquire()

            if waiter and not flag:
                self._waiters.remove(waiter)

        return flag()

    @staticmethod
    def _wait_thread(
        flag: _switch,
        mark: _timedelta,
        waiter: _lock_type,
        td=_timedelta,
        pc=_perf_counter,
        sleep=time.sleep
    ):
        while not flag and td(seconds=pc()) < mark:
            sleep(0.001)

        if waiter.locked():
            waiter.release()

    def __new__(cls):
        _new_lock = cls._new_lock
        _self = object.__new__(cls)
        _self._waiters = []
        _self._nl = _new_lock
        _self._lock = _new_lock()
        _self._flag = cls._switch()
        _self._pc = cls._perf_counter
        return _self


if __name__ == "__main__":
    def test_wait_time():
        wait_time = datetime.timedelta(microseconds=1)
        wait_time = wait_time.total_seconds()

        def test(
            event=Event(),
            delay=wait_time,
            pc=time.perf_counter
        ):
            pc1 = pc()
            event.wait(delay)
            pc2 = pc()
            pc1, pc2 = [
                int(nbr * 1000000000)
                for nbr in (pc1, pc2)
            ]
            return pc2 - pc1

        lst = [
            f"{i}.\t\t{test()}"
            for i in range(1, 11)
        ]
        print("\n".join(lst))

    test_wait_time()
    del test_wait_time
票数 0
EN

Stack Overflow用户

发布于 2021-01-25 17:32:55

Chris D的自定义事件类工作得非常好!出于实际目的,我已经将它包含在一个可安装的包中(https://github.com/ovinc/oclock,Installwithpip install oclock),其中还包括其他计时工具。从oclock的1.3.0版本开始,人们可以使用Chris的答案中讨论的自定义Event类。

代码语言:javascript
运行
复制
from oclock import Event
event = Event()
event.wait(1)

使用通常的set()clear()is_set()wait()类的方法。

threading.Event相比,在Windows环境下,定时精度要好得多。例如,在具有1000个重复循环的Windows机器上,threading.Event的循环时间为7ms,oclock.Event的循环时间小于0.01ms。道具给克里斯·D!

注意:为了与StackOverflow的CCbySA4.0兼容,oclock包处于GPLv3许可之下。

票数 0
EN

Stack Overflow用户

发布于 2021-06-11 09:54:15

谢谢你的这个话题和所有的答案。我还遇到了一些时间不准确的问题(Windows 10 +Python3.9+线程)。

解决方案是使用八点钟包,并通过威尔斯包改变(暂时)对Windows系统定时器的分辨率。这个软件包使用了无文档的Windows函数NtSetTimerResolution (警告:分辨率在系统范围内发生了变化)。

应用八点钟软件包并不能解决这一问题。

在应用了这两个python包之后,下面的代码对周期性事件进行了正确而准确的调度。如果终止,则恢复原来的定时器解析。

代码语言:javascript
运行
复制
import threading
import datetime
import time
import oclock
import wres

class Job(threading.Thread):
    def __init__(self, interval, *args, **kwargs):
        threading.Thread.__init__(self)
        # use oclock.Event() instead of threading.Event()
        self.stopped = oclock.Event()
        self.interval = interval.total_seconds()
        self.args = args
        self.kwargs = kwargs

    def stop(self):
        self.stopped.set()
        self.join()

    def run(self):
        prevTime = time.time()
        while not self.stopped.wait(self.interval):
            now = time.time()
            print(now - prevTime)
            prevTime = now

# Set system timer resolution to 1 ms
# Automatically restore previous resolution when exit with statement
with wres.set_resolution(10000):
    # Create thread with periodic task called every 10 ms
    job = Job(interval=datetime.timedelta(seconds=0.010))
    job.start()

    try:
        while True:
            time.sleep(1)
    # Hit Ctrl+C to terminate main loop and spawned thread
    except KeyboardInterrupt:
        job.stop()
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48984512

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档