使用的multiprocessing
叉叫pathos.multiprocessing
,这可真方便......和可以很自然地从翻译中完成。我还将利用pox
,它具有一些除os
和sys
模块之外的文件系统实用程序。首先查看我设置的测试文件。每个目录中有几个文件。
>>> import os
>>> os.path.abspath('.')
'/tmp'
>>> import pox
>>> # find all the .txt files in and below the current directory
>>> pox.find('*.txt', '.')
['/tmp/xxx/1.txt', 'tmp/xxx/2.txt', 'tmp/xxx/3.txt', 'tmp/yyy/1.txt', 'tmp/yyy/2.txt', 'tmp/zzz/1.txt', 'tmp/zzz/2.txt', 'tmp/zzz/3.txt', 'tmp/zzz/4.txt']
>>> # let's look at the contents of one of the files
>>> print open('xxx/1.txt', 'r').read()
45125123412
12341234123
12342134234
23421342134
所有的文件都有类似的内容......所以让我们开始并行处理文件。
>>> import time
>>> import pathos
>>> # build a thread pool of workers
>>> thPool = pathos.multiprocessing.ThreadingPool
>>> tp = thPool()
>>>
>>> # expensive per-file processing
>>> def doit(file):
... with open(file, 'r') as f:
... x = sum(int(i) for i in f.readlines())
... time.sleep(1) # make it 'expensive'
... return len(str(x))**2 # some calculation
...
>>> # grab all files from a directory, then do some final 'analysis'
>>> def analyze_data(target_path):
... return min(*tp.uimap(doit, pox.find('*.txt', target_path)))
...
>>> analyze_data('.')
121
其实,analyze_data
有些无关紧要,因为find
并不需要在每个目录的基础工作......但是这是在问题中指定的结构。在这里,您将用昂贵的每文件任务替换大多数doit
,并将用每个目录处理替换min
。根据计算的花费多少,您可能需要使用pathos.multiprocessing.ProcessingPool
而不是ThreadingPool
- 前者将产生多个进程,而后者只产生多个线程。前者有更多的开销,但可以更好地并行处理更昂贵的任务。在这里,我们使用uimap
来为每个文件调用doit
提供一个无序迭代器。
获取pathos
和pox
这里:https://github.com/uqfoundation
还是“M没有得到......”怎么每个文件(目录拥有这些文件的40/50),大约需要2小时才能完成。所以我正在寻找一种有效的方法来解决这个问题... – diffracteD
'print(p.map(analyze_data,[dir_path1,dir_path2,dir_path3]))'这行是否将每个目录传递给单个线程?我可能需要将每个文件分发到多个线程以使其更快。 – diffracteD
我很确定该行将命令分配给多进程,但体验python用户可能会确认这一点。 – maggick