2010-12-05 18 views
2

我正在编写一个用于洗牌大量数据的小脚本。这件事情是这样的:Python:使用子进程流式传输数据而没有死锁?

outproc = None 
for input in input_files: 
    p = Popen('process_input "%s" | more_input_processing' %(input,), 
       shell=True, stdout=PIPE) 
    for line in p.stdout.xreadlines(): 
     if linecount % 1000000 == 0: 
      outfile = "output%03d" %(linecount // 1000000,) 
      if outproc: 
       outproc.stdin.close() 
       result = outproc.wait() # <-- deadlock here 
       assert result == 0, "outproc exited with %s" %(result,) 
      outproc = Popen('handle_output "%s"' %(outfile,), 
          shell=True, stdin=PIPE) 
     linecount += 1 
     outproc.stdin.write(line) 
    p.stdout.close() 
    result = p.wait() 
    assert result == 0, "p exited with %s" %(result,) 

如文档警告说,虽然,我打一个僵局时,我尝试等待outproc(见注释)。

本文档提出的“解决方案”是使用.communicate() ......但这样做会涉及在刷新之前将所有输入读入内存,这是不可取的。

那么,我怎样才能在没有死锁的子流程之间流数据?

+0

好的,所以如果我实际上没有在子进程中等待(即删除所有对`.wait()`)的调用,一切似乎都有效,这对于这个脚本来说很好(这只是一次性的)。如果我能弄清楚如何使它正常工作,那将是很好的,但是... – 2010-12-05 22:35:25

回答

0

您在子流程实际读取的管道上没有使用close,因此它不会收到SIGPIPE或任何导致它退出的内容。只要有足够的数据就可以杀死进程。另外,管道输入和输出,并使用选择知道什么时候应该读或写。

+0

你是什么意思,“不在子流程读取的管道上使用`close`”? `handle_output`脚本不断地从标准输入读取...所以当我`outproc.stdin.close()`,不关闭它读取的管道? – 2010-12-05 22:07:34