0

我有一个元素列表,我正在处理多处理apply_async任务中的元素,并使用经理字典中的一个关键字逐个处理元素,我想在其中映射整个列表。修改多处理池的经理字典中的列表

我尝试下面的代码:

在最后干脆在字典职位空单。 输出:

{'task': {'processed_list': []}}

现在研究了一下后,我才知道该经理字典中的元素成为不可变的,所以你必须以更新它重新初始化新的数据字典整体。所以我试着下面的代码,它给出了一个奇怪的错误。

#!/usr/bin/python 

from multiprocessing import Pool, Manager 

def spammer_task(d, my_list): 
    #Initialize manager dict 
    d['task'] = { 
     'processed_list': [] 
    } 

    for ele in my_list: 
     #process here 
     old_list = d['task']['processed_list'] 
     new_list = old_list.append(ele) 
     #Have to do it this way since elements inside a manager dict become 
     #immutable so 
     d['task'] = { 
      'processed_list': new_list 
     } 

    return 

p = Pool() 
m = Manager() 
d = m.dict() 

my_list = ["one", "two", "three"] 

p.apply_async(spammer_task (d, my_list)) 
print d 

输出:

Traceback (most recent call last): File "./a.py", line 29, in p.apply_async(spammer_task (d, my_list)) File "./a.py", line 14, in spammer_task new_list = old_list.append(ele) AttributeError: 'NoneType' object has no attribute 'append'

不知怎的,这似乎是追加None,而我无法弄清楚,为什么名单。

+0

也许这只是一个愚蠢的问题,但对我来说,它看起来像你的例子会更好使用imap - 为什么你会使用apply_async? – janbrohl

+0

这只是一个示例程序,主要使用apply_async来处理它正在做的一些事情。此外,它调用它的多个进程 – MohitC

+0

更准确地说 - 我的意思是使用Pool.imap具有多个进程并在主进程中修改字典,因为这不应该在计算上花费太多。这对我来说似乎比制作大量副本和额外同步更明智 – janbrohl

回答

1

Accoridng到溶液中https://bugs.python.org/issue6766

下面的代码修复它,通过复制整个任务字典,然后修改它并重新复制它

#!/usr/bin/python 

from multiprocessing import Pool, Manager 

def spammer_task(d, my_list): 
    #Initialize manager dict 
    d['task'] = { 
     'processed_list': [] 
    } 

    for ele in my_list: 
     #process here 
     foo = d['task'] 
     foo['processed_list'].append(ele) 
     d['task'] = foo 
    return 

p = Pool() 
m = Manager() 
d = m.dict() 

my_list = ["one", "two", "three"] 

p.apply_async(spammer_task (d, my_list)) 
print d 

输出:从确保除了

{'task': {'processed_list': ['one', 'two', 'three']}}

0

d实际上包含的东西,当打印时,结果仍然是{'task': {'processed_list': ['one', 'two', 'three']}}

#!/usr/bin/python 

from multiprocessing import Pool 

def spammer_task(my_list): 
    #Initialize manager dict 
    out= { 
     'processed_list': [] 
    } 

    for ele in my_list: 
     #process here 
     out['processed_list'].append(ele) 

    return 'task',out 



my_list = ["one", "two", "three"] 

if __name__=="__main__": 

    p = Pool() 
    d=dict(p.imap_unordered(spammer_task, [my_list])) #this line blocks until finished 
    print d 
+0

我不希望执行被阻止。过程需要在后台运行。 – MohitC