2015-05-23 141 views
1

我正在处理我的代码。代码花费很长时间才能在单个CPU上完成。
所以,我在想如果有可能让代码进行并行处理。
码帧像:并行处理python代码

def analyze_data(target_path): 
    import os 
    import math 
    import itertools 
    import numpy 
    import scipy 
    .... 
    for files in target_path: 
     <a real long series of calculations...... 
     ...................> 

    return 
#Providing the dir search path: 
dir_path = "/usr/target_dir/" 
analyze_data(target_path=dir_path) 

此代码正在这样的方式长到结束(文件数明显工艺是巨大的)。
现在有什么办法可以在多处理线程中执行这种简单的编码格式以使其运行得更快?

谢谢。

回答

0

见(用于python3)的文档:https://docs.python.org/3.4/library/multiprocessing.html

如果你可以分割你的目录来进行处理:

from multiprocessing import Pool 

def analyze_data(target_path): 
    import os 
    import math 
    import itertools 
    import numpy 
    import scipy 
    .... 
    for files in target_path: 
     <a real long series of calculations...... 
     ...................> 

    return 
#Providing the dir search path: 

analyze_data(target_path=dir_path) 

if __name__ == '__main__': 
    with Pool(5) as p: 
     dir_path1 = "/usr/target_dir/1" 
     dir_path2 = "/usr/target_dir/2" 
     dir_path3 = "/usr/target_dir/3" 
     print(p.map(analyze_data, [dir_path1, dir_path2, dir_path3])) 
+0

还是“M没有得到......”怎么每个文件(目录拥有这些文件的40/50),大约需要2小时才能完成。所以我正在寻找一种有效的方法来解决这个问题... – diffracteD

+0

'print(p.map(analyze_data,[dir_path1,dir_path2,dir_path3]))'这行是否将每个目录传递给单个线程?我可能需要将每个文件分发到多个线程以使其更快。 – diffracteD

+0

我很确定该行将命令分配给多进程,但体验python用户可能会确认这一点。 – maggick

0

使用的multiprocessing叉叫pathos.multiprocessing,这可真方便......和可以很自然地从翻译中完成。我还将利用pox,它具有一些除ossys模块之外的文件系统实用程序。首先查看我设置的测试文件。每个目录中有几个文件。

>>> import os 
>>> os.path.abspath('.') 
'/tmp' 
>>> import pox 
>>> # find all the .txt files in and below the current directory 
>>> pox.find('*.txt', '.') 
['/tmp/xxx/1.txt', 'tmp/xxx/2.txt', 'tmp/xxx/3.txt', 'tmp/yyy/1.txt', 'tmp/yyy/2.txt', 'tmp/zzz/1.txt', 'tmp/zzz/2.txt', 'tmp/zzz/3.txt', 'tmp/zzz/4.txt'] 
>>> # let's look at the contents of one of the files 
>>> print open('xxx/1.txt', 'r').read() 
45125123412 
12341234123 
12342134234 
23421342134 

所有的文件都有类似的内容......所以让我们开始并行处理文件。

>>> import time 
>>> import pathos 
>>> # build a thread pool of workers 
>>> thPool = pathos.multiprocessing.ThreadingPool 
>>> tp = thPool() 
>>> 
>>> # expensive per-file processing 
>>> def doit(file): 
...  with open(file, 'r') as f: 
...   x = sum(int(i) for i in f.readlines()) 
...  time.sleep(1) # make it 'expensive' 
...  return len(str(x))**2 # some calculation 
... 
>>> # grab all files from a directory, then do some final 'analysis' 
>>> def analyze_data(target_path): 
...  return min(*tp.uimap(doit, pox.find('*.txt', target_path))) 
... 
>>> analyze_data('.') 
121 

其实,analyze_data有些无关紧要,因为find并不需要在每个目录的基础工作......但是这是在问题中指定的结构。在这里,您将用昂贵的每文件任务替换大多数doit,并将用每个目录处理替换min。根据计算的花费多少,您可能需要使用pathos.multiprocessing.ProcessingPool而不是ThreadingPool - 前者将产生多个进程,而后者只产生多个线程。前者有更多的开销,但可以更好地并行处理更昂贵的任务。在这里,我们使用uimap来为每个文件调用doit提供一个无序迭代器。

获取pathospox这里:https://github.com/uqfoundation