TensorFlow中的多线程

TensorFlow提供两个类帮助实现多线程,一个是tf.train.Coordinator,另一个是tf.train.QueueRunner。Coordinator主要用来实现多个线程同时停止,QueueRunner用来创建一系列线程。

Coordinator

根据官方文档,Coordinator主要有三个方法:

1、tf.train.Coordinator.should_stop: returns True if the threads should stop.

2、tf.train.Coordinator.request_stop: requests that threads should stop.

3、tf.train.Coordinator.join: waits until the specified threads have stopped.

接下来我们实验Coordinator,下面的代码主要实现每个线程独立计数,当某个线程达到指定值的时候,所有线程终止:

#encoding=utf-8
import threading
import numpy as np
import tensorflow as tf
#创建一个函数实现多线程,参数为Coordinater和线程号
def func(coord, t_id): 
count = 0 
while not coord.should_stop(): #不应该停止时计数 
print('thread ID:',t_id, 'count =', count)        
count += 1 
if(count == 5): #计到5时请求终止 
coord.request_stop() 
coord = tf.train.Coordinator()
 threads = [threading.Thread(target=func, args=(coord, i)) for i in range(4)]
#开始所有线程
for t in threads:    
t.start() 
coord.join(threads) #等待所有线程结束

运行结果如下,当0号线程打印出4时,其他线程不再计数,程序终止。

QueueRunner

QueueRunner的作用是创建一些重复进行enqueue操作的线程,它们通过coordinator同时结束。

#encoding=utf-8
import numpy as np
import tensorflow as tf batch_size = 2
#随机产生一个2*2的张量
example = tf.random_normal([2,2])
#创建一个RandomShuffleQueue,参数意义参见API
q = tf.RandomShuffleQueue(     capacity=1000,      min_after_dequeue=0,     dtypes=tf.float32,     shapes=[2,2])
#enqueue op,每次push一个张量
enq_op = q.enqueue(example)
#dequeue op, 每次取出batch_size个张量
xs = q.dequeue_many(batch_size)
#创建QueueRunner,包含4个enqueue op线程
qr = tf.train.QueueRunner(q, [enq_op]*4) coord = tf.train.Coordinator() sess = tf.Session()
#启动QueueRuner,开始线程
enq_threads = qr.create_threads(sess, coord=coord, start=True)for i in range(10):    
if coord.should_stop():        
break     print('step:', i, sess.run(xs)) 
#打印结果
coord.request_stop() coord.join(enq_threads)

总结

这两个类是实现TensorFlow pipeline的基础,能够高效地并行处理数据。个人认为在数据较大时,应该避免使用feed_dict。因为,feed_dict是利用python读取数据,python读取数据的时候,tensorflow无法计算,而且会将数据再次拷贝一份。

原文发布于微信公众号 - 人工智能LeadAI(atleadai)

原文发表时间:2018-03-05

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏calmound

知识点提纲

操作系统: 1. 进程的有哪几种状态,状态转换图,及导致转换的事件。 2. 进程与线程的区别。 3. 进程通信的几种方式。 4. 线程同步几种方式。(一定要会写...

3378
来自专栏开发 & 算法杂谈

基于Lockset的数据竞争检测方法汇总(一)

对于搞数据竞争检测方向的人来说,Lockset方法大家肯定不陌生,作为一个刚入门数据竞争检测方向的我来说,就和大家总结一下我近期有关Lockset方法的一些研究...

1484
来自专栏ACM小冰成长之路

HDU-1512-Monkey King

ACM模版 描述 ? 题解 典型的左偏树! 问题的核心是当两个猴子帮派中的大佬斗争之后,这两个大佬要强壮值减半并且两个帮派进行合并。那么涉及到的操作有优先队列的...

1756
来自专栏灯塔大数据

每周学点大数据 | No.28 表排序

No.28期 表排序 Mr. 王:前面我们讨论了一些基础磁盘算法,现在我们来讨论一些关于磁盘中图算法的问题。 通过对基础磁盘算法的学习,我们可以很容易地想到...

3077
来自专栏IT技术精选文摘

分布式系统中生成全局ID的总结与思考

世间万物,都有自己唯一的标识,比如人,每个人都有自己的指纹(白夜追凶给我科普的,同卵双胞胎DNA一样,但指纹不一样)。又如中国人,每个中国人有自己的身份证。对于...

1808
来自专栏醒者呆

Go并发模式:管道与取消

关键字:Go语言,管道,取消机制,并发,sync.WaitGroup,包引用,通道,defer,select GO并发模式:管道与取消 简介 Go的并发能...

2796
来自专栏灯塔大数据

每周学点大数据 | No.32优先队列

No.32期 优先队列 Mr. 王:我们回到这个问题中,如果是在内存中,我们只需要对前面的这些点做一个拓扑排序,就可以保证每一个节点在求解时,它们的所有入度...

26810
来自专栏同步博客

memcached分布式缓存

  memcached虽然称为“分布式”缓存服务器,但服务器端并没有“分布式”功能。Memcache集群主机不能够相互通信传输数据,它的“分布式”是基于客户端的...

641
来自专栏吉浦迅科技

DAY31:阅读global memory

692
来自专栏xingoo, 一个梦想做发明家的程序员

Elasticsearch聚合初探——metric篇

Elasticsearch是一款提供检索以及相关度排序的开源框架,同时,也支持对存储的文档进行复杂的统计——聚合。 前言 ES中的聚合被分为两大类:Met...

17210

扫描关注云+社区