TensorFlow中的多线程

TensorFlow提供两个类帮助实现多线程,一个是tf.train.Coordinator,另一个是tf.train.QueueRunner。Coordinator主要用来实现多个线程同时停止,QueueRunner用来创建一系列线程。

Coordinator

根据官方文档,Coordinator主要有三个方法:

1、tf.train.Coordinator.should_stop: returns True if the threads should stop.

2、tf.train.Coordinator.request_stop: requests that threads should stop.

3、tf.train.Coordinator.join: waits until the specified threads have stopped.

接下来我们实验Coordinator,下面的代码主要实现每个线程独立计数,当某个线程达到指定值的时候,所有线程终止:

#encoding=utf-8
import threading
import numpy as np
import tensorflow as tf
#创建一个函数实现多线程,参数为Coordinater和线程号
def func(coord, t_id): 
count = 0 
while not coord.should_stop(): #不应该停止时计数 
print('thread ID:',t_id, 'count =', count)        
count += 1 
if(count == 5): #计到5时请求终止 
coord.request_stop() 
coord = tf.train.Coordinator()
 threads = [threading.Thread(target=func, args=(coord, i)) for i in range(4)]
#开始所有线程
for t in threads:    
t.start() 
coord.join(threads) #等待所有线程结束

运行结果如下,当0号线程打印出4时,其他线程不再计数,程序终止。

QueueRunner

QueueRunner的作用是创建一些重复进行enqueue操作的线程,它们通过coordinator同时结束。

#encoding=utf-8
import numpy as np
import tensorflow as tf batch_size = 2
#随机产生一个2*2的张量
example = tf.random_normal([2,2])
#创建一个RandomShuffleQueue,参数意义参见API
q = tf.RandomShuffleQueue(     capacity=1000,      min_after_dequeue=0,     dtypes=tf.float32,     shapes=[2,2])
#enqueue op,每次push一个张量
enq_op = q.enqueue(example)
#dequeue op, 每次取出batch_size个张量
xs = q.dequeue_many(batch_size)
#创建QueueRunner,包含4个enqueue op线程
qr = tf.train.QueueRunner(q, [enq_op]*4) coord = tf.train.Coordinator() sess = tf.Session()
#启动QueueRuner,开始线程
enq_threads = qr.create_threads(sess, coord=coord, start=True)for i in range(10):    
if coord.should_stop():        
break     print('step:', i, sess.run(xs)) 
#打印结果
coord.request_stop() coord.join(enq_threads)

总结

这两个类是实现TensorFlow pipeline的基础,能够高效地并行处理数据。个人认为在数据较大时,应该避免使用feed_dict。因为,feed_dict是利用python读取数据,python读取数据的时候,tensorflow无法计算,而且会将数据再次拷贝一份。

原文发布于微信公众号 - 人工智能LeadAI(atleadai)

原文发表时间:2018-03-05

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏函数式编程语言及工具

Akka(42): Http:身份验证 - authentication, authorization and use of raw headers

   当我们把Akka-http作为数据库数据交换工具时,数据是以Source[ROW,_]形式存放在Entity里的。很多时候除数据之外我们可能需要进行一些附...

2095
来自专栏Albert陈凯

3.3RDD的转换和DAG的生成

3.3 RDD的转换和DAG的生成 Spark会根据用户提交的计算逻辑中的RDD的转换和动作来生成RDD之间的依赖关系,同时这个计算链也就生成了逻辑上的DAG...

2947
来自专栏牛肉圆粉不加葱

Spark Task 的执行流程③ - 执行 task

创建、分发 Task一文中我们提到 TaskRunner(继承于 Runnable) 对象最终会被提交到 Executor 的线程池中去执行,本文就将对该执行过...

551
来自专栏码匠的流水账

聊聊kafka consumer offset lag的监控

在0.8.2.2版本如下 kafka_2.10-0.8.2.2-sources.jar!/kafka/tools/ConsumerOffsetChecker.s...

1171
来自专栏别先生

一脸懵逼学习Hadoop中的序列化机制——流量求和统计MapReduce的程序开发案例——流量求和统计排序

一:序列化概念 序列化(Serialization)是指把结构化对象转化为字节流。 反序列化(Deserialization)是序列化的逆过程。即把字节流转回...

19710
来自专栏编程

MapReduce编程模型

通过WordCount程序理解MapReduce编程模型 WordCount,名为单词统计,功能是统计文本文件中每个单词出现的次数。例如下图中,有两个文本(蓝色...

1948
来自专栏别先生

一脸懵逼学习Hadoop中的MapReduce程序中自定义分组的实现

1:首先搞好实体类对象:   write 是把每个对象序列化到输出流,readFields是把输入流字节反序列化,实现WritableComparable,Ja...

4699
来自专栏人工智能头条

在Apache Spark上跑Logistic Regression算法

1373
来自专栏祝威廉

Spark Streaming 数据清理机制

为啥要了解机制呢?这就好比JVM的垃圾回收,虽然JVM的垃圾回收已经巨牛了,但是依然会遇到很多和它相关的case导致系统运行不正常。

1173
来自专栏个人分享

Spark RDD类源码阅读

671

扫码关注云+社区