2012-11-19 29 views
32

我想解决一个涉及大量子问题的大数值问题,我使用Python的多处理模块(特别是Pool.map)将不同的独立子问题分解到不同的核心上。每个子问题涉及计算大量的子子问题,并且我试图通过将这些结果存储到文件中来有效地记忆这些结果,如果它们尚未被任何进程计算,则跳过计算并仅从文件中读取结果。Python多处理安全地写入文件

我对文件有并发问题:不同的进程有时会检查是否已经计算出子子问题(通过查找将存储结果的文件),查看它没有,运行计算,然后尝试同时将结果写入同一文件。我该如何避免写这样的碰撞?

+3

退房使用的文档的例子['multiprocessing.Lock'](http://docs.python.org/2/library/multiprocessing.html#synchronization-between-processes)同步多个流程。 –

+11

您可以只有一个进程写入结果,其中一个队列作为输入,可以由其他工作进程提供。我相信将所有工作进程设置为只读是安全的。 – GP89

+0

我应该提到,为了使事情更加复杂,我在集群上同时运行多个不同的主要问题,每个主要问题都写在同一个网络文件系统上的子子问题上。因此,我可以完全从独立机器上运行的进程中获得冲突(所以我不认为使用诸如multiprocessing.Lock之类的解决方案会起作用)。 –

回答

63

@ GP89提到了一个很好的解决方案。使用队列将写入任务发送到具有唯一写入权限的专用进程。所有其他工作人员都只能读取访问权限。这将消除碰撞。下面是一个使用apply_async的例子,但它会与地图工作太:

import multiprocessing as mp 
import time 

fn = 'c:/temp/temp.txt' 

def worker(arg, q): 
    '''stupidly simulates long running process''' 
    start = time.clock() 
    s = 'this is a test' 
    txt = s 
    for i in xrange(200000): 
     txt += s 
    done = time.clock() - start 
    with open(fn, 'rb') as f: 
     size = len(f.read()) 
    res = 'Process' + str(arg), str(size), done 
    q.put(res) 
    return res 

def listener(q): 
    '''listens for messages on the q, writes to file. ''' 

    f = open(fn, 'wb') 
    while 1: 
     m = q.get() 
     if m == 'kill': 
      f.write('killed') 
      break 
     f.write(str(m) + '\n') 
     f.flush() 
    f.close() 

def main(): 
    #must use Manager queue here, or will not work 
    manager = mp.Manager() 
    q = manager.Queue()  
    pool = mp.Pool(mp.cpu_count() + 2) 

    #put listener to work first 
    watcher = pool.apply_async(listener, (q,)) 

    #fire off workers 
    jobs = [] 
    for i in range(80): 
     job = pool.apply_async(worker, (i, q)) 
     jobs.append(job) 

    # collect results from the workers through the pool result queue 
    for job in jobs: 
     job.get() 

    #now we are done, kill the listener 
    q.put('kill') 
    pool.close() 

if __name__ == "__main__": 
    main() 

好运,

迈克

+1

嗨迈克,谢谢你的回答。我认为这可以解决这个问题,但我不太确定它是否能够解决问题的评论中概括的完整问题,特别是我如何在联网的多台机器上运行几个主要程序文件系统,所有这些文件系统都有可能会尝试写入同一文件的进程。 (FWIW,前段时间我用一种冒险的方式解决了我的个人问题,但在评论其他人遇到类似问题时发表评论。) –

+1

我真的很想多次赞扬它。这对我来说很有帮助。今天再一次。 – Eduardo

+0

谢谢迈克 - 我一直在努力如何使用MP队列。你的例子使它非常清楚和直接。 – Anurag

0

在我看来,你需要使用管理器来暂时保存结果到列表,然后将列表中的结果写入文件。另外,使用starmap传递要处理的对象和托管列表。第一步是构建要传递给包含托管列表的starmap的参数。

from multiprocessing import Manager 
from multiprocessing import Pool 
import pandas as pd``` 

def worker(row, param): 
    # do something here and then append it to row 
    x = param**2 
    row.append(x) 

if __name__ == '__main__': 
    pool_parameter = [] # list of objects to process 
    with Manager() as mgr: 
     row = mgr.list([]) 

     # build list of parameters to send to starmap 
     for param in pool_parameter: 
      params.append([row,param]) 

     with Pool() as p: 
      p.starmap(worker, params) 

从这一点开始,您需要决定如何处理列表。如果你有大量的RAM和庞大的数据集,可以随意使用熊猫进行连接。然后,您可以非常容易地将文件保存为csv或泡菜。

 df = pd.concat(row, ignore_index=True) 

     df.to_pickle('data.pickle') 
     df.to_csv('data.csv')