2017-11-25 67 views
1

我原来的问题是关于Python下的并行性问题。然而,由于问题仍然没有答案,我删除了它,我试图总结我的结论。希望这会帮助别人......Python中的多处理比单线程慢

一般有使你的代码运行并行方式主要有两种 - 要么通过使用多线程多处理

根据许多职位上stackoverflow.com多线程库能够在线程之间有效地共享内存,但运行在单个核心的线程。因此,如果瓶颈是I/O操作,它可以加速您的代码。我不知道是否有对图书馆的许多现实生活中的应用...

如果你的代码是CPU密集型(有时也称为CPU为界),多处理库可以回答你的问题。该库将线程分散到各个核心。然而,很多人(包括我)都观察到,这样的多核代码可能会显着减慢它的单数对应。这个问题是由于个别线程无法有效共享内存这一事实造成的 - 数据被广泛复制,这造成了相当的开销。如下面的代码所示,开销非常依赖于输入数据类型。 Windows上的问题比Linux上的更为深刻。我不得不说,并行是我最大的Python失望 - 显然Python并非设计时考虑了并行...

第一段代码在使用Process的核心之间分配pandas dataframe

import numpy as np 
import math as mth 
import pandas as pd 
import time as tm 
import multiprocessing as mp 

def bnd_calc_npv_dummy(bnds_info, core_idx, npv): 
    """ multiple core dummy valuation function (based on single core function) """ 

    bnds_no = len(bnds_info) 
    tm.sleep(0.0001 * bnds_no) 

    npv[core_idx] = np.array(bnds_info['npv']) 

def split_bnds_info(bnds_info, cores_no): 
    """ cut dataframe with bond definitions into pieces - one piece per core """ 

    bnds_info_mp = [] 
    bnds_no = len(bnds_info) 
    batch_size = mth.ceil(np.float64(bnds_no)/cores_no) # number of bonds allocated to one core 

    # split dataframe among cores 
    for idx in range(cores_no): 
     lower_bound = int(idx * batch_size) 
     upper_bound = int(np.min([(idx + 1) * batch_size, bnds_no])) 
     bnds_info_mp.append(bnds_info[lower_bound : upper_bound].reset_index().copy()) 

    # return list of dataframes 
    return bnds_info_mp 

def bnd_calc_npv(bnds_info, cores_no): 
    """ dummy valuation function running multicore """ 

    manager = mp.Manager() 
    npv = manager.dict() 

    bnds_info_mp = split_bnds_info(bnds_info, cores_no) 

    processes = [mp.Process(target = bnd_calc_npv_dummy, args = (bnds_info_mp[core_idx], core_idx, npv)) for core_idx in xrange(cores_no)]  
    [process.start() for process in processes]  
    [process.join() for process in processes] 

    # return NPV of individual bonds  
    return np.hstack(npv.values()) 

if __name__ == '__main__': 

    # create dummy dataframe 
    bnds_no = 1200 # number of dummy in the sample 
    bnds_info = {'currency_name' : 'EUR', 'npv' : 100} 
    bnds_info = pd.DataFrame(bnds_info, index = range(1)) 
    bnds_info = pd.concat([bnds_info] * bnds_no, ignore_index = True) 

    # one core 
    print("ONE CORE") 
    start_time = tm.time() 
    bnds_no = len(bnds_info) 
    tm.sleep(0.0001 * bnds_no) 
    npv = np.array(bnds_info['npv']) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

    # two cores 
    print("TWO CORES") 
    cores_no = 2 
    start_time = tm.time() 
    npv = bnd_calc_npv(bnds_info, cores_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

    # three cores 
    print("THREE CORES") 
    cores_no = 3 
    start_time = tm.time() 
    npv = bnd_calc_npv(bnds_info, cores_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

    # four cores 
    print("FOUR CORES") 
    cores_no = 4 
    start_time = tm.time() 
    npv = bnd_calc_npv(bnds_info, cores_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

第二个代码是和以前一样的一个 - 唯一不同的是,这次我们使用的numpy array代替pandas dataframe和性能差异是巨大的(对比单核与运行时更改运行时间变化多核)。

import numpy as np 
import math as mth 
import time as tm 
import multiprocessing as mp 

def bnd_calc_npv_dummy(bnds_info, core_idx, npv): 
    """ multiple core dummy valuation function (based on single core function) """ 

    bnds_no = len(bnds_info) 
    tm.sleep(0.0001 * bnds_no) 

    npv[core_idx] = bnds_info 

def split_bnds_info(bnds_info, cores_no): 
    """ cut dataframe with bond definitions into pieces - one piece per core """ 

    bnds_info_mp = [] 
    bnds_no = len(bnds_info) 
    batch_size = mth.ceil(np.float64(bnds_no)/cores_no) # number of bonds allocated to one core 

    # split dataframe among cores 
    for idx in range(cores_no): 
     lower_bound = int(idx * batch_size) 
     upper_bound = int(np.min([(idx + 1) * batch_size, bnds_no])) 
     bnds_info_mp.append(bnds_info[lower_bound : upper_bound]) 

    # return list of dataframes 
    return bnds_info_mp 

def bnd_calc_npv(bnds_info, cores_no): 
    """ dummy valuation function running multicore """ 

    manager = mp.Manager() 
    npv = manager.dict() 

    bnds_info_mp = split_bnds_info(bnds_info, cores_no) 

    processes = [mp.Process(target = bnd_calc_npv_dummy, args = (bnds_info_mp[core_idx], core_idx, npv)) for core_idx in xrange(cores_no)]  
    [process.start() for process in processes]  
    [process.join() for process in processes] 

    # return NPV of individual bonds  
    return np.hstack(npv.values()) 

if __name__ == '__main__': 

    # create dummy dataframe 
    bnds_no = 1200 # number of dummy in the sample 
    bnds_info = np.array([100] * bnds_no) 

    # one core 
    print("ONE CORE") 
    start_time = tm.time() 
    bnds_no = len(bnds_info) 
    tm.sleep(0.0001 * bnds_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

    # two cores 
    print("TWO CORES") 
    cores_no = 2 
    start_time = tm.time() 
    npv = bnd_calc_npv(bnds_info, cores_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

    # three cores 
    print("THREE CORES") 
    cores_no = 3 
    start_time = tm.time() 
    npv = bnd_calc_npv(bnds_info, cores_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

    # four cores 
    print("FOUR CORES") 
    cores_no = 4 
    start_time = tm.time() 
    npv = bnd_calc_npv(bnds_info, cores_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

代码的最后一块使用Pool代替Process。运行时间稍好一些。

import numpy as np 
import time as tm 
import multiprocessing as mp 

#import pdb 
#pdb.set_trace() 

def bnd_calc_npv_dummy(bnds_info): 
    """ multiple core dummy valuation function (based on single core function) """ 

    try: 
     # get number of bonds 
     bnds_no = len(bnds_info) 
    except: 
     pass 
     bnds_no = 1 

     tm.sleep(0.0001 * bnds_no) 

    return bnds_info 

def bnd_calc_npv(bnds_info, cores_no): 
    """ dummy valuation function running multicore """ 

    pool = mp.Pool(processes = cores_no) 
    npv = pool.map(bnd_calc_npv_dummy, bnds_info.tolist()) 

    # return NPV of individual bonds  
    return npv 

if __name__ == '__main__': 

    # create dummy dataframe 
    bnds_no = 1200 # number of dummy in the sample 
    bnds_info = np.array([100.0] * bnds_no) 

    # one core 
    print("ONE CORE") 
    start_time = tm.time() 
    bnds_no = len(bnds_info) 
    tm.sleep(0.0001 * bnds_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

    # two cores 
    print("TWO CORES") 
    cores_no = 2 
    start_time = tm.time() 
    npv = bnd_calc_npv(bnds_info, cores_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

    # three cores 
    print("THREE CORES") 
    cores_no = 3 
    start_time = tm.time() 
    npv = bnd_calc_npv(bnds_info, cores_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

    # four cores 
    print("FOUR CORES") 
    cores_no = 4 
    start_time = tm.time() 
    npv = bnd_calc_npv(bnds_info, cores_no) 
    elapsed_time = (tm.time() - start_time) 
    print(' elapsed time: ' + str(elapsed_time) + 's') 

所以,我的结论是Python的并行性实现并不适用于现实生活中(我使用Python 2.7.13和Window 7)。 最好的问候,

麦基

PS:如果有人能够改变我的代码将超过高兴地改变我的想法......

+4

根本不读它,多处理完成错误比单个进程慢。考虑创建一个[最小,完整和可验证示例](https://stackoverflow.com/help/mcve)。 –

+1

对于I/O绑定的任务(例如从文件读取或写入文件),您应该考虑使用'threading'模块而不是'multiprocessing'模块。多处理对CPU绑定任务更有效。 – ettanany

回答

1

多重效果最好时,一个问题的部分可以independantly计算,例如与multiprocessing.Pool。 池中的每个工作进程处理输入的一部分,并将结果返回给主进程。

如果所有进程都需要修改整个输入数组中的数据,那么很可能是来自manager的同步开销破坏了来自多处理的任何收益。