
I want to train a convolutional neural network on MNIST in a distributed fashion using the TensorFlow high-level APIs. I tried specifying a cluster configuration and passing it to an Estimator (code below). How is distributed training with tf.learn Estimators supposed to be set up?

I get the following error: Parameter to MergeFrom() must be instance of same class: expected tensorflow.ConfigProto got property.

Does anyone know what is wrong with how I am specifying the config?

from __future__ import absolute_import 
from __future__ import division 
from __future__ import print_function 


import grpc 
import numpy as np 
import tensorflow as tf 
from tensorflow.contrib import learn 
from tensorflow.contrib.learn.python.learn.estimators import model_fn as model_fn_lib 
from tensorflow.contrib.learn.python.learn.estimators import run_config as run_config_lib 
from tensorflow.python import debug as tf_debug 
tf.logging.set_verbosity(tf.logging.ERROR) 
import json 
import os 
import shutil 

### Data - Mnist 

mnist=learn.datasets.load_dataset('mnist') 
train_data=mnist.train.images 
train_labels=np.asarray(mnist.train.labels, dtype=np.int32) 
eval_data=mnist.test.images 
eval_labels=np.asarray(mnist.test.labels, dtype=np.int32) 

BATCH_SIZE=100 
NUM_EPOCHS=10 
train_input_fn = learn.io.numpy_input_fn({'x': train_data}, train_labels, shuffle=True, batch_size=BATCH_SIZE, 
             num_epochs=NUM_EPOCHS) 
batch_size = 100 
num_epochs = 1 
eval_input_fn = learn.io.numpy_input_fn({'x': eval_data}, eval_labels, shuffle=False, batch_size=batch_size, num_epochs=num_epochs) 

### Cluster 

my_cluster = {'ps': ['/cpu:0'], 
       'worker': ['/gpu:0']} 
os.environ['TF_CONFIG'] = json.dumps(
      {'cluster': my_cluster, 
      'task': {'type': 'worker', 'index': 1}}) 

my_configs=learn.RunConfig() 

server = tf.train.Server(server_or_cluster_def=my_configs.cluster_spec, job_name='worker') 

### Model 

def cnn_model_fn(features, labels, mode): 

    input_layer=tf.reshape(features['x'],shape=[-1,28,28,1]) 

    #conv1 
    conv1=tf.layers.conv2d(inputs=input_layer, 
          filters=32, 
          kernel_size=[5, 5], 
          padding='same', 
          activation=tf.nn.relu) 
    pool1=tf.layers.max_pooling2d(inputs=conv1, pool_size=[2,2], strides=2) 

    #conv2 
    conv2=tf.layers.conv2d(inputs=pool1, 
          filters=64, 
          kernel_size=[5,5], 
          padding='same', 
          activation=tf.nn.relu) 
    pool2=tf.layers.max_pooling2d(inputs=conv2, pool_size=[2,2], strides=2) 

    #fully connected layers 
    pool2_flat=tf.reshape(pool2, [-1, 7*7*64]) 
    dense1=tf.layers.dense(pool2_flat, 1024, activation=tf.nn.relu) 
    dropout = tf.layers.dropout(inputs=dense1, rate=0.4, training=mode == learn.ModeKeys.TRAIN) 

    #fc2 
    logits=tf.layers.dense(dropout, 10, activation=tf.nn.relu) 
    loss = None 
    train_op = None 

    #loss 
    if mode != learn.ModeKeys.INFER: 
        onehot_labels = tf.one_hot(indices=tf.cast(labels, tf.int32), depth=10) 
        loss = tf.losses.softmax_cross_entropy(onehot_labels=onehot_labels, logits=logits) 

    #optimizer 
    if mode == learn.ModeKeys.TRAIN: 
        with tf.device("/job:worker/task:1"): 
            train_op = tf.contrib.layers.optimize_loss( 
                loss=loss, 
                global_step=tf.contrib.framework.get_global_step(), 
                learning_rate=0.0001, 
                optimizer="Adam") 

    #predictions 
    predictions = { 
        'classes': tf.argmax(logits, axis=1), 
        'predictions': tf.nn.softmax(logits, name="softmax_tensor") 
    } 
    return model_fn_lib.ModelFnOps(mode=mode, predictions=predictions, loss=loss, train_op=train_op) 

classifier=learn.Estimator(model_fn=cnn_model_fn, model_dir="/tmp/mnist_distributed", config=my_configs) 

### logging 

tensors_to_log = {"probabilities": "softmax_tensor"} 
logging_hook = tf.train.LoggingTensorHook(tensors=tensors_to_log, every_n_iter=50) 

### Metrics 

metrics = { 
    "accuracy": 
     learn.MetricSpec(
      metric_fn=tf.metrics.accuracy, prediction_key="classes"), 
} 

### Distributing training 

distributed_experiment=learn.Experiment(estimator=classifier, 
       train_input_fn=train_input_fn, 
       eval_input_fn=eval_input_fn, 
       eval_metrics=metrics, 
       #train_monitors=my_monitors, 
       train_steps=200, 
       ) 

distributed_experiment.train_and_evaluate() 

Answers


my_config should be an instance of RunConfig, not the RunConfig class itself. When a RunConfig is initialized, it loads the ps, worker, and task configuration from the TF_CONFIG environment variable. https://www.tensorflow.org/api_docs/python/tf/contrib/learn/RunConfig
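For reference, a minimal sketch of a TF_CONFIG that RunConfig can parse. Note that the cluster entries are host:port addresses, one per process, not device strings like '/cpu:0'; the localhost ports below are placeholders chosen for illustration, not values from the question:

import json 
import os 
from tensorflow.contrib import learn 

# One address per process; 'ps' and 'worker' are job names. 
cluster = {'ps': ['localhost:2222'], 
           'worker': ['localhost:2223']} 

# Each process sets its own task entry; this one is worker 0. 
os.environ['TF_CONFIG'] = json.dumps( 
    {'cluster': cluster, 
     'task': {'type': 'worker', 'index': 0}}) 

# RunConfig() reads TF_CONFIG at construction time. 
config = learn.RunConfig() 
print(config.cluster_spec.as_dict())  # {'ps': [...], 'worker': [...]} 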


Fixed the class instantiation. Thanks, I am now able to start a server, but the code above still stalls when I run distributed_experiment.train(). Do you see anything else that might be missing? – smh


@smh Can you provide any logs? Don't you have 1 ps and 1 worker? – BoscoTsang


Do I need to start a separate server for the ps as well? I am struggling with how to define the cluster, and with how training gets distributed inside cnn_model_fn, and I haven't figured out how to solve this yet. This is the log I get before distributed_experiment.train() halts: 'DEBUG:tensorflow:Setting feature info to {'x': TensorSignature(dtype=tf.float32, shape=TensorShape([Dimension(None), Dimension(784)]), is_sparse=False)}. DEBUG:tensorflow:Setting labels info to TensorSignature(dtype=tf.int32, shape=TensorShape([Dimension(None)]), is_sparse=False) INFO:tensorflow:Create CheckpointSaverHook.' – smh
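A minimal sketch of that two-process setup, assuming the placeholder localhost cluster from above (every job listed in the cluster spec needs its own running tf.train.Server, so 1 ps + 1 worker means two Python processes):

import tensorflow as tf 

cluster = tf.train.ClusterSpec({'ps': ['localhost:2222'], 
                                'worker': ['localhost:2223']}) 

# Run this in the ps process: 
ps_server = tf.train.Server(cluster, job_name='ps', task_index=0) 
# ps_server.join()  # blocks forever, serving variables to workers 

# Run this in the worker process: 
worker_server = tf.train.Server(cluster, job_name='worker', task_index=0) 
# ...then build the graph and train against worker_server.target 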


If you want to run a distributed TF Estimator, here is an example:

from tensorflow.contrib.learn.python.learn import learn_runner 
from tensorflow.contrib.learn.python.learn.estimators import run_config 

... 

learn_runner.run(
    experiment_fn=create_experiment_fn(config), 
    output_dir=output_dir) 

The 'experiment_fn' here is just a function that returns the 'distributed_experiment' from your code. Your experiment should also take an 'output_dir'.
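A minimal sketch of what that wiring might look like, reusing the names from the question (create_experiment_fn and its config argument are illustrative names from this answer, not a library API):

from tensorflow.contrib.learn.python.learn import learn_runner 

def create_experiment_fn(config): 
    def experiment_fn(output_dir): 
        # learn_runner passes output_dir in; build the estimator there. 
        estimator = learn.Estimator(model_fn=cnn_model_fn, 
                                    model_dir=output_dir, 
                                    config=config) 
        return learn.Experiment(estimator=estimator, 
                                train_input_fn=train_input_fn, 
                                eval_input_fn=eval_input_fn, 
                                eval_metrics=metrics, 
                                train_steps=200) 
    return experiment_fn 

learn_runner.run(experiment_fn=create_experiment_fn(my_configs), 
                 output_dir='/tmp/mnist_distributed') 

With no explicit schedule, learn_runner picks what to run on each process (train, evaluate, or serve) based on the task type in the RunConfig.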