2015-11-20 71 views
0

我有一个函数,没有多处理循环与三元组数组并进行一些计算。这个数组可能真的很长(> 1百万条目),所以我认为使用多个进程可以帮助加快速度。死锁与多处理模块

我从一列点(random_points)开始,用它创建所有可能的三元组的排列(combList)。这combList然后传递给我的功能。 我有基本代码的作品,但只有当random_points列表有18个条目或更少。

from scipy import stats 
import itertools 
import multiprocessing as mp 

def calc3PointsList(points,output): 
    xy = [] 
    r = [] 
    for point in points: 
    // do stuff with points and append results to xy and r 
    output.put((xy, r)) 


output = mp.Queue() 

random_points = [ (np.array((stats.uniform(-0.5,1).rvs(),stats.uniform(-0.5,1).rvs()))) for _ in range(18)] 
combList = list(itertools.combinations(random_points, 3)) 
N = 6 
processes = [mp.Process(target=calc3PointsList, args=(combList[(i-1)*len(combList)/(N-1):i*len(combList)/(N-1)],output)) for i in range(1,N)] 

for p in processes: 
    p.start() 
for p in processes: 
    p.join() 
results = [output.get() for p in processes] 

只要random_points列表的长度超过18,程序就会进入死锁状态。 18和更低,它完成罚款。我是否以错误的方式使用整个多处理模块?

+0

您是否看到N = 4或N = 2的相同问题?对于你的最后一行,我认为它应该是'results = [output.get for x in range(output.qsize())]' – user3667217

+0

是的,仍然挂着N = 2或N = 4,这意味着1或2个作业我在(1,N)范围内创建了作业,所以N是排他性的。如果我尝试了N = 3或N = 5(因此有2或4个作业),如果你的意思是它与作业数量相关,它也会挂起。 感谢提示结果行 – Philipp

回答

0

OK,这个问题是由user2667217提到programming guideline描述:

记住,已经将商品放入队列中的进程将终止前,直到所有的缓冲项美联储等待

熊通过“进料器”螺纹到下面的管道。 (子进程可以调用队列的Queue.cancel_join_thread方法来避免这种行为。)

这意味着,无论何时使用队列,都需要确保放入队列的所有项最终都会在进程加入之前删除。否则,您无法确定将项目放入队列的进程将终止。还要记住,非守护进程会自动加入。

删除联接操作使其工作。此外,检索过程的正确方法似乎如下:

results = [output.get() for p in processes] 
1

我确实看到您发布的任何其他内容显然是错误的,但您肯定应该做一件事:在if __name__=="main":区块中启动新进程,请参见programming guideline

from scipy import stats 
import itertools 
import multiprocessing as mp 

def calc3PointsList(points,output): 
    xy = [] 
    r = [] 
    for point in points: 
    // do stuff with points and append results to xy and r 
    output.put((xy, r)) 

if __name__ == "__main__": 
    output = mp.Queue() 
    random_points = [ (np.array((stats.uniform(-0.5,1).rvs(),stats.uniform(-0.5,1).rvs()))) for _ in range(18)] 
    combList = list(itertools.combinations(random_points, 3)) 
    N = 6 
    processes = [mp.Process(target=calc3PointsList, args=(combList[(i-1)*len(combList)/(N-1):i*len(combList)/(N-1)],output)) for i in range(1,N)] 

    for p in processes: 
     p.start() 
    for p in processes: 
     p.join() 
    results = [output.get for x in range(output.qsize())] 
+0

我假设你的意思是'if name =='__ main __''。我加了这个谢谢,但不管如何,问题仍然存在。 – Philipp