2014-09-23

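Python multiprocessing queue error

I have the following Python code, which reads a file, processes it in parallel, and writes out the results: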

import csv
import multiprocessing as mp
import os

try:
    from StringIO import StringIO  # Python 2
except ImportError:
    from io import StringIO  # Python 3

import validation  # the asker's own module, provides RowValidator


def line_chunker(path):
    """
    Reads a file in chunks and yields each chunk.
    Each chunk is guaranteed to end at a line break (EOL).
    Each chunk is returned as a single string.

    The file is split into roughly as many chunks as there are CPU cores
    available.
    """
    size = os.path.getsize(path)
    cores = mp.cpu_count()
    chunksize = size // cores  # truncated integer; readlines() needs an int hint

    with open(path) as f:
        f.readline()  # skip header
        while True:
            part = f.readlines(chunksize)
            if not part:
                break
            yield "".join(part)


def _validate(chunk, outq):
    """Performs format validation on a given chunk of a csv file."""
    rows = csv.reader(StringIO(chunk))
    vld = validation.RowValidator(rows)
    vld.check_rows()
    outq.put(vld.errors)


def _write(outq):
    """Writes lines in the outq to a text file."""
    outfile = open("C:/testoutput.txt", "w")
    while True:
        result = outq.get()
        if result is None:
            outfile.close()
            break
        else:
            for line in result:
                outfile.write(line)
                outfile.write("\n")


def validate_pll(path):
    """Perform validation in parallel."""

    pool = mp.Pool()
    outq = mp.Manager().Queue(maxsize=8)

    writer = mp.Process(target=_write, args=(outq,))
    writer.start()
    for chunk in line_chunker(path):
        pool.apply_async(_validate, (chunk, outq))

    pool.close()
    pool.join()

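It reads the file in chunks and submits each chunk to a worker process in the pool for processing. The results are put on a queue, which is monitored by a separate writer process.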
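For context, line_chunker relies on readlines(hint), which stops reading once the lines read so far exceed hint in size, so every chunk ends on a line boundary. A minimal standalone sketch of that behaviour, assuming an in-memory io.StringIO in place of the real CSV file; the helper name line_chunks, the sample data and the chunk size of 20 are all made up for illustration:

```python
import io


def line_chunks(f, chunksize):
    """Hypothetical helper: yield chunks of whole lines from an open text file."""
    f.readline()  # skip the header line, as line_chunker does
    while True:
        part = f.readlines(chunksize)  # reads whole lines until the hint is exceeded
        if not part:
            break
        yield "".join(part)


data = "header\n" + "".join("row%d,a,b\n" % i for i in range(10))
chunks = list(line_chunks(io.StringIO(data), 20))
for chunk in chunks:
    print(repr(chunk))
```

Because readlines() only ever returns complete lines, no CSV row is split across two chunks, which is what lets each worker parse its chunk independently.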

The code runs, but when it finishes I get a strange EOFError.

I suspect this is because I don't call writer.join(), but if I add that line, like this:

def validate_pll(path):
    """Perform validation in parallel."""

    pool = mp.Pool()
    outq = mp.Manager().Queue(maxsize=8)

    writer = mp.Process(target=_write, args=(outq,))
    writer.start()
    for chunk in line_chunker(path):
        pool.apply_async(_validate, (chunk, outq))

    pool.close()
    pool.join()
    writer.join()

the code just hangs. Any ideas what I'm doing wrong?

The error message is:

Process Process-10:
Traceback (most recent call last):
  File "C:\Anaconda\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\Anaconda\lib\multiprocessing\process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "C:\SVN\PortfolioInspector\trunk\parallel.py", line 114, in _write
    result = outq.get()
  File "<string>", line 2, in get
  File "C:\Anaconda\lib\multiprocessing\managers.py", line 759, in _callmethod
    kind, result = conn.recv()
EOFError

Answer


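The writer process (running _write) is still waiting for entries to be written to outq when the main process ends. It waits for entries by holding open a blocking connection to the Manager process that manages the shared Queue. When the main process finishes executing, the Manager process shuts down, which sends an EOF over the connection the writer has open, and you see that exception.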
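The EOFError at the bottom of the traceback is raised by conn.recv() inside the proxy's _callmethod. The same exception can be reproduced in isolation with a plain multiprocessing.Pipe: once one end of a connection is closed, recv() on the other end raises EOFError instead of waiting. This is only an analogy, not the Manager's actual internals; closing the hypothetical manager_end stands in for the Manager process exiting:

```python
import multiprocessing as mp

# Two ends of one connection, standing in for the writer's link to the Manager.
writer_end, manager_end = mp.Pipe()

# Simulate the Manager process going away: its end of the connection closes.
manager_end.close()

try:
    writer_end.recv()  # the same kind of call seen at the bottom of the traceback
    outcome = "received"
except EOFError:
    outcome = "EOFError"
finally:
    writer_end.close()

print(outcome)
```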

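To fix it, you need to tell the writer to shut down before the main process ends (and, by extension, before the Manager process shuts down). You actually already have a mechanism for this, you're just not using it: put a None on outq and _write will shut down in an orderly way. Do that after pool.join() (so every worker has already queued its results) and before calling writer.join(), and things should work properly.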
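A minimal sketch of the fix, under a few assumptions: validation.RowValidator is not available here, so a hypothetical check_chunk function (which flags CSV rows that do not have exactly three fields) stands in for it; the chunks are passed in as a list instead of coming from line_chunker; and the output path is a parameter. The essential lines are outq.put(None) after pool.join() and before writer.join(), all while the Manager is still running:

```python
import csv
import io
import multiprocessing as mp
import os
import tempfile


def check_chunk(chunk):
    """Hypothetical stand-in for validation.RowValidator: returns an error
    message for every CSV row that does not have exactly three fields."""
    errors = []
    for row in csv.reader(io.StringIO(chunk)):
        if len(row) != 3:
            errors.append("bad row: %r" % (row,))
    return errors


def validate(chunk, outq):
    """Worker: validate one chunk and put its list of errors on the queue."""
    outq.put(check_chunk(chunk))


def write_results(outq, outfile):
    """Consumer loop: write every error until the None sentinel arrives."""
    while True:
        result = outq.get()
        if result is None:  # sentinel: no more results will be sent
            break
        for line in result:
            outfile.write(line)
            outfile.write("\n")


def writer_main(outq, path):
    """Entry point for the writer process."""
    with open(path, "w") as outfile:
        write_results(outq, outfile)


def validate_pll(chunks, out_path):
    """Validate chunks in parallel; a single writer process saves the errors."""
    with mp.Manager() as manager:  # the Manager lives until this block ends
        outq = manager.Queue(maxsize=8)
        writer = mp.Process(target=writer_main, args=(outq, out_path))
        writer.start()

        pool = mp.Pool()
        pending = [pool.apply_async(validate, (c, outq)) for c in chunks]
        pool.close()
        pool.join()  # every worker has now put its results on outq
        try:
            for r in pending:
                r.get()  # re-raise any exception raised inside a worker
        finally:
            outq.put(None)  # tell the writer to finish...
            writer.join()   # ...and wait for it while the Manager still runs


if __name__ == "__main__":
    sample_chunks = ["a,b,c\n1,2\n", "x,y,z\n", "only-one\n"]
    out_path = os.path.join(tempfile.mkdtemp(), "errors.txt")
    validate_pll(sample_chunks, out_path)
    with open(out_path) as f:
        print(f.read())
```

Keeping the Manager in a with block makes its lifetime explicit: it is shut down only after the writer has received the sentinel and exited, so the writer never sees the EOF. Calling get() on each AsyncResult surfaces worker exceptions, which apply_async otherwise discards silently, and the try/finally still stops the writer if one of them fails. The __main__ guard is required on Windows, where child processes re-import the main module.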


Argh! Of course. I suppose I assumed the queue would automatically return None once it was empty, but of course calling get() on an empty queue just waits until something is returned. I added an outq.put(None) as suggested, and now it works fine. – jramm 2014-09-23 12:53:20
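The behaviour described in the comment can be checked directly. A Manager().Queue() proxy forwards get() to an ordinary queue.Queue living in the Manager process, so a local queue.Queue shows the same semantics. A small sketch (the 0.1-second timeout is arbitrary):

```python
import queue

q = queue.Queue()

# An empty queue never hands back None by itself: get() blocks, and with a
# timeout it raises queue.Empty instead of returning anything.
try:
    q.get(timeout=0.1)
    timed_out = False
except queue.Empty:
    timed_out = True

# The consumer only sees None when a producer explicitly puts it there.
q.put(None)
sentinel = q.get()

print(timed_out, sentinel)
```

Without the timeout, the first get() would block forever, which is exactly why writer.join() hung in the question.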