我有问题并行分配我的功能。Python多处理 - 棘手的用例,包括传递参数
问题描述:我有2个坐标对列表,dfC
和dfO
。对于dfC
中的每个obs,我正在统计有多少dfO
的半径为r
。 我目前有一个工作功能,但我想看看我是否可以并行处理。
问题是这样的:dfC
可以拆分和单独处理...但dfO
需要100%的每个工人。我的方法是,让我先把它并行工作 - 然后我会担心如何向工人分发dfO
的完整副本。除非有人能帮我解决这两个问题?
首先,这里设置的一切行动代码:
import pandas as pd
import numpy as np
import multiprocessing as mp
from multiprocessing import Pool, process
import traceback
from scipy.spatial import cKDTree
# create 2 dataframes with random "coordinates"
dfC=pd.DataFrame(np.random.np.random.randint(0,100,size=(50,2)), columns=list('xy'))
dfO=pd.DataFrame(np.random.np.random.randint(0,100,size=(500,2)), columns=list('jk'))
这里是什么dfC
样子,dfO
将类似于
+----+----+
| x | y |
+----+----+
| 35 | 5 |
+----+----+
| 96 | 18 |
+----+----+
| 23 | 25 |
+----+----+
| 20 | 7 |
+----+----+
| 74 | 54 |
+----+----+
下一个例子,这里是工作的功能等魅力。我不是单独传递所有参数,而是实际上是这样做的 - 准备一个主函数来并行地调用这些参数(并且我无法找到一种多处理方法来完成这项工作)。
# this function works on dfC, and adds a row which counts the number
# of objects in dfO which are within radius r
def worker_job(args):
try:
dfC, dfO, newcol, r = args
mxC=dfC.as_matrix()
mxO = dfO.as_matrix()
# magic tree stuff
C_Tree = cKDTree(mxC)
O_Tree = cKDTree(mxO)
listoflists = C_Tree.query_ball_tree(O_Tree, r, p=2.0, eps=0.0)
counts=[]
for i in listoflists:
counts.append(len(i))
s = pd.Series(counts)
dfC[newcol] = s.values
except:
raise
traceback.print_exc()
else:
return dfC
如果我创造我的论点是这样的: args=[dfC,dfO,"new_column_name",3]
它完美,当我通过自身运行它: worker_job(args)
+----+----+-----------------+
| x | y | new_column_name |
+----+----+-----------------+
| 35 | 5 | 4 |
+----+----+-----------------+
| 96 | 18 | 1 |
+----+----+-----------------+
| 23 | 25 | 0 |
+----+----+-----------------+
| 20 | 7 | 1 |
+----+----+-----------------+
| 74 | 54 | 2 |
+----+----+-----------------+
现在,我尝试建立函数,将控制并行工作者并行运行这个东西。这是我的最大努力:
# this function should control the multiprocessing
def Run_Parallel(Function, Num_Proc, args):
try:
pool = Pool(Num_Proc)
parts = pool.map(Function,args)
pool.close()
pool.join()
results_df = pd.concat(parts)
except:
pool.close()
pool.terminate()
traceback.print_exc()
else:
return results_df
它不会工作。 Run_Parallel(worker_job,2,args)
会抛出一个关于ValueError: not enough values to unpack (expected 4, got 2)
的错误。当它通过包装器时,必须发生一些参数列表。
我正在寻找这个错误的指导,特别是,谁知道如何解决更大的问题 - 这是我需要我的池包含100%的dfO
和只是dfC
子集的效率。
'Pool.map'预计的'iterable'。所以你必须把你的'args'列表放在另一个列表中,然后传递给'map'函数。您可能会注意到它与直接调用'worker_job'函数没有区别。所以你不得不重构你的程序。 – Himal