
I want to use multiprocessing in Python to speed up a while loop (parallelization/multiprocessing of a loop with an exit condition).

More specifically:
I have a matrix (samples × features). I want to select a subset of x samples whose values in a random subset of the features are not equal to a certain value (-1 in this example).

My serial code:

import numpy as np
import pandas as pd

np.random.seed(43)
datafile = '...'
df = pd.read_csv(datafile, sep=" ", nrows=89)

no_feat = 500
no_samp = 5
no_trees = 5
i = 0
iter = 0

samples = np.zeros((no_trees, no_samp))
features = np.zeros((no_trees, no_feat))

while i < no_trees:
    # draw a random feature subset and a random order in which to test the samples
    rand_feat = np.random.choice(df.shape[1], no_feat, replace=False)
    iter_order = np.random.choice(df.shape[0], df.shape[0], replace=False)

    samp_idx = []
    a = 0

    #--------------
    # how to run in parallel?

    for j in iter_order:
        pot_samp = df.iloc[j, rand_feat]
        if len(np.where(pot_samp == -1)[0]) == 0:
            samp_idx.append(j)
        if len(samp_idx) == no_samp:
            print(a)
            break
        a += 1

    #--------------

    if len(samp_idx) == no_samp:
        samples[i, :] = samp_idx
        features[i, :] = rand_feat
        i += 1
    iter += 1
    if iter > 1000:  # break if subsets cannot be found
        break

Searching for fitting samples is the potentially expensive part (the for loop over j), which in theory could run in parallel. In some cases it is not necessary to iterate over all samples to find a large enough subset, which is why I break out of the loop as soon as the subset is large enough.
I am struggling to find an implementation that can check how many valid results have already been generated across processes. Is that even possible?

I have used joblib before. If I understand correctly, it uses multiprocessing as a backend and only works for separate, independent tasks? I was thinking that queues might help, but so far I have failed to implement them.
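For reference, a minimal sketch of the kind of independent tasks joblib handles, where the workers share no state (attempt is just a hypothetical placeholder, not my actual search):

from joblib import Parallel, delayed

def attempt(seed):
    # hypothetical placeholder for one independent unit of work
    return seed * 2

# each call runs in its own worker; the workers share no state, so none of them
# can see how many valid results the others have already produced
results = Parallel(n_jobs=4)(delayed(attempt)(s) for s in range(8))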


Using 'joblib' or 'multiprocessing.pool' makes sense. I would run one process per core and create a shared counter, protected by a 'Lock' or implemented as an atomic integer, and increment it until a specific count is reached (taking duplicates into account), at which point all processes finish and return their results. (You can use 'apply_async()'.) – advance512


@advance512 Thanks for giving me these approaches to look at. – Dahlai
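A minimal sketch of the shared-counter approach suggested in the comment above, assuming the Value and Lock are handed to the pool workers through an initializer (do_work is a hypothetical stand-in for one search attempt):

from multiprocessing import Pool, Value, Lock

counter = None
lock = None

def init(shared_counter, shared_lock):
    # Value/Lock cannot be pickled as task arguments, so the workers inherit them here
    global counter, lock
    counter, lock = shared_counter, shared_lock

def do_work():
    return 1  # hypothetical stand-in for one search attempt

def worker(target):
    results = []
    while True:
        with lock:
            if counter.value >= target:
                break
            counter.value += 1
        results.append(do_work())
    return results

if __name__ == '__main__':
    c, l = Value('i', 0), Lock()
    pool = Pool(4, initializer=init, initargs=(c, l))
    jobs = [pool.apply_async(worker, (100,)) for _ in range(4)]
    per_worker_results = [j.get() for j in jobs]
    pool.close()
    pool.join()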

Answer


I found a working solution. I decided to run the while loop in parallel and to let the different processes interact through a shared counter. In addition, I vectorized the search for suitable samples.

The vectorization gave a ~300x speed-up, and running on 4 cores roughly doubles the computation speed on top of that.

At first I tried implementing separate processes and putting the results into a queue. It turned out that queues are not meant for storing large amounts of data.

If anyone sees another bottleneck in the code, I would be glad if they pointed it out.

With my basically non-existent knowledge of parallel computing, I found it hard to piece this together, especially since the examples on the internet are all very basic. I learned a lot though =)

My code:

import numpy as np
import pandas as pd
import itertools
from multiprocessing import Pool, Lock, Value
from datetime import datetime


val = Value('i', 0)        # shared counter: number of valid sample subsets found so far
worker_ID = Value('i', 1)  # shared counter used to hand out worker IDs
lock = Lock()

def findSamp(no_trees, df, no_feat, no_samp):
    lock.acquire()
    print('starting worker - {0}'.format(worker_ID.value))
    worker_ID.value += 1
    worker_ID_local = worker_ID.value
    lock.release()

    max_iter = 100000
    samp = []
    feat = []
    iter_outer = 0
    iter = 0
    while val.value < no_trees and iter_outer < max_iter:
        rand_feat = np.random.choice(df.shape[1], no_feat, replace=False)

        # get samples with random features from dataset;
        # find and select samples that don't have missing values in the random features
        samp_rand = df.iloc[:, rand_feat]
        nan_idx = np.unique(np.where(samp_rand == -1)[0])
        all_idx = np.arange(df.shape[0])
        notnan_bool = np.invert(np.in1d(all_idx, nan_idx))
        notnan_idx = np.where(notnan_bool)[0]

        if notnan_idx.shape[0] >= no_samp:
            # if enough samples exist for this random feature subset, select no_samp of them randomly
            notnan_idx_rand = np.random.choice(notnan_idx, no_samp, replace=False)
            rand_feat_rand = rand_feat

            lock.acquire()
            val.value += 1
            #x = val.value
            lock.release()
            #print('no of trees generated: {0}'.format(x))
            samp.append(notnan_idx_rand)
            feat.append(rand_feat_rand)

        else:
            # increase iter_outer counter if no sample subset could be found for this random feature subset
            iter_outer += 1

        iter += 1
    if iter >= max_iter:
        print('exiting worker{0} because iter >= max_iter'.format(worker_ID_local))
    else:
        print('worker{0} - finished'.format(worker_ID_local))
    return samp, feat

def initialize(*args): 
    global val, worker_ID, lock 
    val, worker_ID, lock = args 

def star_findSamp(i_df_no_feat_no_samp):
    # Pool.map passes a single argument, so unpack the tuple of parameters here
    return findSamp(*i_df_no_feat_no_samp)


if __name__ == '__main__': 
    np.random.seed(43) 
    datafile = '...' 
    df = pd.read_csv(datafile, sep=" ", nrows = 89) 
    df = df.fillna(-1) 
    df = df.iloc[:, 6:] 

    no_feat = 700 
    no_samp = 10 
    no_trees = 5000 


    startTime = datetime.now()
    print('starting multiprocessing')
    ncores = 4
    p = Pool(ncores, initializer=initialize, initargs=(val, worker_ID, lock))
    # one argument tuple per worker process
    args = zip([no_trees]*ncores, itertools.repeat(df), itertools.repeat(no_feat), itertools.repeat(no_samp))

    result = p.map(star_findSamp, args)
    p.close()
    p.join()

    print('{0} sample subsets for tree training have been found'.format(val.value))

    samples = [x[0] for x in result if x is not None]
    samples = np.vstack(samples)
    features = [x[1] for x in result if x is not None]
    features = np.vstack(features)
    print(datetime.now() - startTime)
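
As a possible simplification (not part of the original answer), the nan_idx/in1d bookkeeping inside findSamp could be written as a single boolean reduction over the selected feature columns:

        # rows of the chosen feature columns that contain no -1 at all
        notnan_idx = np.where((df.iloc[:, rand_feat] != -1).all(axis=1).values)[0]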