2011-10-03 139 views
4

嗨,我在执行命令时出现问题,该命令通过Ubuntu 10服务器执行100MB文件的wget。除此之外,较短的命令可以正常工作。下面的类包含了我如何使用paramiko和我尝试克服这个问题的不同尝试(请参阅不同的run或exec方法)。在exec_cmd的情况下执行挂在这一行:Paramiko在执行大型wget命令时挂起

 out = self.in_buffer.read(nbytes, self.timeout) 

从paramiko的channel.py模块的recv方法。

同样的wget命令在使用Mac的普通ssh实用程序的shell中完美工作。

""" 
Management of SSH connections 
""" 

import logging 
import os 
import paramiko 
import socket 
import time 
import StringIO 


class SSHClient(): 
    def __init__(self): 
     self._ssh_client = paramiko.SSHClient() 
     self._ssh_client.load_system_host_keys() 
     self._ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 
     self.time_out = 300 
     self.wait = 5 

    def connect(self, hostname, user, pkey): 
     retry = self.time_out 
     self.hostname = hostname 
     logging.info("connecting to:%s user:%s key:%s" % (hostname, user, pkey)) 
     while retry > 0: 
      try: 
       self._ssh_client.connect(hostname, 
             username=user, 
             key_filename=os.path.expanduser(pkey), 
             timeout=self.time_out) 
       return 
      except socket.error, (value,message): 
       if value == 61 or value == 111: 
        logging.warning('SSH Connection refused, will retry in 5 seconds') 
        time.sleep(self.wait) 
        retry -= self.wait 
       else: 
        raise 
      except paramiko.BadHostKeyException: 
       logging.warning("%s has an entry in ~/.ssh/known_hosts and it doesn't match" % self.server.hostname) 
       logging.warning('Edit that file to remove the entry and then try again') 
       retry = 0 
      except EOFError: 
       logging.warning('Unexpected Error from SSH Connection, retry in 5 seconds') 
       time.sleep(self.wait) 
       retry -= self.wait 
     logging.error('Could not establish SSH connection') 

    def exists(self, path): 
     status = self.run('[ -a %s ] || echo "FALSE"' % path) 
     if status[1].startswith('FALSE'): 
      return 0 
     return 1 

    def shell(self): 
     """ 
     Start an interactive shell session on the remote host. 
     """ 
     channel = self._ssh_client.invoke_shell() 
     interactive_shell(channel) 

    def run(self, command): 
     """ 
     Execute a command on the remote host. Return a tuple containing 
     an integer status and a string containing all output from the command. 
     """ 
     logging.info('running:%s on %s' % (command, self.hostname)) 
     log_fp = StringIO.StringIO() 
     status = 0 
     try: 
      t = self._ssh_client.exec_command(command) 
     except paramiko.SSHException: 
      logging.error("Error executing command: " + command) 
      status = 1 
     log_fp.write(t[1].read()) 
     log_fp.write(t[2].read()) 
     t[0].close() 
     t[1].close() 
     t[2].close() 
     logging.info('output: %s' % log_fp.getvalue()) 
     return (status, log_fp.getvalue()) 

    def run_pty(self, command): 
     """ 
     Execute a command on the remote host with a pseudo-terminal. 
     Returns a string containing the output of the command. 
     """ 
     logging.info('running:%s on %s' % (command, self.hostname)) 
     channel = self._ssh_client.get_transport().open_session() 
     channel.get_pty() 
     status = 0 
     try: 
      channel.exec_command(command) 
     except: 
      logging.error("Error executing command: " + command) 
      status = 1 
     return status, channel.recv(1024) 

    def close(self): 
     transport = self._ssh_client.get_transport() 
     transport.close() 

    def run_remote(self, cmd, check_exit_status=True, verbose=True, use_sudo=False): 
     logging.info('running:%s on %s' % (cmd, self.hostname)) 
     ssh = self._ssh_client 
     chan = ssh.get_transport().open_session() 
     stdin = chan.makefile('wb') 
     stdout = chan.makefile('rb') 
     stderr = chan.makefile_stderr('rb') 
     processed_cmd = cmd 
     if use_sudo: 
      processed_cmd = 'sudo -S bash -c "%s"' % cmd.replace('"', '\\"') 
     chan.exec_command(processed_cmd) 
     result = { 
      'stdout': [], 
      'stderr': [], 
     } 
     exit_status = chan.recv_exit_status() 
     result['exit_status'] = exit_status 

     def print_output(): 
      for line in stdout: 
       result['stdout'].append(line) 
       logging.info(line) 
      for line in stderr: 
       result['stderr'].append(line) 
       logging.info(line) 
     if verbose: 
      print processed_cmd 
      print_output() 
     return exit_status,result 

    def exec_cmd(self, cmd): 
     import select 
     ssh = self._ssh_client 
     channel = ssh.get_transport().open_session() 
     END = "CMD_EPILOGqwkjidksjk58754dskhjdksjKDSL" 
     cmd += ";echo " + END 
     logging.info('running:%s on %s' % (cmd, self.hostname)) 
     channel.exec_command(cmd) 
     out = "" 
     buf = "" 
     while END not in buf: 
      rl, wl, xl = select.select([channel],[],[],0.0) 
      if len(rl) > 0: 
       # Must be stdout 
       buf = channel.recv(1024) 
       logging.info(buf) 
       out += buf 
     return 0, out 

回答

2
  1. 在这种情况下,我会去的名单追加,然后串联。为什么?那么,Python中的字符串是不可变的。这意味着每次你使用+=时,你基本上都会创建两个新字符串并读取第三个字符串。另一方面,如果创建列表并追加它,则会将创建的字符串数量减半。
  2. 你真的需要多次拨打电话吗?我的理解是,如果这个过程是线程阻塞,你并不在乎。由于select或多或少围绕相同名称的C-方法的包装:

    的select()和PSELECT()允许一个程序来监视多个文件描述符,等待直到一个或多个所述文件描述符变成“准备“对于某些类别的I/O操作(例如可能的输入)。如果可以在没有阻塞的情况下执行相应的I/O操作(例如,读取(2)),则文件描述符被准备好准备就绪。

  3. 你不听的代码中的socket.timeout异常。
  4. 写入标准输出/文件系统可能会很昂贵,但是您记录的是由recv返回的每一行。你能移动日志行吗?
  5. 您是否考虑过手动处理阅读频道?技术上你唯一需要的代码是:
try: 
    out = self.in_buffer.read(nbytes, self.timeout) 
except PipeTimeout, e: 
    # do something with error 

它不能保证,但会切出额外的处理。

2

我有同样的问题,当我在远程SSH客户端上运行的shell脚本时,我的python脚本挂起,在400Mb文件上执行了wget命令。

我发现向wget命令添加超时修复了问题。 我原本:

wget的http://blah:8888/file.zip

这个现在

的wget -q -T90 http://blah:8888/file.zip

它就像一个魅力!

希望它有帮助。

+0

谢谢!正是我所需要的,-q对我来说已经足够了.... –