首先,我对Python很陌生,不熟悉它的功能。我一直在用MATLAB。
PC简介规范: Windows 10,Intel i7
我正在尝试创建一个timer类,用于像MATLAB这样的函数的周期性执行,这显然是从Java timer中借用的。MATLAB定时器有大约1毫秒的分辨率,我从未见过它在任何情况下都超过2毫秒。事实上,对于我的项目来说,它是足够准确的。
最近,我计划转向Python,因为MATLAB的并行计算和web访问功能很差。然而,不幸的是,与MATLAB相比,Python的标准包提供了一些低级别的定时器(threading.Timer),而我不得不创建自己的timer类。首先,我提到了QnA 用Python执行周期性动作[重复]。迈克尔安德森提出的解决方案给出了一个简单的漂移校正的想法。他用time.sleep()来保持这段时间。该方法具有较高的精度,有时比MATLAB计时器具有更好的精度。大约0.5毫秒分辨率但是,在time.sleep()中捕获计时器时,不能中断计时器(暂停或恢复)。但我有时不得不立即停止,不管它是否在睡觉()。
我发现的问题的一个解决方案是在线程包中使用事件类。请参阅threading.timer -每'n‘秒重复一次函数。使用Event.wait()的超时特性,我可以在执行之间设置一个时间间隔,并使用它来保持这个时间段。也就是说,通常会清除事件,以便等待(超时)可以像time.sleep(interval)那样工作,并且我可以在需要时通过设置事件立即退出wait()。
那时一切似乎都很好,但是Event.wait()中有一个关键问题。时间延迟在1~ 15 ms之间变化过大。我认为它来自于Event.wait()的开销。
我编写了一个示例代码,显示了time.sleep()和Event.wait()之间的准确性比较。这将总计1000次迭代,包括1毫秒、睡眠()和等待(),以查看累积的时间错误。预期结果约为1.000。
import time
from threading import Event
time.sleep(3) # to relax
# time.sleep()
tspan = 1
N = 1000
t1 = time.perf_counter()
for _ in range(N):
time.sleep(tspan/N)
t2 = time.perf_counter()
print(t2-t1)
time.sleep(3) # to relax
# Event.wait()
tspan = 1
event = Event()
t1 = time.perf_counter()
for _ in range(N):
event.wait(tspan/N)
t2 = time.perf_counter()
print(t2-t1)
结果:
1.1379848184879964
15.614547161211096
结果表明,time.sleep()具有更好的精度。但是,我不能像前面提到的那样完全依赖time.sleep()。
总而言之,
我目前正在考虑一个折衷方案:就像在这个例子中一样,让一个小time.sleep()循环(0.5ms间隔),并使用if-语句退出循环,并在需要时中断。据我所知,这个方法在Python2.x Python time.sleep() vs event.wait()中使用。
这是一个冗长的导言,但我的问题可以概括如下。
非常感谢。
发布于 2020-07-22 18:53:48
我在Event.wait()
上遇到了同样的时间问题。我想出的解决方案是创建一个模仿threading.Event
的类。在内部,它使用time.sleep()
循环和繁忙循环的组合,以极大地提高精度。睡眠循环在单独的线程中运行,因此主线程中的阻塞wait()
调用仍然可以立即中断。当调用set()
方法时,休眠线程应该在稍后结束。另外,为了将CPU利用率降到最低,我确保繁忙循环的运行时间不会超过3毫秒。
下面是我的自定义Event
类以及一个计时演示(演示的打印执行时间以纳秒为单位):
import time
import _thread
import datetime
class Event:
__slots__ = (
"_flag", "_lock", "_nl",
"_pc", "_waiters"
)
_lock_type = _thread.LockType
_timedelta = datetime.timedelta
_perf_counter = time.perf_counter
_new_lock = _thread.allocate_lock
class _switch:
__slots__ = ("_on",)
def __call__(self, on: bool = None):
if on is None:
return self._on
self._on = on
def __bool__(self):
return self._on
def __init__(self):
self._on = False
def clear(self):
with self._lock:
self._flag(False)
def is_set(self) -> bool:
return self._flag()
def set(self):
with self._lock:
self._flag(True)
waiters = self._waiters
for waiter in waiters:
waiter.release()
waiters.clear()
def wait(
self,
timeout: float = None
) -> bool:
with self._lock:
return self._wait(self._pc(), timeout)
def _new_waiter(self) -> _lock_type:
waiter = self._nl()
waiter.acquire()
self._waiters.append(waiter)
return waiter
def _wait(
self,
start: float,
timeout: float,
td=_timedelta,
pc=_perf_counter,
end: _timedelta = None,
waiter: _lock_type = None,
new_thread=_thread.start_new_thread,
thread_delay=_timedelta(milliseconds=3)
) -> bool:
flag = self._flag
if flag:
return True
elif timeout is None:
waiter = self._new_waiter()
elif timeout <= 0:
return False
else:
delay = td(seconds=timeout)
end = td(seconds=start) + delay
if delay > thread_delay:
mark = end - thread_delay
waiter = self._new_waiter()
new_thread(
self._wait_thread,
(flag, mark, waiter)
)
lock = self._lock
lock.release()
try:
if waiter:
waiter.acquire()
if end:
while (
not flag and
td(seconds=pc()) < end
):
pass
finally:
lock.acquire()
if waiter and not flag:
self._waiters.remove(waiter)
return flag()
@staticmethod
def _wait_thread(
flag: _switch,
mark: _timedelta,
waiter: _lock_type,
td=_timedelta,
pc=_perf_counter,
sleep=time.sleep
):
while not flag and td(seconds=pc()) < mark:
sleep(0.001)
if waiter.locked():
waiter.release()
def __new__(cls):
_new_lock = cls._new_lock
_self = object.__new__(cls)
_self._waiters = []
_self._nl = _new_lock
_self._lock = _new_lock()
_self._flag = cls._switch()
_self._pc = cls._perf_counter
return _self
if __name__ == "__main__":
def test_wait_time():
wait_time = datetime.timedelta(microseconds=1)
wait_time = wait_time.total_seconds()
def test(
event=Event(),
delay=wait_time,
pc=time.perf_counter
):
pc1 = pc()
event.wait(delay)
pc2 = pc()
pc1, pc2 = [
int(nbr * 1000000000)
for nbr in (pc1, pc2)
]
return pc2 - pc1
lst = [
f"{i}.\t\t{test()}"
for i in range(1, 11)
]
print("\n".join(lst))
test_wait_time()
del test_wait_time
发布于 2021-01-25 17:32:55
Chris D的自定义事件类工作得非常好!出于实际目的,我已经将它包含在一个可安装的包中(https://github.com/ovinc/oclock,Installwithpip install oclock
),其中还包括其他计时工具。从oclock
的1.3.0版本开始,人们可以使用Chris的答案中讨论的自定义Event
类。
from oclock import Event
event = Event()
event.wait(1)
使用通常的set()
、clear()
、is_set()
、wait()
类的方法。
与threading.Event
相比,在Windows环境下,定时精度要好得多。例如,在具有1000个重复循环的Windows机器上,threading.Event
的循环时间为7ms,oclock.Event
的循环时间小于0.01ms。道具给克里斯·D!
注意:为了与StackOverflow的CCbySA4.0兼容,oclock
包处于GPLv3许可之下。
发布于 2021-06-11 09:54:15
谢谢你的这个话题和所有的答案。我还遇到了一些时间不准确的问题(Windows 10 +Python3.9+线程)。
解决方案是使用八点钟包,并通过威尔斯包改变(暂时)对Windows系统定时器的分辨率。这个软件包使用了无文档的Windows函数NtSetTimerResolution (警告:分辨率在系统范围内发生了变化)。
应用八点钟软件包并不能解决这一问题。
在应用了这两个python包之后,下面的代码对周期性事件进行了正确而准确的调度。如果终止,则恢复原来的定时器解析。
import threading
import datetime
import time
import oclock
import wres
class Job(threading.Thread):
def __init__(self, interval, *args, **kwargs):
threading.Thread.__init__(self)
# use oclock.Event() instead of threading.Event()
self.stopped = oclock.Event()
self.interval = interval.total_seconds()
self.args = args
self.kwargs = kwargs
def stop(self):
self.stopped.set()
self.join()
def run(self):
prevTime = time.time()
while not self.stopped.wait(self.interval):
now = time.time()
print(now - prevTime)
prevTime = now
# Set system timer resolution to 1 ms
# Automatically restore previous resolution when exit with statement
with wres.set_resolution(10000):
# Create thread with periodic task called every 10 ms
job = Job(interval=datetime.timedelta(seconds=0.010))
job.start()
try:
while True:
time.sleep(1)
# Hit Ctrl+C to terminate main loop and spawned thread
except KeyboardInterrupt:
job.stop()
https://stackoverflow.com/questions/48984512
复制相似问题