线程的协调器。该类实现一个简单的机制来协调一组线程的终止。
使用:
# Create a coordinator.
coord = Coordinator()
# Start a number of threads, passing the coordinator to each of them.
...start thread 1...(coord, ...)
...start thread N...(coord, ...)
# Wait for all the threads to terminate.
coord.join(threads)
任何线程都可以调用coord.request_stop()来请求所有线程停止。为了配合请求,每个线程必须定期检查coord .should_stop()。一旦调用了coord.request_stop(), coord.should_stop()将返回True。 一个典型的线程运行协调器会做如下事情:
while not coord.should_stop():
...do some work...
异常处理:
线程可以将异常作为request_stop()调用的一部分报告给协调器。异常将从coord.join()调用中重新引发。线程代码如下:
try:
while not coord.should_stop():
...do some work...
except Exception as e:
coord.request_stop(e)
主代码:
try:
...
coord = Coordinator()
# Start a number of threads, passing the coordinator to each of them.
...start thread 1...(coord, ...)
...start thread N...(coord, ...)
# Wait for all the threads to terminate.
coord.join(threads)
except Exception as e:
...exception that was passed to coord.request_stop()
为了简化线程实现,协调器提供了一个上下文处理程序stop_on_exception(),如果引发异常,该上下文处理程序将自动请求停止。使用上下文处理程序,上面的线程代码可以写成:
with coord.stop_on_exception():
while not coord.should_stop():
...do some work...
停止的宽限期:
当一个线程调用了coord.request_stop()后,其他线程有一个固定的停止时间,这被称为“停止宽限期”,默认为2分钟。如果任何线程在宽限期过期后仍然存活,则join()将引发一个RuntimeError报告落后者。
try:
...
coord = Coordinator()
# Start a number of threads, passing the coordinator to each of them.
...start thread 1...(coord, ...)
...start thread N...(coord, ...)
# Wait for all the threads to terminate, give them 10s grace period
coord.join(threads, stop_grace_period_secs=10)
except RuntimeError:
...one of the threads took more than 10s to stop after request_stop()
...was called.
except Exception:
...exception that was passed to coord.request_stop()
性能:
加入:
Method:
__init__
__init__(clean_stop_exception_types=None)
创建一个新的协调器。
参数:clean_stop_exception_types,异常类型的可选元组,它应该导致协调器的完全停止。如果将其中一种类型的异常报告给request_stop(ex),协调器的行为将与调用request_stop(None)一样。默认值为(tf.errors.OutOfRangeError,),输入队列使用它来表示输入的结束。当从Python迭代器提供训练数据时,通常将StopIteration添加到这个列表中。
clear_stop
clear_stop()
清除停止标志。调用此函数后,对should_stop()的调用将返回False。
join
join(
threads=None,
stop_grace_period_secs=120,
ignore_live_threads=False
)
等待线程终止。
此调用阻塞,直到一组线程终止。线程集是threads参数中传递的线程与通过调用coordinator .register_thread()向协调器注册的线程列表的联合。线程停止后,如果将exc_info传递给request_stop,则会重新引发该异常。宽限期处理:当调用request_stop()时,将给线程“stop_grace__secs”秒来终止。如果其中任何一个在该期间结束后仍然存活,则会引发RuntimeError。注意,如果将exc_info传递给request_stop(),那么它将被引发,而不是RuntimeError。
参数:
threads
: 线程列表。除了已注册的线程外,还要连接已启动的线程。异常:
RuntimeError
: If any thread is still alive after request_stop()
is called and the grace period expires.raise_requested_exception
raise_requested_exception()
如果将异常传递给request_stop,则会引发异常。
register_thread
register_thread(thread)
注册要加入的线程。
参数:
request_stop
request_stop(ex=None)
请求线程停止。调用此函数后,对should_stop()的调用将返回True。
注意:如果传入异常,in必须在处理异常的上下文中(例如try:…expect expection as ex:......,例如:)和不是一个新创建的。
参数:
should_stop
should_stop()
检查是否要求停止。
返回:
stop_on_exception
stop_on_exception(
*args,
**kwds
)
上下文管理器,用于在引发异常时请求停止。使用协调器的代码必须捕获异常并将其传递给request_stop()方法,以停止协调器管理的其他线程。这个上下文处理程序简化了异常处理。使用方法如下:
with coord.stop_on_exception():
# Any exception raised in the body of the with
# clause is reported to the coordinator before terminating
# the execution of the body.
...body...
这完全等价于稍微长一点的代码:
try:
...body...
except:
coord.request_stop(sys.exc_info())
产生:
nothing.
wait_for_stop
wait_for_stop(timeout=None)
等待协调器被告知停止。
参数:
返回值: