TensorFlow 队列与多线程的应用

深度学习的模型训练过程往往需要大量的数据,而将这些数据一次性的读入和预处理需要大量的时间开销,所以通常采用队列与多线程的思想解决这个问题,而且TensorFlow为我们提供了完善的函数。

实现队列

在Python中是没有提供直接实现队列的函数的,所以通常会使用列表模拟队列。 而TensorFlow提供了整套实现队列的函数和方法,在TensorFlow中,队列和变量类似,都是计算图上有状态的节点。操作队列的函数主要有:

FIFOQueue():创建一个先入先出(FIFO)的队列 RandomShuffleQueue():创建一个随机出队的队列 enqueue_many():初始化队列中的元素 dequeue():出队 enqueue():入队

下面是一个例子:

import tensorflow as tf

q = tf.FIFOQueue(3,"int32")
init = q.enqueue_many(([0,1,2],))

x = q.dequeue()
y = x + 1
q_inc = q.enqueue([y]) 

with tf.Session() as sess:
     init.run()
     for a in range(5):
          v,a = sess.run([x,q_inc])
          print(v)

打印结果: 0 1 2 1 2 原理如下图:

多线程协同

TensorFlow为我们提供了多线程协同操作的类—tf.Coordinator,其函数主要有: should_stop():确定当前线程是否退出 request_stop():通知其他线程退出 join():等待所有线程终止 假设有五个线程同时在工作,每个线程自身会先判断should_stop()的值,当其返回值为True时,则退出当前线程;如果为Flase,也继续该线程。此时如果线程3发出了request_stop()通知,则其它4个线程的should_stop()将全部变为True,然后线程4自身的should_stop()也将变为True,则退出了所有线程。 下面是一段代码:

import tensorflow as tf
import numpy as np
import time
import threading

def MyLoop(coord,worker_id):
     while not coord.should_stop():
          if np.random.rand()<0.09:
               print('stoping from id:',worker_id)
               coord.request_stop()
          else:
               print('working from id:',worker_id)
          time.sleep(1)

coord = tf.train.Coordinator()
#声明5个线程
threads=[threading.Thread(target=MyLoop,args=(coord,i,)) for i in range(5)]
#遍历五个线程
for t in threads:  
     t.start()
coord.join(threads)       

打印结果: working from id: 0 working from id: 1 working from id: 2 working from id: 3 working from id: 4 stoping from id: 0

在第一轮遍历过程中,所有进程的should_stop()都为Flase,且随机数都大于等于0.09,所以依次打印了working from id: 0-5,再重新回到进程0时,出现了小于0.09的随机数,即进程0发出了request_stop()请求,进程1-4的should_stop()返回值全部为True(进程退出),也就无法进入while,进程0的should_stop()返回值也将为True(退出),五个进程全部退出。

多线程操作队列

前面说到了队列的操作,多线程协同的操作,在多线程协同的代码中让每一个线程打印自己的id编号,下面我们说下如何用多线程操作一个队列。 TensorFlow提供了队列tf.QueueRunner类处理多个线程操作同一队列,启动的线程由上面提到的tf.Coordinator类统一管理,常用的操作有: QueueRunner():启动线程,第一个参数为线程需要操作的队列,第二个参数为对队列的操作,如enqueue_op,此时的enqueue_op = queue.enqueue() add_queue_runner():在图中的一个集合中加‘QueueRunner’,如果没有指定的合集的话,会被添加到tf.GraphKeys.QUEUE_RUNNERS合集 start_queue_runners():启动所有被添加到图中的线程

import tensorflow as tf

#创建队列
queue = tf.FIFOQueue(100,'float')
#入队
enqueue_op = queue.enqueue(tf.random_normal([1]))
#启动5个线程,执行enqueue_op
qr = tf.train.QueueRunner( queue,[enqueue_op] * 5)
#添加线程到图
tf.train.add_queue_runner(qr)
#出队
out_tensor = queue.dequeue()

with tf.Session() as sess:
     coord = tf.train.Coordinator()
     threads=tf.train.start_queue_runners(sess=sess,coord=coord)
     for i in range(6):
          print(sess.run(out_tensor)[0])
     coord.request_stop()
     coord.join(threads)

打印结果: -0.543751 -0.712543 1.32066 0.2471 0.313005 -2.16349

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏腾讯移动品质中心TMQ的专栏

GT3.1简化您的App性能测试(2)——原理讲解,溯本求源

GT3.1的版本更新,带来了全面的维度分析。那么这些功能是如何实现的呢?本章GT君将详细的从CUP维度、内存维度、流量维度、流畅度维度为大家讲解这些功能的作用和...

5026
来自专栏编程

python奇遇记:深入的了解函数

很久没更新了,抱歉。最近一段时间忙着对付各种考试,现在总算是考完了,继续来聊聊Python。Python中的函数使用def关键字定义,这个大家都知道,而且Pyt...

15910
来自专栏精讲JAVA

Gof设计模式之装饰者模式(七)

转发于网络(过一段时间会更新一个自己的新版本) 定义:动态给一个对象添加一些额外的职责,就象在墙上刷油漆.使用Decorator模式相比用生成子类方式达到功能的...

1876
来自专栏Golang语言社区

Go语言并发与并行学习笔记

Go语言的并发和并行 不知道你有没有注意到一个现象,还是这段代码,如果我跑在两个goroutines里面的话: var quit chan int = make...

3076
来自专栏Jimoer

JVM学习记录-Java内存模型(二)

在处理多线程数据竞争问题时,不仅仅是可以使用synchronized关键字来实现,使用volatile也可以实现。

663
来自专栏chenssy

【死磕Java并发】—–Java内存模型之总结

经过四篇博客阐述,我相信各位对Java内存模型有了最基本认识了,下面LZ就做一个比较简单的总结。 总结 JMM规定了线程的工作内存和主内存的交互关系,以及线程之...

3348
来自专栏Golang语言社区

Go语言并发与并行学习笔记

Go语言的并发和并行 不知道你有没有注意到一个现象,还是这段代码,如果我跑在两个goroutines里面的话: var quit chan int = make...

3575
来自专栏听雨堂

.Net中使用带UI的OCX的方法

方法一:在工具箱中插入COM控件,当把控件拖到界面上后,将会自动产生两个封装的dll,并在引用中添加。 问题:当ocx需要不断升级时,这种方法很痛苦,需要重新...

1717
来自专栏小樱的经验随笔

【Python学习笔记之一】Python关键字及其总结

前言 最近在学习Java Sockst的时候遇到了一些麻烦事,我觉得我很有必要重新研究学习Python这种脚本语言,参考大神的经验,淘到了一本学习Python的...

2666
来自专栏BestSDK

封装、私有,一文掌握Python关键代码

首先,什么是 Python?根据 Python 创建者 Guido van Rossum 所言,Python 是一种高级编程语言,其设计的核心理念是代码的易读性...

2873

扫码关注云+社区