2012-10-25 43 views
2

我正在读取数百个HDF文件并分别处理每个HDF的数据。但是,这需要花费很长时间,因为它一次只能处理一个HDF文件。我只是偶然发现了http://docs.python.org/library/multiprocessing.html,我现在想知道如何使用多处理来加快速度。用于更快处理数据的多进程Python/Numpy代码

到目前为止,我想出了这个:

import numpy as np 
from multiprocessing import Pool 

def myhdf(date): 
    ii  = dates.index(date) 
    year = date[0:4] 
    month = date[4:6] 
    day  = date[6:8] 
    rootdir = 'data/mydata/' 
    filename = 'no2track'+year+month+day 
    records = read_my_hdf(rootdir,filename) 
    if records.size: 
     results[ii] = np.mean(records) 

dates = ['20080105','20080106','20080107','20080108','20080109'] 
results = np.zeros(len(dates)) 

pool = Pool(len(dates)) 
pool.map(myhdf,dates) 

然而,这显然是不正确的。你可以按照我想要做的一连串的想法吗?我需要改变什么?

回答

2

泳池类map功能就像是标准的Python库map功能,保证能让你得到你的结果回来,你把他们的顺序。知道了,唯一的诀窍是,你需要返回结果以一致的方式,然后过滤它们。

import numpy as np 
from multiprocessing import Pool 

def myhdf(date): 
    year = date[0:4] 
    month = date[4:6] 
    day  = date[6:8] 
    rootdir = 'data/mydata/' 
    filename = 'no2track'+year+month+day 
    records = read_my_hdf(rootdir,filename) 
    if records.size: 
     return np.mean(records) 

dates = ['20080105','20080106','20080107','20080108','20080109'] 

pool = Pool(len(dates)) 
results = pool.map(myhdf,dates) 
results = [ result for result in results if result ] 
results = np.array(results) 

如果你真的这样做,尽快为他们提供想要的结果,你可以使用imap_unordered

+0

谢谢!这就说得通了。还有一个问题:使用Pool(number)我指定了并行进程的数量?所以如果我有很多HDF文件(> 1000),我应该保持池值在4-5左右? – HyperCube

+0

一个好的经验法则是每个物理核心的一个过程。 –

4

尝试joblib的友好multiprocessing包装:

from joblib import Parallel, delayed 

def myhdf(date): 
    # do work 
    return np.mean(records) 

results = Parallel(n_jobs=-1)(delayed(myhdf)(d) for d in dates) 
+0

其他很多实现与joblib和多处理(包括分布式作业执行)相似的功能的库是[ipython parallel](http://ipython.org/ipython-doc/dev/parallel/)和[celery](http:// celeryproject.org/) –

+0

@AndrewWalker:除了那些是分布式的,对吗? Joblib只是一个包含多处理和一些智能酸洗的封装。 –

+1

是,分布式和本地模式。但我认为值得注意的是,还有其他模块可以扩展。 –

相关问题