2017-08-01 25 views
0

Tensorflow为我们提供了两种实现读取数据的方式。如何理解tf队列中的多线程?

第一种方式,使用许多阅读器,如tf.TextLineReader,每个线程一个阅读器。

第二种方法,只用一个阅读器和排队OPS的多线程,例如我们可以使用tf.train.shuffle_batch并设置num_threads大于1

我无法理解的第二种方法时,我们只需要一个阅读器加载数据(也许是一个线程),为什么我们需要这么多的线程入队?

而对于第一种方式,我们应该使用tf.train.shuffle_batch_join,没有我们可以设置的num_threads参数,所以我认为第一种方式是可以理解的。

任何人都可以给我一些解释,为什么我们需要一个读者,但许多线程入队?

回答

0

根据我的理解,它们仅在输入类型方面有所不同。 tf.train.batch_join接受样品列表,而tf.train.batch一次只取一个样品。

tf.train.batch_join的情况下,队列的输入采样预计由一个或多个线程提供。所以,线程的数量是通过输入中的采样数隐式控制的。另一方面,tf.train.batchnum_threads参数能够创建多个入队操作。第一个参数tensorshttps://www.tensorflow.org/api_docs/python/tf/train/batch)由几个线程初始化。

使用多个入队线程隐藏延迟。考虑一个简单的场景,其中负责排队样本的读取器/功能/操作花费大量时间。在这种情况下,有多个线程可以节省生命。以下代码段给出了一个例子:

import tensorflow as tf 
import numpy as np 
import time 

num_steps = 50 
batch_size = 2 
input_shape, target_shape = (2, 2),() 
num_threads = 4 
queue_capacity = 10 

get_random_data_sample函数随机产生(输入,目标)一对具有一些随机延迟。

def get_random_data_sample(): 
    # Random inputs and targets 
    np_input = np.float32(np.random.normal(0,1, (2,2))) 
    np_target = np.int32(1) 

    # Sleep randomly between 1 and 3 seconds. 
    time.sleep(np.random.randint(1,3,1)[0]) 

    return np_input, np_target 

# Wraps a python function and uses it as a TensorFlow op. 
tensorflow_input, tensorflow_target = tf.py_func(get_random_data_sample, [], [tf.float32, tf.int32]) 

注意[tensorflow_input, tensorflow_target]操作(即读者)是由num_threads多线程运行。您可以通过更改num_threads并观察运行时间来验证此情况。

### tf.train.batch ### 
batch_inputs, batch_targets = tf.train.batch([tensorflow_input, tensorflow_target], 
              batch_size=batch_size, 
              num_threads=num_threads, 
              shapes=[input_shape, target_shape], 
              capacity=queue_capacity) 

sess = tf.InteractiveSession() 
tf.train.start_queue_runners() 
start_t = time.time() 
for i in range(num_steps): 
    numpy_inputs, numpy_targets = sess.run([batch_inputs, batch_targets]) 
print(time.time()-start_t) 

inputs具有在下面的例子中num_threads元件。在tf.train.batch

### tf.train.batch_join ### 
inputs = [[tensorflow_input, tensorflow_target] for _ in range(num_threads)] 
join_batch_inputs, join_batch_targets = tf.train.batch_join(inputs, 
                shapes=[input_shape, target_shape], 
                batch_size=batch_size, 
                capacity=queue_capacity,) 
sess = tf.InteractiveSession() 
tf.train.start_queue_runners() 
start_t = time.time() 
for i in range(num_steps): 
    numpy_inputs, numpy_targets = sess.run([join_batch_inputs, join_batch_targets]) 
print(time.time()-start_t) 

多排队线程,有效地隐藏等待时间。你可以玩num_threadsqueue_capacity。两项行动的表现非常相似。

+0

哦,在我的系统中,第一种方式('tf.train.batch')比第二种方式('tf.train.batch_join')快,第一种方式花费第二种方式的四分之一时间。这太神奇了!如何解释? – Gauss

+0

我的系统都需要36-38秒。我正在使用tensorflow 1.2并在GPU上运行它们。请注意,我分别测量性能。换句话说,运行不同的文件。 – eaksan

+0

为什么使用一个阅读器但多线程也很快?我认为一个读者应该比许多读者慢,因为一个读者可以只读取一个文件。 – Gauss