4

代码的清理版本包括the solution to the problem(感谢@JohanL!),可以发现as a Gist on GitHub多处理池:如何调用类的列表上的方法任意对象列表


剪断下面的代码(CPython的3 [4,5,6])说明我的意图(以及我的问题):

from functools import partial 
import multiprocessing 
from pprint import pprint as pp 

NUM_CORES = multiprocessing.cpu_count() 

class some_class: 
    some_dict = {'some_key': None, 'some_other_key': None} 
    def some_routine(self): 
     self.some_dict.update({'some_key': 'some_value'}) 
    def some_other_routine(self): 
     self.some_dict.update({'some_other_key': 77}) 

def run_routines_on_objects_in_parallel_and_return(in_object_list, routine_list): 
    func_handle = partial(__run_routines_on_object_and_return__, routine_list) 
    with multiprocessing.Pool(processes = NUM_CORES) as p: 
     out_object_list = list(p.imap_unordered(
      func_handle, 
      (in_object for in_object in in_object_list) 
      )) 
    return out_object_list 

def __run_routines_on_object_and_return__(routine_list, in_object): 
    for routine_name in routine_list: 
     getattr(in_object, routine_name)() 
    return in_object 

object_list = [some_class() for item in range(20)] 
pp([item.some_dict for item in object_list]) 

new_object_list = run_routines_on_objects_in_parallel_and_return(
     object_list, 
     ['some_routine', 'some_other_routine'] 
     ) 
pp([item.some_dict for item in new_object_list]) 

verification_object_list = [ 
    __run_routines_on_object_and_return__(
     ['some_routine', 'some_other_routine'], 
     item 
     ) for item in object_list 
    ] 
pp([item.some_dict for item in verification_object_list]) 

我用的对象列表工作键入some_classsome_class有一个属性,一个字典,名为some_dict和一些方法,它可以修改字典(some_routinesome_other_routine)。有时候,我想调用列表中所有对象的一系列方法。由于这是计算密集型的,我打算将对象分布在多个CPU核心上(使用multiprocessing.Poolimap_unordered - 列表顺序无关紧要)。

例程__run_routines_on_object_and_return__负责调用一个单独对象上的方法列表。从我所知道的情况来看,这工作得很好。我使用functools.partial来简化代码结构 - 因此,多处理池必须仅将对象列表作为输入参数来处理。

问题是......它不起作用。由imap_unordered返回的列表中包含的对象与我输入的对象相同。对象内的字典与以前一样。我已经使用类似的机制来直接处理字典列表而不会产生小故障,所以我怀疑修改碰巧是字典的对象属性有问题。

在我的示例中,verification_object_list包含正确的结果(尽管它是在单个进程/线程中生成的)。 new_object_listobject_list完全相同,但情况并非如此。

我在做什么错?


编辑

我发现下面的question,具有实际工作和适用answer。我修改了一点之后我打电话的每个对象的方法列表的想法,它的工作原理:

import random 
from multiprocessing import Pool, Manager 

class Tester(object): 
    def __init__(self, num=0.0, name='none'): 
     self.num = num 
     self.name = name 
    def modify_me(self): 
     self.num += random.normalvariate(mu=0, sigma=1) 
     self.name = 'pla' + str(int(self.num * 100)) 
    def __repr__(self): 
     return '%s(%r, %r)' % (self.__class__.__name__, self.num, self.name) 

def init(L): 
    global tests 
    tests = L 

def modify(i_t_nn): 
    i, t, nn = i_t_nn 
    for method_name in nn: 
     getattr(t, method_name)() 
    tests[i] = t # copy back 
    return i 

def main(): 
    num_processes = num = 10 #note: num_processes and num may differ 
    manager = Manager() 
    tests = manager.list([Tester(num=i) for i in range(num)]) 
    print(tests[:2]) 

    args = ((i, t, ['modify_me']) for i, t in enumerate(tests)) 
    pool = Pool(processes=num_processes, initializer=init, initargs=(tests,)) 
    for i in pool.imap_unordered(modify, args): 
     print("done %d" % i) 
    pool.close() 
    pool.join() 
    print(tests[:2]) 

if __name__ == '__main__': 
    main() 

现在,我去一点,并介绍了我原来的some_class进入游戏,其中包含了所描述的字典物业some_dict。这是行不通的:

import random 
from multiprocessing import Pool, Manager 
from pprint import pformat as pf 

class some_class: 
    some_dict = {'some_key': None, 'some_other_key': None} 
    def some_routine(self): 
     self.some_dict.update({'some_key': 'some_value'}) 
    def some_other_routine(self): 
     self.some_dict.update({'some_other_key': 77}) 
    def __repr__(self): 
     return pf(self.some_dict) 

def init(L): 
    global tests 
    tests = L 

def modify(i_t_nn): 
    i, t, nn = i_t_nn 
    for method_name in nn: 
     getattr(t, method_name)() 
    tests[i] = t # copy back 
    return i 

def main(): 
    num_processes = num = 10 #note: num_processes and num may differ 
    manager = Manager() 
    tests = manager.list([some_class() for i in range(num)]) 
    print(tests[:2]) 

    args = ((i, t, ['some_routine', 'some_other_routine']) for i, t in enumerate(tests)) 
    pool = Pool(processes=num_processes, initializer=init, initargs=(tests,)) 
    for i in pool.imap_unordered(modify, args): 
     print("done %d" % i) 
    pool.close() 
    pool.join() 
    print(tests[:2]) 

if __name__ == '__main__': 
    main() 

的DIFF工作和不工作真的很小之间,但我还是不明白这一点:

diff --git a/test.py b/test.py 
index b12eb56..0aa6def 100644 
--- a/test.py 
+++ b/test.py 
@@ -1,15 +1,15 @@ 
import random 
from multiprocessing import Pool, Manager 
+from pprint import pformat as pf 

-class Tester(object): 
-  def __init__(self, num=0.0, name='none'): 
-    self.num = num 
-    self.name = name 
-  def modify_me(self): 
-    self.num += random.normalvariate(mu=0, sigma=1) 
-    self.name = 'pla' + str(int(self.num * 100)) 
+class some_class: 
+  some_dict = {'some_key': None, 'some_other_key': None} 
+  def some_routine(self): 
+    self.some_dict.update({'some_key': 'some_value'}) 
+  def some_other_routine(self): 
+    self.some_dict.update({'some_other_key': 77}) 
     def __repr__(self): 
-    return '%s(%r, %r)' % (self.__class__.__name__, self.num, self.name) 
+    return pf(self.some_dict) 

def init(L): 
     global tests 
@@ -25,10 +25,10 @@ def modify(i_t_nn): 
def main(): 
     num_processes = num = 10 #note: num_processes and num may differ 
     manager = Manager() 
-  tests = manager.list([Tester(num=i) for i in range(num)]) 
+  tests = manager.list([some_class() for i in range(num)]) 
     print(tests[:2]) 

-  args = ((i, t, ['modify_me']) for i, t in enumerate(tests)) 
+  args = ((i, t, ['some_routine', 'some_other_routine']) for i, t in enumerate(tests)) 

这到底是怎么回事?

回答

3

你的问题是由于两件事情;即你正在使用一个类变量,并且你正在不同的进程中运行你的代码。

由于不同的进程不共享内存,因此必须对所有对象和参数进行清理并从原始进程发送到执行该进程的进程。当参数是一个对象时,它的类是而不是与它一起发送。接收过程使用自己的蓝图(即class)。

在您当前的代码中,您将对象作为参数传递,更新并返回它。但是,更新不是针对对象进行的,而是针对类本身进行的,因为您正在更新类变量。但是,此更新不会发回您的主流程,因此您将留下未更新的课程。

想要要做的,就是让some_dict成为你的对象的一部分,而不是你的类。这很容易通过__init__()方法完成。因此,修改some_class为:

class some_class: 
    def __init__(self): 
     self.some_dict = {'some_key': None, 'some_other_key': None} 
    def some_routine(self): 
     self.some_dict.update({'some_key': 'some_value'}) 
    def some_other_routine(self): 
     self.some_dict.update({'some_other_key': 77}) 

这会让你的工作方案,你想让它。您几乎总是希望在调用__init__()时设置对象,而不是类变量,因为在后一种情况下,数据将在所有实例之间共享(并且可以由所有实例进行更新)。当你将数据和状态封装到一个类的对象中时,这通常不是你想要的。

编辑:看来我错了class是否与腌制物件一起发送。在进一步检查后会发生什么,我认为class本身及其类变量都被腌渍了。因为如果在将对象发送到新进程之前更新类变量,则更新的值可用。 但是它仍然是在新过程中完成的更新没有中继回到原始class的情况。

+0

多么愚蠢的错误......非常感谢你向我解释这一点。 –

相关问题