TensorFlow中的多线程

TensorFlow提供两个类帮助实现多线程,一个是tf.train.Coordinator,另一个是tf.train.QueueRunner。Coordinator主要用来实现多个线程同时停止,QueueRunner用来创建一系列线程。

Coordinator

根据官方文档,Coordinator主要有三个方法:

1、tf.train.Coordinator.should_stop: returns True if the threads should stop.

2、tf.train.Coordinator.request_stop: requests that threads should stop.

3、tf.train.Coordinator.join: waits until the specified threads have stopped.

接下来我们实验Coordinator,下面的代码主要实现每个线程独立计数,当某个线程达到指定值的时候,所有线程终止:

#encoding=utf-8
import threading
import numpy as np
import tensorflow as tf
#创建一个函数实现多线程,参数为Coordinater和线程号
def func(coord, t_id): 
count = 0 
while not coord.should_stop(): #不应该停止时计数 
print('thread ID:',t_id, 'count =', count)        
count += 1 
if(count == 5): #计到5时请求终止 
coord.request_stop() 
coord = tf.train.Coordinator()
 threads = [threading.Thread(target=func, args=(coord, i)) for i in range(4)]
#开始所有线程
for t in threads:    
t.start() 
coord.join(threads) #等待所有线程结束

运行结果如下,当0号线程打印出4时,其他线程不再计数,程序终止。

QueueRunner

QueueRunner的作用是创建一些重复进行enqueue操作的线程,它们通过coordinator同时结束。

#encoding=utf-8
import numpy as np
import tensorflow as tf batch_size = 2
#随机产生一个2*2的张量
example = tf.random_normal([2,2])
#创建一个RandomShuffleQueue,参数意义参见API
q = tf.RandomShuffleQueue(     capacity=1000,      min_after_dequeue=0,     dtypes=tf.float32,     shapes=[2,2])
#enqueue op,每次push一个张量
enq_op = q.enqueue(example)
#dequeue op, 每次取出batch_size个张量
xs = q.dequeue_many(batch_size)
#创建QueueRunner,包含4个enqueue op线程
qr = tf.train.QueueRunner(q, [enq_op]*4) coord = tf.train.Coordinator() sess = tf.Session()
#启动QueueRuner,开始线程
enq_threads = qr.create_threads(sess, coord=coord, start=True)for i in range(10):    
if coord.should_stop():        
break     print('step:', i, sess.run(xs)) 
#打印结果
coord.request_stop() coord.join(enq_threads)

总结

这两个类是实现TensorFlow pipeline的基础,能够高效地并行处理数据。个人认为在数据较大时,应该避免使用feed_dict。因为,feed_dict是利用python读取数据,python读取数据的时候,tensorflow无法计算,而且会将数据再次拷贝一份。

原文发布于微信公众号 - 人工智能LeadAI(atleadai)

原文发表时间:2018-03-05

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏技术记录

nafos:使用netty同端口监听tcpSocket和websocket

首先我们先添加好SocketChooseHandle(),这是我们的handle判断处理器。如果判断协议是以GET /开头的话,那么必定是websocket的连...

3063
来自专栏进击的程序猿

The Clean Architecture in PHP 读书笔记(五)The Clean Architecture in PHP 读书笔记(五)

上篇最重要的是介绍了去耦的工具之一依赖注入,本篇将继续介绍去耦工具:接口和适配器,本文是The Clean Architecture in PHP的第5篇。

612
来自专栏java 成神之路

FutureTask 源码分析

3777
来自专栏逸鹏说道

C#进阶系列——WebApi 接口参数不再困惑:传参详解 下

(1)基础类型数组 var arr = ["1", "2", "3", "4"]; $.ajax({ type: "post", ...

2896
来自专栏后台开发+音视频+ffmpeg

dpvs源码分析(续二)

在上一篇<dpvs源码分析(续)>中,我们以tcp为例,讲到了连接的建立,同时也提到了full-nat,snat这些术语。在该篇中,我们再来讲讲连接建立的过程。

2995
来自专栏我的小碗汤

自动评论csdn博客文章实现

今天我们来用java代码爬取csdn博客网站,然后自动评论,这一波操作可以说是相当风骚了,话不多说,咱上代码。

1552
来自专栏MasiMaro 的技术博文

Vista 及后续版本的新线程池

在上一篇的博文中,说了下老版本的线程池,在Vista之后,微软重新设计了一套线程池机制,并引入一组新的线程池API,新版线程池相对于老版本的来说,它的可控性更高...

1433
来自专栏武培轩的专栏

迅雷面经汇总

实现多态的技术称为 :动态绑定,是指在执行期间判断所引用对象的实际类型,根据其实际的类型调用其相应的方法。

1161
来自专栏Felix的技术分享

在AndroidStudio编译过程中遇到Error:duplicate files during packaging of APK问题的解决方法

2193
来自专栏linux驱动个人学习

input子系统事件处理层(evdev)的环形缓冲区【转】

在事件处理层(evdev.c)中结构体evdev_client定义了一个环形缓冲区(circular buffer),其原理是用数组的方式实现了一个先进先出的循...

2906

扫码关注云+社区