2014-02-07 176 views
2

我试图在Python中输出一个冗长的命令输出到标准输出和标准错误。我想轮询子进程并将输出写入单独的文件。同时读取子进程标准输出和标准错误

我尝试以下,在此基础上回答Non-blocking read on a subprocess.PIPE in python

import subprocess 

from Queue import Queue, Empty 
from threading import Thread 

def send_cmd(cmd, shell=False): 
    """ 
    Send cmd to the shell 
    """ 
    if not isinstance(cmd, list): cmd = shlex.split(cmd) 

    params = {'args' : cmd, 
       'stdout' : subprocess.PIPE, 
       'stderr' : subprocess.PIPE, 
       'shell' : shell} 

    proc = subprocess.Popen(**params) 

    return proc 

def monitor_command(process, stdout_log=os.devnull, stderr_log=os.devnull): 
    """ 
    Monitor the process that is running, and log it if desired 
    """ 
    def enqueue_output(out, queue): 
     for line in iter(out.readline, b''): 
      queue.put(line) 

    def setup_process(log_name, proc): 
     FID = open(log_name, 'w') 
     queue = Queue() 
     thread = Thread(target=enqueue_output, args=(proc, queue)) 
     thread.daemon = True # Thread dies with program 
     thread.start() 

     return (queue, FID) 

    def check_queues(queue_list, errors): 
     for queue, FID in queue_list: 
      try: 
       line = queue.get_nowait() 
       if 'error' in line.lower() or 'failed' in line.lower(): 
        errors.append(line) 
      except Empty: 
       pass 
      else: 
       FID.write(line) 

    errors = [] 
    queue_list = [] 

    for log, proc in [(stdout_log, process.stdout), (stderr_log, process.stderr)]: 
     queue_list.append(setup_process(log, proc) 

    while process.poll() is None: 
     check_queues(queue_list, errors) 

    while not queue_list[0][0].empty() or queue_list[1][0].empty(): 
     check_queues(queue_list, errors) 

    for queue, FID in queue_list: 
     FID.close() 

return errors 

process = send_cmd('long_program.exe') 
errors = monitor_command(process, stdout_log='stdout.log', stderr_log='stderr.log') 

但标准输出的输出文件是空的,而对于错误中的输出文件只有几行代码,而两者都应该是相当大的。

我错过了什么?

回答

0

我也曾经..这里是一些旧的代码我写

 


class Process_Communicator(): 

    def join(self): 
     self.te.join() 
     self.to.join() 
     self.running = False 
     self.aggregator.join() 
     self.ti.join() 

    def enqueue_in(self): 
     while self.running and self.p.stdin is not None: 
      while not self.stdin_queue.empty(): 
       s = self.stdin_queue.get() 
       self.p.stdin.write(str(s) + '\n\r') 
      pass 

    def enqueue_output(self): 
     if not self.p.stdout or self.p.stdout.closed: 
      return 
     out = self.p.stdout 
     for line in iter(out.readline, b''): 
      self.qo.put(line) 
     # out.flush() 

    def enqueue_err(self): 
     if not self.p.stderr or self.p.stderr.closed: 
      return 
     err = self.p.stderr 
     for line in iter(err.readline, b''): 
      self.qe.put(line) 

    def aggregate(self): 
     while (self.running): 
      self.update() 
     self.update() 

    def update(self): 
     line = "" 
     try: 
      while self.qe.not_empty: 
       line = self.qe.get_nowait() # or q.get(timeout=.1) 
       self.unbblocked_err += line 
     except Queue.Empty: 
      pass 

     line = "" 
     try: 
      while self.qo.not_empty: 
       line = self.qo.get_nowait() # or q.get(timeout=.1) 
       self.unbblocked_out += line 
     except Queue.Empty: 
      pass 

     while not self.stdin_queue.empty(): 
       s = self.stdin_queue.get() 
       self.p.stdin.write(str(s)) 

    def get_stdout(self, clear=True): 
     ret = self.unbblocked_out 
     if clear: 
      self.unbblocked_out = "" 
     return ret 

    def has_stdout(self): 
     ret = self.get_stdout(False) 
     if ret == '': 
      return None 
     else: 
      return ret 

    def get_stderr(self, clear=True): 
     ret = self.unbblocked_out 
     if clear: 
      self.unbblocked_out = "" 
     return ret 

    def has_stderr(self): 
     ret = self.get_stdout(False) 
     if ret == '': 
      return None 
     else: 
      return ret 

    def __init__(self, subp): 
     '''This is a simple class that collects and aggregates the 
     output from a subprocess so that you can more reliably use 
     the class without having to block for subprocess.communicate.''' 
     self.p = subp 
     self.unbblocked_out = "" 
     self.unbblocked_err = "" 
     self.running = True 
     self.qo = Queue.Queue() 
     self.to = threading.Thread(name="out_read", 
            target=self.enqueue_output, 
            args=()) 
     self.to.daemon = True # thread dies with the program 
     self.to.start() 

     self.qe = Queue.Queue() 
     self.te = threading.Thread(name="err_read", 
            target=self.enqueue_err, 
            args=()) 
     self.te.daemon = True # thread dies with the program 
     self.te.start() 

     self.stdin_queue = Queue.Queue() 
     self.aggregator = threading.Thread(name="aggregate", 
              target=self.aggregate, 
              args=()) 
     self.aggregator.daemon = True # thread dies with the program 
     self.aggregator.start() 
     pass 

你可能不需要整个的例子,但随意剪切复制粘贴你所需要的。显示我是如何进行线程处理也很重要。

0

该代码看起来比任务要求更复杂。我不明白你为什么需要在这里拨打process.poll()queue.get_nowait()。将子进程的stdout/stderr传递给多个接收器;您可以从teed_call() that accepts arbitrary file-like objects开始:您可以传递日志文件和特殊文件类对象,这些对象在.write()方法中积累了errors

用最少的更改修复你的代码;您应该在读取器线程上调用.join()(即使process.poll()而不是None即,子进程已退出;可能有一些未决的输出,加入读取器的线程可确保读取所有输出)。

相关问题