2015-07-22 57 views
1

Python: How to run nested parallel processes in Python?

I have a dataset df of trader transactions. I have two levels of for loops, as follows:

smartTrader =[] 

for asset in range(len(Assets)): 
    df = df[df['Assets'] == asset] 
    # I have some more calculations here 
    for trader in range(len(df['TraderID'])): 
        # I have some calculations here. If the trader is successful, I add his ID
        # to the list as follows
        smartTrader.append(df['TraderID'][trader])

    # some more calculations here which are related to the first for loop. 

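I want to parallelise the calculations for each asset in Assets, and I also want to parallelise the calculations for each trader within every asset. After all these calculations are done, I want to do additional analysis based on the smartTrader list.

This is my first attempt at parallel processing, so please be patient with me. Thank you for your help.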

+2

Try [`multiprocessing.Pool`](https://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers). – refi64

+0

I'm not sure how to call this when the for loops are nested. Could you please provide a small example? – roland

Answers

0

Instead of using for loops, use map:

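import functools
smartTrader = []

# calculations_as_a_function is assumed to take one asset's rows
# and return a list of successful TraderIDs
m = map(calculations_as_a_function,
        [df[df['Assets'] == asset]
         for asset in range(len(Assets))])
# concatenate the per-asset lists into one list
smartTrader = functools.reduce(lambda acc, ids: acc + list(ids), m, smartTrader)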

From here, you can try different parallel map implementations, such as multiprocessing's or stackless'.
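To illustrate swapping map implementations, here is a minimal sketch in Python 3 using only the standard library. The sample rows in `TRADES`, the "positive total profit" success rule, and the helper names `successful_traders` and `find_smart_traders` are all hypothetical stand-ins for the asker's DataFrame and calculations. The per-asset function is written once, and the caller decides whether it runs through the built-in `map` or through `multiprocessing.Pool.map`.

```python
from multiprocessing import Pool

# Hypothetical sample data: (asset, trader_id, profit) rows standing in for df.
TRADES = [
    (0, "t1", 5.0), (0, "t2", -1.0), (0, "t1", 2.0),
    (1, "t2", 4.0), (1, "t3", -3.0),
]


def successful_traders(asset_rows):
    """Return the trader IDs with a positive total profit for one asset.

    The success rule is an assumption made for this sketch.
    """
    totals = {}
    for _asset, trader_id, profit in asset_rows:
        totals[trader_id] = totals.get(trader_id, 0.0) + profit
    return sorted(t for t, total in totals.items() if total > 0)


def find_smart_traders(trades, map_func=map):
    """Group rows by asset, then apply successful_traders via map_func."""
    by_asset = {}
    for row in trades:
        by_asset.setdefault(row[0], []).append(row)
    groups = [by_asset[asset] for asset in sorted(by_asset)]

    smart = []
    # map_func can be the built-in map or any drop-in parallel map.
    for trader_ids in map_func(successful_traders, groups):
        smart.extend(trader_ids)
    return smart


if __name__ == "__main__":
    # Same function, now spread across two worker processes.
    with Pool(processes=2) as pool:
        print(find_smart_traders(TRADES, map_func=pool.map))
```

Because the worker function is defined at module level, it can be pickled and sent to the pool's worker processes. The `if __name__ == "__main__":` guard keeps the pool from being re-created when worker processes import the module.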

1

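If you use pathos, which provides a fork of multiprocessing, you can easily nest parallel maps. pathos is built for easily testing combinations of nested parallel maps, which are direct translations of nested for loops. It provides a selection of maps that are blocking, non-blocking, iterative, asynchronous, serial, parallel, and distributed.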

>>> from pathos.pools import ProcessPool, ThreadPool 
>>> amap = ProcessPool().amap 
>>> tmap = ThreadPool().map 
>>> from math import sin, cos 
>>> print(amap(tmap, [sin,cos], [range(10),range(10)]).get())
[[0.0, 0.8414709848078965, 0.9092974268256817, 0.1411200080598672, -0.7568024953079282, -0.9589242746631385, -0.27941549819892586, 0.6569865987187891, 0.9893582466233818, 0.4121184852417566], [1.0, 0.5403023058681398, -0.4161468365471424, -0.9899924966004454, -0.6536436208636119, 0.2836621854632263, 0.9601702866503661, 0.7539022543433046, -0.14550003380861354, -0.9111302618846769]] 

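This example uses a process pool and a thread pool, where the thread map call is blocking, while the process map call is asynchronous (note the get at the end of the last line).

Get pathos here: https://github.com/uqfoundation or install it with: $ pip install git+https://github.com/uqfoundation/[email protected]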
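For readers without pathos installed, the same nesting pattern can be approximated with the standard library alone. This is a sketch under stated assumptions, not the pathos API: it swaps in `multiprocessing.Pool.map_async` for the asynchronous outer map and `multiprocessing.pool.ThreadPool.map` for the blocking inner map. The helper name `apply_row` and the pool sizes are hypothetical. Python 3 is assumed.

```python
from math import sin, cos
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool


def apply_row(task):
    """Inner loop: a blocking thread-pool map of one function over one row."""
    func, values = task
    with ThreadPool(4) as threads:
        return threads.map(func, values)


if __name__ == "__main__":
    # Outer loop: one (function, inputs) task per worker process.
    tasks = [(sin, range(10)), (cos, range(10))]
    with Pool(processes=2) as procs:
        # map_async returns immediately; get() blocks until results arrive.
        async_result = procs.map_async(apply_row, tasks)
        print(async_result.get())
```

Pool worker processes are daemonic and cannot start child processes of their own, which is why the inner level here uses threads rather than a second process pool.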

0

Probably threading, from the standard Python library, is the most convenient approach:

import threading 

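def worker(trader):
    # Do your calculations here
    return

threads = []
for asset in range(len(Assets)):
    # use a new name so the full df is not overwritten on each pass
    asset_df = df[df['Assets'] == asset]
    for trader in range(len(asset_df['TraderID'])):
        t = threading.Thread(target=worker, args=(trader,))
        threads.append(t)
        t.start()
    # add a semaphore here if you need to synchronize results for all traders.

# wait for every thread to finish before using the results
for t in threads:
    t.join()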
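A `threading.Thread` discards its target's return value, so results have to be collected through shared state. The sketch below shows one way to do that with a `threading.Lock` and `join()`. The "positive total profit" rule, the input dictionary, and the names `worker` and `collect_smart_traders` are hypothetical. Note also that in CPython the global interpreter lock generally prevents threads from speeding up CPU-bound pure-Python calculations, so this pattern tends to help mostly when the work is I/O-bound or releases the lock.

```python
import threading


def worker(trader_id, profits, results, lock):
    """Record trader_id if the (assumed) success rule holds."""
    if sum(profits) > 0:
        # Guard the shared list so concurrent appends stay consistent.
        with lock:
            results.append(trader_id)


def collect_smart_traders(profits_by_trader):
    """Start one thread per trader, wait for all of them, return the IDs."""
    results = []
    lock = threading.Lock()
    threads = []
    for trader_id, profits in profits_by_trader.items():
        t = threading.Thread(target=worker,
                             args=(trader_id, profits, results, lock))
        threads.append(t)
        t.start()
    # join() blocks until each thread has finished its calculation.
    for t in threads:
        t.join()
    # Threads finish in no fixed order, so sort for a stable result.
    return sorted(results)


if __name__ == "__main__":
    sample = {"t1": [5.0, 2.0], "t2": [-1.0], "t3": [0.5, -0.2]}
    print(collect_smart_traders(sample))
```

Once every thread has been joined, the returned list can safely feed the follow-up analysis described in the question.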