代码的清理版本包括the solution to the problem(感谢@JohanL!),可以发现as a Gist on GitHub。多处理池:如何调用类的列表上的方法任意对象列表
剪断下面的代码(CPython的3 [4,5,6])说明我的意图(以及我的问题):
from functools import partial
import multiprocessing
from pprint import pprint as pp
NUM_CORES = multiprocessing.cpu_count()
class some_class:
some_dict = {'some_key': None, 'some_other_key': None}
def some_routine(self):
self.some_dict.update({'some_key': 'some_value'})
def some_other_routine(self):
self.some_dict.update({'some_other_key': 77})
def run_routines_on_objects_in_parallel_and_return(in_object_list, routine_list):
func_handle = partial(__run_routines_on_object_and_return__, routine_list)
with multiprocessing.Pool(processes = NUM_CORES) as p:
out_object_list = list(p.imap_unordered(
func_handle,
(in_object for in_object in in_object_list)
))
return out_object_list
def __run_routines_on_object_and_return__(routine_list, in_object):
for routine_name in routine_list:
getattr(in_object, routine_name)()
return in_object
object_list = [some_class() for item in range(20)]
pp([item.some_dict for item in object_list])
new_object_list = run_routines_on_objects_in_parallel_and_return(
object_list,
['some_routine', 'some_other_routine']
)
pp([item.some_dict for item in new_object_list])
verification_object_list = [
__run_routines_on_object_and_return__(
['some_routine', 'some_other_routine'],
item
) for item in object_list
]
pp([item.some_dict for item in verification_object_list])
我用的对象列表工作键入some_class
。 some_class
有一个属性,一个字典,名为some_dict
和一些方法,它可以修改字典(some_routine
和some_other_routine
)。有时候,我想调用列表中所有对象的一系列方法。由于这是计算密集型的,我打算将对象分布在多个CPU核心上(使用multiprocessing.Pool
和imap_unordered
- 列表顺序无关紧要)。
例程__run_routines_on_object_and_return__
负责调用一个单独对象上的方法列表。从我所知道的情况来看,这工作得很好。我使用functools.partial
来简化代码结构 - 因此,多处理池必须仅将对象列表作为输入参数来处理。
问题是......它不起作用。由imap_unordered
返回的列表中包含的对象与我输入的对象相同。对象内的字典与以前一样。我已经使用类似的机制来直接处理字典列表而不会产生小故障,所以我怀疑修改碰巧是字典的对象属性有问题。
在我的示例中,verification_object_list
包含正确的结果(尽管它是在单个进程/线程中生成的)。 new_object_list
与object_list
完全相同,但情况并非如此。
我在做什么错?
编辑
我发现下面的question,具有实际工作和适用answer。我修改了一点之后我打电话的每个对象的方法列表的想法,它的工作原理:
import random
from multiprocessing import Pool, Manager
class Tester(object):
def __init__(self, num=0.0, name='none'):
self.num = num
self.name = name
def modify_me(self):
self.num += random.normalvariate(mu=0, sigma=1)
self.name = 'pla' + str(int(self.num * 100))
def __repr__(self):
return '%s(%r, %r)' % (self.__class__.__name__, self.num, self.name)
def init(L):
global tests
tests = L
def modify(i_t_nn):
i, t, nn = i_t_nn
for method_name in nn:
getattr(t, method_name)()
tests[i] = t # copy back
return i
def main():
num_processes = num = 10 #note: num_processes and num may differ
manager = Manager()
tests = manager.list([Tester(num=i) for i in range(num)])
print(tests[:2])
args = ((i, t, ['modify_me']) for i, t in enumerate(tests))
pool = Pool(processes=num_processes, initializer=init, initargs=(tests,))
for i in pool.imap_unordered(modify, args):
print("done %d" % i)
pool.close()
pool.join()
print(tests[:2])
if __name__ == '__main__':
main()
现在,我去一点,并介绍了我原来的some_class
进入游戏,其中包含了所描述的字典物业some_dict
。这是行不通的:
import random
from multiprocessing import Pool, Manager
from pprint import pformat as pf
class some_class:
some_dict = {'some_key': None, 'some_other_key': None}
def some_routine(self):
self.some_dict.update({'some_key': 'some_value'})
def some_other_routine(self):
self.some_dict.update({'some_other_key': 77})
def __repr__(self):
return pf(self.some_dict)
def init(L):
global tests
tests = L
def modify(i_t_nn):
i, t, nn = i_t_nn
for method_name in nn:
getattr(t, method_name)()
tests[i] = t # copy back
return i
def main():
num_processes = num = 10 #note: num_processes and num may differ
manager = Manager()
tests = manager.list([some_class() for i in range(num)])
print(tests[:2])
args = ((i, t, ['some_routine', 'some_other_routine']) for i, t in enumerate(tests))
pool = Pool(processes=num_processes, initializer=init, initargs=(tests,))
for i in pool.imap_unordered(modify, args):
print("done %d" % i)
pool.close()
pool.join()
print(tests[:2])
if __name__ == '__main__':
main()
的DIFF工作和不工作真的很小之间,但我还是不明白这一点:
diff --git a/test.py b/test.py
index b12eb56..0aa6def 100644
--- a/test.py
+++ b/test.py
@@ -1,15 +1,15 @@
import random
from multiprocessing import Pool, Manager
+from pprint import pformat as pf
-class Tester(object):
- def __init__(self, num=0.0, name='none'):
- self.num = num
- self.name = name
- def modify_me(self):
- self.num += random.normalvariate(mu=0, sigma=1)
- self.name = 'pla' + str(int(self.num * 100))
+class some_class:
+ some_dict = {'some_key': None, 'some_other_key': None}
+ def some_routine(self):
+ self.some_dict.update({'some_key': 'some_value'})
+ def some_other_routine(self):
+ self.some_dict.update({'some_other_key': 77})
def __repr__(self):
- return '%s(%r, %r)' % (self.__class__.__name__, self.num, self.name)
+ return pf(self.some_dict)
def init(L):
global tests
@@ -25,10 +25,10 @@ def modify(i_t_nn):
def main():
num_processes = num = 10 #note: num_processes and num may differ
manager = Manager()
- tests = manager.list([Tester(num=i) for i in range(num)])
+ tests = manager.list([some_class() for i in range(num)])
print(tests[:2])
- args = ((i, t, ['modify_me']) for i, t in enumerate(tests))
+ args = ((i, t, ['some_routine', 'some_other_routine']) for i, t in enumerate(tests))
这到底是怎么回事?
多么愚蠢的错误......非常感谢你向我解释这一点。 –