2010-08-26 21 views
10

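Python subprocess timeout and large output (>64K)

I want to execute a process, limit its execution time with a timeout in seconds, and capture the output the process produces. I want to do this on Windows, Linux and FreeBSD.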

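I have tried implementing this in three different ways:

  1. cmd - without timeout, with subprocess.PIPE for output capture.

    Behaviour: operates as expected but does not support a timeout, and I need a timeout...

  2. cmd_to - with timeout and subprocess.PIPE for output capture.

    Behaviour: blocks subprocess execution when the output is >= 2^16 bytes.

  3. cmd_totf - with timeout and tempfile.NamedTemporaryFile for output capture.

    Behaviour: operates as expected but uses temporary files on disk.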

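These are available for closer inspection below.

As can be seen in the output below, the timeout code blocks the execution of the subprocess when subprocess.PIPE is used and the output of the subprocess is >= 2^16 bytes.

The subprocess documentation states that this is expected when calling process.wait() and using subprocess.PIPE, but no warning is given for process.poll(), so what is going wrong here?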
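One way to see where the 2^16 threshold comes from, assuming a POSIX system and Python 3.5+ (for os.set_blocking): the sketch below fills an OS pipe until a further write would block. On Linux the default pipe capacity is 64 KiB, so a child whose output nobody reads stalls in write() at that point and poll() keeps returning None. The name pipe_capacity is made up for this illustration.

```python
import os

def pipe_capacity():
    """Return how many bytes fit into an OS pipe before a write would block."""
    read_fd, write_fd = os.pipe()
    os.set_blocking(write_fd, False)  # assumption: POSIX, Python 3.5+
    written = 0
    try:
        while True:
            # Nobody reads from read_fd, so the kernel buffer fills up.
            written += os.write(write_fd, b"b" * 1024)
    except BlockingIOError:
        pass  # buffer full: a blocking writer (the child) would hang here
    finally:
        os.close(read_fd)
        os.close(write_fd)
    return written
```

Calling print(pipe_capacity()) shows the limit on the local system; the value differs between operating systems.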

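I have a solution in cmd_totf that uses the tempfile module, but the trade-off is that it writes the output to disk, which is something I would really like to avoid.

So my questions are:

  • What am I doing wrong in cmd_to?
  • Is there a way to do what I want without using temporary files, i.e. keeping the output in memory?

Script to generate a bunch of output ('exp_gen.py'):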

#!/usr/bin/env python 
import sys 
output = "b"*int(sys.argv[1]) 
print(output)

Three different implementations (cmd, cmd_to, cmd_totf) of wrappers around subprocess.Popen:

#!/usr/bin/env python 
import subprocess, time, tempfile 
bufsize = -1 

def cmd(cmdline, timeout=60): 
    """ 
    Execute cmdline. 
    Uses the subprocess module and subprocess.PIPE.
    """

    p = subprocess.Popen(
        cmdline,
        bufsize = bufsize,
        shell = False,
        stdin = subprocess.PIPE,
        stdout = subprocess.PIPE,
        stderr = subprocess.PIPE
    )

    out, err = p.communicate() 
    returncode = p.returncode 

    return (returncode, err, out) 

def cmd_to(cmdline, timeout=60): 
    """ 
    Execute cmdline, limit execution time to 'timeout' seconds. 
    Uses the subprocess module and subprocess.PIPE.
    """

    p = subprocess.Popen(
        cmdline,
        bufsize = bufsize,
        shell = False,
        stdin = subprocess.PIPE,
        stdout = subprocess.PIPE,
        stderr = subprocess.PIPE
    )

    t_begin = time.time()    # Monitor execution time
    seconds_passed = 0

    while p.poll() is None and seconds_passed < timeout:
        seconds_passed = time.time() - t_begin
        time.sleep(0.1)

    #if seconds_passed > timeout: 
    # 
    # try: 
    # p.stdout.close() # If they are not closed the fds will hang around until 
    # p.stderr.close() # os.fdlimit is exceeded and cause a nasty exception 
    # p.terminate()  # Important to close the fds prior to terminating the process! 
    #      # NOTE: Are there any other "non-freed" resources? 
    # except: 
    # pass 
    # 
    # raise TimeoutInterrupt 

    out, err = p.communicate() 
    returncode = p.returncode 

    return (returncode, err, out) 

def cmd_totf(cmdline, timeout=60): 
    """ 
    Execute cmdline, limit execution time to 'timeout' seconds. 
    Uses the subprocess module and tempfile instead of subprocess.PIPE.
    """

    output = tempfile.NamedTemporaryFile(delete=False)
    error = tempfile.NamedTemporaryFile(delete=False)

    p = subprocess.Popen(
        cmdline,
        bufsize = 0,
        shell = False,
        stdin = None,
        stdout = output,
        stderr = error
    )

    t_begin = time.time()    # Monitor execution time
    seconds_passed = 0

    while p.poll() is None and seconds_passed < timeout:
        seconds_passed = time.time() - t_begin
        time.sleep(0.1)

    #if seconds_passed > timeout: 
    # 
    # try: 
    # p.stdout.close() # If they are not closed the fds will hang around until 
    # p.stderr.close() # os.fdlimit is exceeded and cause a nasty exception 
    # p.terminate()  # Important to close the fds prior to terminating the process! 
    #      # NOTE: Are there any other "non-freed" resources? 
    # except: 
    # pass 
    # 
    # raise TimeoutInterrupt 

    p.wait() 

    returncode = p.returncode 

    fd   = open(output.name) 
    out   = fd.read() 
    fd.close() 

    fd = open(error.name) 
    err = fd.read() 
    fd.close() 

    error.close() 
    output.close() 

    return (returncode, err, out) 

if __name__ == "__main__": 

    implementations = [cmd, cmd_to, cmd_totf]
    sizes = ['65535', '65536', str(1024*1024)]
    timeouts = [5]

    for timeout in timeouts:
        for size in sizes:
            for i in implementations:
                t_begin = time.time()
                seconds_passed = 0
                rc, err, output = i(['exp_gen.py', size], timeout)
                seconds_passed = time.time() - t_begin
                filler = ' '*(8-len(i.__name__))
                print("[%s%s: timeout=%d, iosize=%s, seconds=%f]" % (repr(i.__name__), filler, timeout, size, seconds_passed))

Output from the execution:

['cmd'  : timeout=5, iosize=65535, seconds=0.016447] 
['cmd_to' : timeout=5, iosize=65535, seconds=0.103022] 
['cmd_totf': timeout=5, iosize=65535, seconds=0.107176] 
['cmd'  : timeout=5, iosize=65536, seconds=0.028105] 
['cmd_to' : timeout=5, iosize=65536, seconds=5.116658] 
['cmd_totf': timeout=5, iosize=65536, seconds=0.104905] 
['cmd'  : timeout=5, iosize=1048576, seconds=0.025964] 
['cmd_to' : timeout=5, iosize=1048576, seconds=5.128062] 
['cmd_totf': timeout=5, iosize=1048576, seconds=0.103183] 
+0

Try the answer from http://stackoverflow.com/questions/874815/how-do-i-get-real-time-information-back-from-a-subprocess-popen-in-python-2-5. – 2010-12-04 21:05:31

+0

You should mention which version of Python you are using. AFAIK, there were quite a few changes to the `subprocess` module between 2.6 and 2.7. – 2011-12-12 04:19:49

+0

See also http://stackoverflow.com/questions/1191374/subprocess-with-timeout/8507775#8507775 – bortzmeyer 2011-12-14 16:15:09
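On the version point raised in the comments: from Python 3.3 on, Popen.communicate() accepts a timeout argument and keeps draining both pipes while it waits, so output larger than the pipe buffer does not stall the child. A minimal sketch under that assumption follows; run_with_timeout and BIG_OUTPUT_CMD are names invented here, and the command is only a stand-in for exp_gen.py.

```python
import subprocess
import sys

# Hypothetical stand-in for exp_gen.py: writes 1 MiB of 'b' to stdout.
BIG_OUTPUT_CMD = [sys.executable, "-c",
                  "import sys; sys.stdout.write('b' * 1048576)"]

def run_with_timeout(cmdline, timeout=60):
    """Run cmdline and return (returncode, out, err).

    Raises subprocess.TimeoutExpired if the child runs longer than
    'timeout' seconds; the child is killed and reaped first.
    """
    p = subprocess.Popen(cmdline,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    try:
        out, err = p.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        p.kill()         # stop the child
        p.communicate()  # reap it and release the pipes
        raise
    return p.returncode, out, err
```

For example, run_with_timeout(BIG_OUTPUT_CMD, timeout=5) returns the full output in memory without touching the disk.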

Answers

4

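Contrary to all the warnings in the subprocess documentation, reading directly from process.stdout and process.stderr has provided a better solution.

By better I mean that I can read output from a process that exceeds 2^16 bytes without having to temporarily store the output on disk.

The code follows: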

import fcntl
import os
import subprocess
import time

class TimeoutInterrupt(Exception):
    """Raised when the child runs longer than the timeout."""

def nonBlockRead(output):
    """Return whatever is currently available on 'output' without blocking."""
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read() or b''  # read() may return None when nothing is ready
    except IOError:                  # nothing to read right now
        return b''

def cmd(cmdline, timeout=60):
    """
    Execute cmdline, limit execution time to 'timeout' seconds.
    Uses the subprocess module and subprocess.PIPE.

    Raises TimeoutInterrupt
    """

    p = subprocess.Popen(
        cmdline,
        bufsize = 0,   # unbuffered
        shell = False, # not really needed; it's disabled by default
        stdout = subprocess.PIPE,
        stderr = subprocess.PIPE
    )

    t_begin = time.time() # Monitor execution time
    seconds_passed = 0

    stdout = b''
    stderr = b''

    while p.poll() is None and seconds_passed < timeout: # Monitor process
        time.sleep(0.1) # Wait a little
        seconds_passed = time.time() - t_begin

        # p.std* blocks on read(), which messes up the timeout timer.
        # To fix this, we use a nonblocking read()
        # Note: fcntl is not available on Windows
        stdout += nonBlockRead(p.stdout)
        stderr += nonBlockRead(p.stderr)

    if seconds_passed >= timeout:
        try:
            p.stdout.close() # If they are not closed the fds will hang around until
            p.stderr.close() # os.fdlimit is exceeded and cause a nasty exception
            p.terminate()    # Important to close the fds prior to terminating the process!
                             # NOTE: Are there any other "non-freed" resources?
        except Exception:
            pass

        raise TimeoutInterrupt

    # Pick up anything written between the last read and process exit
    stdout += nonBlockRead(p.stdout)
    stderr += nonBlockRead(p.stderr)

    returncode = p.returncode

    return (returncode, stdout, stderr)
+0

This is nice, but it blocks on read if there is no output to read, which messes up the timer. I have fixed that in my version and added an edit. – 2011-12-11 08:37:31

+0

@JohnDoe: [`fcntl` doesn't work on Windows](http://stackoverflow.com/q/375427/4279) – jfs 2014-02-26 07:17:11
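Since fcntl is unavailable on Windows, one portable alternative is to let helper threads do ordinary blocking reads while the main thread watches the clock. The following is only a sketch of that idea, not the answer's method; cmd_threads, _drain and BIG_OUTPUT_CMD are hypothetical names, and the exception simply mirrors the TimeoutInterrupt used above.

```python
import subprocess
import sys
import threading
import time

# Hypothetical stand-in for exp_gen.py: writes 1 MiB of 'b' to stdout.
BIG_OUTPUT_CMD = [sys.executable, "-c",
                  "import sys; sys.stdout.write('b' * 1048576)"]

class TimeoutInterrupt(Exception):
    """Raised when the child exceeds the timeout (name mirrors the answer)."""

def _drain(stream, chunks):
    # Blocking reads in a helper thread; the loop ends at EOF,
    # i.e. once the child has closed its end of the pipe.
    for chunk in iter(lambda: stream.read(4096), b""):
        chunks.append(chunk)
    stream.close()

def cmd_threads(cmdline, timeout=60):
    """Run cmdline and return (returncode, stdout, stderr) as bytes."""
    p = subprocess.Popen(cmdline,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    out_chunks, err_chunks = [], []
    readers = [
        threading.Thread(target=_drain, args=(p.stdout, out_chunks)),
        threading.Thread(target=_drain, args=(p.stderr, err_chunks)),
    ]
    for t in readers:
        t.daemon = True
        t.start()

    deadline = time.time() + timeout
    while p.poll() is None and time.time() < deadline:
        time.sleep(0.1)

    if p.poll() is None:       # still running: timeout hit
        p.kill()
        p.wait()               # reap the child
        for t in readers:
            t.join(1)          # readers see EOF once the child is gone
        raise TimeoutInterrupt("no exit within %s seconds" % timeout)

    for t in readers:
        t.join()               # collect everything up to EOF
    return p.returncode, b"".join(out_chunks), b"".join(err_chunks)
```

Because the pipes are emptied continuously, the child never blocks on a full pipe buffer, and the timeout loop only has to poll the exit status.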

1

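Disclaimer: this answer has not been tested on Windows or FreeBSD, but the modules used should work on those systems. I believe this is a working answer to your question; it works for me.

Here is code I just hacked together to solve the problem on Linux. It is a combination of several Stack Overflow threads and my own research in the Python 3 documentation.

Main characteristics of this code:

  • Uses processes rather than threads for the blocking I/O because they can be terminated more reliably with p.terminate()
  • Implements a retriggerable timeout watchdog that restarts counting whenever some output happens
  • Implements a long-term timeout watchdog to limit the overall runtime
  • Can feed stdin (although I only need to feed short strings once)
  • Can capture stdout/stderr by the usual Popen means (only stdout is coded, and stderr is redirected to stdout, but the two can easily be separated)
  • It is almost real-time because it only checks for output every 0.2 seconds. You can decrease this or remove the waiting interval easily
  • Lots of debugging printouts are still in place to show what is happening when.

The only code dependency is enum, as implemented here, but the code can easily be changed to work without it. It is only used to distinguish the two timeouts; use separate exceptions if you like.

Here is the code; as always, feedback is highly appreciated: (Edit 29-Jun-2012: the code is now actually working)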

# Python module runcmd 
# Implements a class to launch shell commands which 
# are killed after a timeout. Timeouts can be reset 
# after each line of output 
# 
# Use inside other script with: 
# 
# import runcmd 
# (return_code, out) = runcmd.RunCmd(['ls', '-l', '/etc'], 
#         timeout_runtime, 
#         timeout_no_output, 
#         stdin_string).go() 
# 

import multiprocessing 
import queue 
import subprocess 
import time 

import enum 

def timestamp(): 
    return time.strftime('%Y%m%d-%H%M%S') 


class ErrorRunCmd(Exception): pass 
class ErrorRunCmdTimeOut(ErrorRunCmd): pass 

class Enqueue_output(multiprocessing.Process): 
    def __init__(self, out, queue): 
     multiprocessing.Process.__init__(self) 
     self.out = out 
     self.queue = queue 
     self.daemon = True 
    def run(self): 
     try: 
      for line in iter(self.out.readline, b''): 
       #print('worker read:', line) 
       self.queue.put(line) 
     except ValueError: pass # Readline of closed file 
     self.out.close() 
class Enqueue_input(multiprocessing.Process): 
    def __init__(self, inp, iterable): 
     multiprocessing.Process.__init__(self) 
     self.inp = inp 
     self.iterable = iterable 
     self.daemon = True 
    def run(self): 
     #print("writing stdin") 
     for line in self.iterable: 
      self.inp.write(bytes(line,'utf-8')) 
     self.inp.close() 
     #print("writing stdin DONE") 

class RunCmd(): 
    """RunCmd - class to launch shell commands 

    Captures and returns stdout. Kills child after a given 
    amount (timeout_runtime) wallclock seconds. Can also 
    kill after timeout_retriggerable wallclock seconds. 
    This second timer is reset whenever the child does some 
    output 

     (return_code, out) = RunCmd(['ls', '-l', '/etc'], 
            timeout_runtime, 
            timeout_no_output, 
            stdin_string).go() 

    """ 
    Timeout = enum.Enum('Timeout', 'No Retriggerable Runtime') # assumes the standard-library enum (Python 3.4+)

    def __init__(self, cmd, timeout_runtime, timeout_retriggerable, stdin=None): 
     self.dbg = False 
     self.cmd = cmd 
     self.timeout_retriggerable = timeout_retriggerable 
     self.timeout_runtime = timeout_runtime 
     self.timeout_hit = self.Timeout.No 
     self.stdout = '--Cmd did not yield any output--' 
     self.stdin = stdin 
    def read_queue(self, q): 
     time_last_output = None 
     try: 
      bstr = q.get(False) # non-blocking 
      if self.dbg: print('{} chars read'.format(len(bstr))) 
      time_last_output = time.time() 
      self.stdout += bstr 
     except queue.Empty: 
      #print('queue empty') 
      pass 
     return time_last_output 
    def go(self): 
     if self.stdin: 
      pstdin = subprocess.PIPE 
     else: 
      pstdin = None 
     p = subprocess.Popen(self.cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=pstdin) 
     pin = None 
     if (pstdin): 
      pin = Enqueue_input(p.stdin, [self.stdin + '\n']) 
      pin.start() 
     q = multiprocessing.Queue() 
     pout = Enqueue_output(p.stdout, q) 
     pout.start() 
     try: 
      if self.dbg: print('Beginning subprocess with timeout {}/{} s on {}'.format(self.timeout_retriggerable, self.timeout_runtime, time.asctime())) 
      time_begin = time.time() 
      time_last_output = time_begin 
      seconds_passed = 0 
      self.stdout = b'' 
      once = True     # ensure loop's executed at least once 
             # some child cmds may exit very fast, but still produce output 
      while once or p.poll() is None or not q.empty(): 
       once = False 
       if self.dbg: print('a) {} of {}/{} secs passed and overall {} chars read'.format(seconds_passed, self.timeout_retriggerable, self.timeout_runtime, len(self.stdout))) 

       tlo = self.read_queue(q) 
       if tlo: 
        time_last_output = tlo 

       now = time.time() 
       if now - time_last_output >= self.timeout_retriggerable: 
        self.timeout_hit = self.Timeout.Retriggerable 
        raise ErrorRunCmdTimeOut(self) 
       if now - time_begin >= self.timeout_runtime: 
        self.timeout_hit = self.Timeout.Runtime 
        raise ErrorRunCmdTimeOut(self) 

       if q.empty(): 
        time.sleep(0.1) 
      # Final try to get "last-millisecond" output 
      self.read_queue(q)    
     finally: 
      self._close(p, [pout, pin])    
     return (self.returncode, self.stdout)    

    def _close(self, p, procs): 
     if self.dbg: 
      if self.timeout_hit != self.Timeout.No: 
       print('{} A TIMEOUT occurred: {}'.format(timestamp(), self.timeout_hit))
      else:
       print('{} No timeout occurred'.format(timestamp()))
     for process in [proc for proc in procs if proc]: 
      try: 
       process.terminate() 
      except: 
       print('{} Process termination raised trouble'.format(timestamp())) 
       raise 
     try: 
      p.stdin.close() 
     except: pass 
     if self.dbg: print('{} _closed stdin'.format(timestamp())) 
     try: 
      p.stdout.close() # If they are not closed the fds will hang around until 
     except: pass 
     if self.dbg: print('{} _closed stdout'.format(timestamp())) 
      #p.stderr.close() # os.fdlimit is exceeded and cause a nasty exception 
     try: 
      p.terminate()  # Important to close the fds prior to terminating the process! 
           # NOTE: Are there any other "non-freed" resources? 
     except: pass 
     if self.dbg: print('{} _closed Popen'.format(timestamp())) 
     try: 
      self.stdout = self.stdout.decode('utf-8') 
     except: pass 
     self.returncode = p.returncode 
     if self.dbg: print('{} _closed all'.format(timestamp())) 

Use it with:

import runcmd 

cmd = ['ls', '-l', '/etc'] 

worker = runcmd.RunCmd(cmd, 
         40, # limit runtime [wallclock seconds] 
         2,  # limit runtime after last output [wallclk secs] 
         ''  # stdin input string 
         ) 
(return_code, out) = worker.go() 

if worker.timeout_hit != worker.Timeout.No:
    print('A TIMEOUT occurred: {}'.format(worker.timeout_hit))
else:
    print('No timeout occurred')


# cmd is a list, so join it before formatting it with {:s}
print("Running '{:s}' returned {:d} and {:d} chars of output".format(' '.join(cmd), return_code, len(out)))
print('Output:') 
print(out) 

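command, the first argument, should be a list containing a command and its arguments. It is used for the Popen(shell=False) call. The timeouts are given in seconds, and there is currently no code to disable them. Set timeout_no_output to timeout_runtime to effectively disable the retriggerable timeout_no_output.

stdin_string can be any string that is to be sent to the command's standard input. Set it to None if your command does not need any input. If a string is provided, a final '\n' is appended.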
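The interplay of the two timeouts can be looked at in isolation. The sketch below is a simplified model, not part of the runcmd module: Watchdog, feed and check are names made up here, timestamps are passed in explicitly so the logic can be followed without running a child process, and the Timeout enum assumes the standard-library enum module. It checks the no-output limit before the runtime limit, in the same order as the loop in go().

```python
import enum

class Timeout(enum.Enum):
    No = 0
    Retriggerable = 1
    Runtime = 2

class Watchdog:
    """Tracks an overall runtime limit and a retriggerable no-output limit."""

    def __init__(self, timeout_runtime, timeout_no_output, now):
        self.timeout_runtime = timeout_runtime
        self.timeout_no_output = timeout_no_output
        self.time_begin = now
        self.time_last_output = now

    def feed(self, now):
        """Call whenever the child produced output: restarts the no-output timer."""
        self.time_last_output = now

    def check(self, now):
        """Return which timeout, if any, has been hit at time 'now'."""
        if now - self.time_last_output >= self.timeout_no_output:
            return Timeout.Retriggerable
        if now - self.time_begin >= self.timeout_runtime:
            return Timeout.Runtime
        return Timeout.No
```

With Watchdog(40, 2, now=0.0), a child that stays silent hits Timeout.Retriggerable after 2 seconds, while a child that keeps producing output is stopped by Timeout.Runtime at 40 seconds.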