2011-08-07 136 views
43

Python Process Pool non-daemonic?

Is it possible to create a python Pool that is non-daemonic? I want a pool to be able to call a function that has another pool inside it. Thanks.

+0

As far as I know, no, it is not possible: all workers in the pool are daemonized, and it is not possible to __inject the dependency__. By the way, I don't understand the second part of your question, "I want a pool to be able to call a function that has another pool inside it", and how that interferes with the fact that the workers are daemonized. – mouad

+1

Because if function a has a pool that runs function b, and b has a pool that runs function c, there is a problem with b: it runs in a daemon process, and daemon processes are not allowed to create processes. `AssertionError: daemonic processes are not allowed to have children` – Max
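For reference, the nesting this comment describes can be reproduced in a few lines (the `outer`/`inner` names here are illustrative, not from the question):

```python
import multiprocessing


def inner(x):
    return x * 2


def outer(x):
    # This plays the role of function b: it runs inside a daemonic pool
    # worker, which is not allowed to start child processes of its own.
    with multiprocessing.Pool(1) as p:
        return p.map(inner, [x])[0]


if __name__ == '__main__':
    with multiprocessing.Pool(1) as pool:
        try:
            pool.map(outer, [21])
        except AssertionError as e:
            print(e)  # daemonic processes are not allowed to have children
```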

Answers

68

The multiprocessing.pool.Pool class creates the worker processes in its __init__ method, makes them daemonic and starts them, and it is not possible to re-set their daemon attribute to False before they are started (and afterwards it is not allowed anymore). But you can create your own subclass of multiprocessing.pool.Pool (multiprocessing.Pool is just a wrapper function) and substitute your own multiprocessing.Process subclass, which is always non-daemonic, to be used for the worker processes.

Here is a full example of how to do this. The most important parts are the two classes NoDaemonProcess and MyPool at the top, and calling pool.close() and pool.join() on your MyPool instance at the end.

#!/usr/bin/env python 
# -*- coding: UTF-8 -*- 

import multiprocessing 
# We must import this explicitly, it is not imported by the top-level 
# multiprocessing module. 
import multiprocessing.pool 
import time 

from random import randint 


class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False

    def _set_daemon(self, value):
        pass

    daemon = property(_get_daemon, _set_daemon)


# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess

def sleepawhile(t): 
    print("Sleeping %i seconds..." % t) 
    time.sleep(t) 
    return t 

def work(num_procs): 
    print("Creating %i (daemon) workers and jobs in child." % num_procs) 
    pool = multiprocessing.Pool(num_procs) 

    result = pool.map(sleepawhile, 
     [randint(1, 5) for x in range(num_procs)]) 

    # The following is not really needed, since the (daemon) workers of the 
    # child's pool are killed when the child is terminated, but it's good 
    # practice to cleanup after ourselves anyway. 
    pool.close() 
    pool.join() 
    return result 

def test(): 
    print("Creating 5 (non-daemon) workers and jobs in main process.") 
    pool = MyPool(5) 

    result = pool.map(work, [randint(1, 5) for x in range(5)]) 

    pool.close() 
    pool.join() 
    print(result) 

if __name__ == '__main__': 
    test() 
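On recent Python 3 versions (around 3.8) the Pool class resolves its worker class through a context object, so the simple `Process = NoDaemonProcess` class attribute above no longer takes effect. A sketch adapting the same idea to that API (the `NoDaemonContext` and `NestablePool` names are my own, not from the stdlib):

```python
import multiprocessing
import multiprocessing.pool


class NoDaemonProcess(multiprocessing.Process):
    # 'daemon' attribute always reads False; attempts to set it are ignored.
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass


class NoDaemonContext(type(multiprocessing.get_context())):
    # A context whose workers are created from NoDaemonProcess.
    Process = NoDaemonProcess


class NestablePool(multiprocessing.pool.Pool):
    # A Pool that hands its workers the non-daemonic context above.
    def __init__(self, *args, **kwargs):
        kwargs['context'] = NoDaemonContext()
        super(NestablePool, self).__init__(*args, **kwargs)


def square(x):
    return x * x


def work(n):
    # Runs in a NestablePool worker; starting an inner pool here is allowed
    # because the worker process is not daemonic.
    with multiprocessing.Pool(2) as inner:
        return sum(inner.map(square, range(n)))


if __name__ == '__main__':
    with NestablePool(2) as pool:
        print(pool.map(work, [3, 4]))  # [5, 14]
```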
+0

This code seems to hang for me. Specifically, it seems to hang at pool.close() inside work(). Is there something I'm missing? – 2012-09-24 17:39:30

+1

I just tested my code again with Python 2.6/2.7/3.2 on Linux and with Python 2.7/3.2 on OS X (after fixing the "print" lines). Linux, and OS X with Python 2.7/3.2, work fine, but the code does indeed hang with Python 2.6 on OS X (Lion). This seems to be a bug in the multiprocessing module that has since been fixed, but I haven't actually checked the bug tracker. –

+0

This should really be fixed in the multiprocessing module itself (an option for non-daemonic workers should be provided). Does anybody know who maintains it? –

6

The multiprocessing module has a nice interface to use pools with processes or threads. Depending on your current use case, you might consider using multiprocessing.pool.ThreadPool for your outer Pool, which will result in threads (that allow spawning processes from within) as opposed to processes.

It might be limited by the GIL, but in my particular case (I tested both), the startup time of the processes from the outer Pool as created here far exceeded the solution with ThreadPool.


It's really easy to swap Processes for Threads. Read more about how to use a ThreadPool solution here and here.
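A short sketch of this approach (the function names are illustrative): the outer pool is a ThreadPool, so its workers are threads rather than daemonic processes, and each one may freely start an inner process Pool.

```python
import multiprocessing
from multiprocessing.pool import ThreadPool


def cpu_task(x):
    return x + 1


def outer_job(n):
    # Runs in a thread, not a daemonic process, so it may spawn processes.
    with multiprocessing.Pool(2) as pool:
        return sum(pool.map(cpu_task, range(n)))


if __name__ == '__main__':
    with ThreadPool(2) as outer:
        print(outer.map(outer_job, [3, 4]))  # [6, 10]
```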

0

The issue I encountered was that importing globals between modules caused the ProcessPool() line to be evaluated multiple times.

globals.py

from multiprocessing         import Manager, Lock
from pathos.multiprocessing  import ProcessPool
from pathos.threading        import ThreadPool

class SingletonMeta(type):
    def __new__(cls, name, bases, dict):
        dict['__deepcopy__'] = dict['__copy__'] = lambda self, *args: self
        return super(SingletonMeta, cls).__new__(cls, name, bases, dict)

    def __init__(cls, name, bases, dict):
        super(SingletonMeta, cls).__init__(name, bases, dict)
        cls.instance = None

    def __call__(cls, *args, **kw):
        if cls.instance is None:
            cls.instance = super(SingletonMeta, cls).__call__(*args, **kw)
        return cls.instance

    def __deepcopy__(self, item):
        return item.__class__.instance

class Globals(object):
    """
    This class is a workaround for the bug: AssertionError: daemonic processes are not allowed to have children

    The root cause is that importing this file from different modules causes this file to be reevaluated each time,
    thus ProcessPool() gets re-executed inside that child thread, thus causing the daemonic processes bug.
    """
    __metaclass__ = SingletonMeta

    def __init__(self):
        print "%s::__init__()" % (self.__class__.__name__)
        self.shared_manager      = Manager()
        self.shared_process_pool = ProcessPool()
        self.shared_thread_pool  = ThreadPool()
        self.shared_lock         = Lock()  # BUG: Windows: global name 'lock' is not defined | doesn't affect cygwin

Then simply import safely from elsewhere in your code:

from globals import Globals 
Globals().shared_manager  
Globals().shared_process_pool 
Globals().shared_thread_pool 
Globals().shared_lock