2017-01-14 58 views
7

我的训练过程使用tfrecord格式的火车& eval数据集。TFRecordReader看起来非常慢,并且多线程阅读不起作用

我测试阅读器的基准,只有8000记录/秒。和io速度(见iotop命令)只有400KB-500KB/s。

我使用的protobuf的CPP版本在这里

https://github.com/tensorflow/tensorflow/blob/master/tensorflow/g3doc/get_started/os_setup.md#protobuf-library-related-issues

如果可能的话,提供一个最小的可重复的例子(我们通常没有时间来阅读数百你的代码的行)

def read_and_decode(filename_queue): 
    reader = tf.TFRecordReader() 
    _, serialized_example = reader.read(filename_queue) 
    return serialized_example 
    serialized_example = read_and_decode(filename_queue) 
    batch_serialized_example = tf.train.shuffle_batch(
     [serialized_example], 
     batch_size=batch_size, 
     num_threads=thread_number, 
     capacity=capacity, 
     min_after_dequeue=min_after_dequeue) 
    features = tf.parse_example(
     batch_serialized_example, 
     features={ 
      "label": tf.FixedLenFeature([], tf.float32), 
      "ids": tf.VarLenFeature(tf.int64), 
      "values": tf.VarLenFeature(tf.float32), 
     }) 

您尝试过其他尝试的解决方案吗?

我尝试在tf.train.shuffle_batch中设置num_threads,但无法正常工作。

似乎当设置为2个线程时,它工作在8000个记录/秒,当放大线程数时,它变慢。 (我删除了所有花费cpus的ops,只是读取数据。)

我的服务器是24核心cpus。

+0

您受CPU或磁盘限制吗?做时间线可视化可以帮助看到瓶颈在哪里 –

+0

很高兴再次见到你。 1)不,我不限制CPU的使用。 2)我的tfrecords文件存储在本地磁盘驱动器中。这是表现的原因吗? 3)我现在要做时间线。感谢您的建议。我稍后再更新。 – ericyue

+0

这里是我的基准脚本和时间轴结果(timeline.json原始文件inlcude)https://gist.github.com/ericyue/7705407a88e643f7ab380c6658f641e8 – ericyue

回答

7

这里的问题是,每个session.run都有一个固定的开销,并且在队列中填充很多小例子会很慢。

特别是,每个session.run大约100-200 usec,所以你只能做约5k-10k session.run每秒的呼叫。

如果执行Python性能分析(python -m cProfile),这个问题很明显,但很难从时间轴配置文件或CPU配置文件中查看。

解决方法是使用enqueue_many分批向队列中添加内容。我从https://gist.github.com/ericyue/7705407a88e643f7ab380c6658f641e8中抽取了基准,并将其修改为按照.run的调用排列许多项目,并使速度提高了10倍。

的修改是修改电话如下:

if enqueue_many: 
    reader = tf.TFRecordReader(options = tf.python_io.TFRecordOptions(tf.python_io.TFRecordCompressionType.ZLIB)) 
    queue_batch = [] 
    for i in range(enqueue_many_size): 
     _, serialized_example = reader.read(filename_queue) 
     queue_batch.append(serialized_example) 
    batch_serialized_example = tf.train.shuffle_batch(
     [queue_batch], 
     batch_size=batch_size, 
     num_threads=thread_number, 
     capacity=capacity, 
     min_after_dequeue=min_after_dequeue, 
     enqueue_many=True) 

对于完整的源代码,请点击这里: https://github.com/yaroslavvb/stuff/blob/master/ericyue-slowreader/benchmark.py

很难优化它去得更快,因为现在大部分的时间是花在队列操作上。查看stripped down版本,它只是将整数添加到队列中,您也可以获得类似的速度,并且在查看时间轴时,时间花在出队操作上。

enter image description here

每个出队运算大约需要60微秒,但有平均5并行捉迷藏,让您得到12微秒每出队。所以这意味着你会在最好的情况下每秒钟得到200k个例子。

5

这里有雅罗斯拉夫的回答一个简单的加速建设:

Tensorflow有一个内置的功能,tf.TFRecordReader.read_up_to,在每个session.run()调用读取多个记录,从而消除造成多次调用多余开销。

enqueue_many_size = SOME_ENQUEUE_MANY_SIZE 
reader = tf.TFRecordReader(options = tf.python_io.TFRecordOptions(tf.python_io.TFRecordCompressionType.ZLIB)) 
_, queue_batch = reader.read_up_to(filename_queue, enqueue_many_size) 
batch_serialized_example = tf.train.shuffle_batch(
    [queue_batch], 
    batch_size=batch_size, 
    num_threads=thread_number, 
    capacity=capacity, 
    min_after_dequeue=min_after_dequeue, 
    enqueue_many=True) 

与雅罗斯拉夫的答案,你需要设置enqueue_many=True使批处理功能知道它正在接受多个记录。

这在我的用例中非常快。

+0

谢谢!这对我来说也非常快。用io解决了我所有的速度问题。 – Pekka

1

增编雅罗斯拉夫的回答是: 您可以使用tf.python_io.tf_record_iterator通过以实例来遍历它们添加到列表中,你可以传递给tf.train.shuffle_batchenqueue_many=true

queue_batch = [] 
for serialized_example in tf.python_io.tf_record_iterator(filename,options = tf.python_io.TFRecordOptions(tf.python_io.TFRecordCompressionType.ZLIB)): 
    queue_batch.append(serialized_example) 
batch_serialized_example = tf.train.shuffle_batch(
    [queue_batch], 
    batch_size=batch_size, 
    num_threads=thread_number, 
    capacity=capacity, 
    min_after_dequeue=min_after_dequeue, 
    enqueue_many=True) 

似乎试图遍历使用reader.read()的示例将导致每批读取一次。即第n批将是batch_num副本的第n记录而不是batch_num许多独特的记录。