我想解决一个涉及大量子问题的大数值问题,我使用Python的多处理模块(特别是Pool.map)将不同的独立子问题分解到不同的核心上。每个子问题涉及计算大量的子子问题,并且我试图通过将这些结果存储到文件中来有效地记忆这些结果,如果它们尚未被任何进程计算,则跳过计算并仅从文件中读取结果。Python多处理安全地写入文件
我对文件有并发问题:不同的进程有时会检查是否已经计算出子子问题(通过查找将存储结果的文件),查看它没有,运行计算,然后尝试同时将结果写入同一文件。我该如何避免写这样的碰撞?
我想解决一个涉及大量子问题的大数值问题,我使用Python的多处理模块(特别是Pool.map)将不同的独立子问题分解到不同的核心上。每个子问题涉及计算大量的子子问题,并且我试图通过将这些结果存储到文件中来有效地记忆这些结果,如果它们尚未被任何进程计算,则跳过计算并仅从文件中读取结果。Python多处理安全地写入文件
我对文件有并发问题:不同的进程有时会检查是否已经计算出子子问题(通过查找将存储结果的文件),查看它没有,运行计算,然后尝试同时将结果写入同一文件。我该如何避免写这样的碰撞?
@ GP89提到了一个很好的解决方案。使用队列将写入任务发送到具有唯一写入权限的专用进程。所有其他工作人员都只能读取访问权限。这将消除碰撞。下面是一个使用apply_async的例子,但它会与地图工作太:
import multiprocessing as mp
import time
fn = 'c:/temp/temp.txt'
def worker(arg, q):
'''stupidly simulates long running process'''
start = time.clock()
s = 'this is a test'
txt = s
for i in xrange(200000):
txt += s
done = time.clock() - start
with open(fn, 'rb') as f:
size = len(f.read())
res = 'Process' + str(arg), str(size), done
q.put(res)
return res
def listener(q):
'''listens for messages on the q, writes to file. '''
f = open(fn, 'wb')
while 1:
m = q.get()
if m == 'kill':
f.write('killed')
break
f.write(str(m) + '\n')
f.flush()
f.close()
def main():
#must use Manager queue here, or will not work
manager = mp.Manager()
q = manager.Queue()
pool = mp.Pool(mp.cpu_count() + 2)
#put listener to work first
watcher = pool.apply_async(listener, (q,))
#fire off workers
jobs = []
for i in range(80):
job = pool.apply_async(worker, (i, q))
jobs.append(job)
# collect results from the workers through the pool result queue
for job in jobs:
job.get()
#now we are done, kill the listener
q.put('kill')
pool.close()
if __name__ == "__main__":
main()
好运,
迈克
在我看来,你需要使用管理器来暂时保存结果到列表,然后将列表中的结果写入文件。另外,使用starmap传递要处理的对象和托管列表。第一步是构建要传递给包含托管列表的starmap的参数。
from multiprocessing import Manager
from multiprocessing import Pool
import pandas as pd```
def worker(row, param):
# do something here and then append it to row
x = param**2
row.append(x)
if __name__ == '__main__':
pool_parameter = [] # list of objects to process
with Manager() as mgr:
row = mgr.list([])
# build list of parameters to send to starmap
for param in pool_parameter:
params.append([row,param])
with Pool() as p:
p.starmap(worker, params)
从这一点开始,您需要决定如何处理列表。如果你有大量的RAM和庞大的数据集,可以随意使用熊猫进行连接。然后,您可以非常容易地将文件保存为csv或泡菜。
df = pd.concat(row, ignore_index=True)
df.to_pickle('data.pickle')
df.to_csv('data.csv')
退房使用的文档的例子['multiprocessing.Lock'](http://docs.python.org/2/library/multiprocessing.html#synchronization-between-processes)同步多个流程。 –
您可以只有一个进程写入结果,其中一个队列作为输入,可以由其他工作进程提供。我相信将所有工作进程设置为只读是安全的。 – GP89
我应该提到,为了使事情更加复杂,我在集群上同时运行多个不同的主要问题,每个主要问题都写在同一个网络文件系统上的子子问题上。因此,我可以完全从独立机器上运行的进程中获得冲突(所以我不认为使用诸如multiprocessing.Lock之类的解决方案会起作用)。 –