2014-07-12

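Context managers and multiprocessing pools

Suppose you are using a multiprocessing.Pool object, and you use the initializer argument of its constructor to pass an initializer function that creates a resource in the global namespace. Assume the resource has a context manager. How would you handle the life cycle of the context-managed resource, given that it has to live for the whole life of the worker process but must be properly cleaned up at the end?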

So far, I have something like this:

resource_cm = None 
resource = None 


def _worker_init(args): 
    # Declare both names global so the context manager outlives this call
    global resource_cm, resource
    resource_cm = open_resource(args) 
    resource = resource_cm.__enter__() 

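From here on, the pool processes can use the resource. So far, so good. But handling cleanup is a bit tricky, since the multiprocessing.Pool class does not provide a destructor or deinitializer argument.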

One of my ideas is to use the atexit module and register the cleanup in the initializer. Something like this:

def _worker_init(args): 
    global resource 
    resource_cm = open_resource(args) 
    resource = resource_cm.__enter__() 

    def _clean_up():
        # __exit__ takes (exc_type, exc_value, traceback); pass None for a clean exit
        resource_cm.__exit__(None, None, None)

    import atexit 
    atexit.register(_clean_up) 

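Is this a good approach? Is there an easier way of doing this? EDIT: atexit doesn't seem to work, at least not in the way I'm using it above, so right now I still don't have a solution for this problem.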

Answer


First, this is a really great question! After digging around a bit in the multiprocessing code, I think I've found a way to do this:

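When you start a multiprocessing.Pool, the Pool object internally creates a multiprocessing.Process object for each member of the pool. When those sub-processes start up, they call a _bootstrap function, which looks like this: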

def _bootstrap(self):
    from . import util
    global _current_process
    try:
        # ... (stuff we don't care about)
        util._finalizer_registry.clear()
        util._run_after_forkers()
        util.info('child process calling self.run()')
        try:
            self.run()
            exitcode = 0
        finally:
            util._exit_function()
        # ... (more stuff we don't care about)

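The run method is what actually runs the target you gave the Process object. For a Pool process, that is a method with a long-running while loop that waits for work items to come in over an internal queue. What's really interesting for us is what happens after self.run returns: util._exit_function() is called.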

As it turns out, that function does some cleanup that sounds a lot like what you're looking for:

def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
                   active_children=active_children,
                   current_process=current_process):
    # NB: we hold on to references to functions in the arglist due to the 
    # situation described below, where this function is called after this 
    # module's globals are destroyed. 

    global _exiting 

    info('process shutting down') 
    debug('running all "atexit" finalizers with priority >= 0') # Very interesting! 
    _run_finalizers(0) 

Here's the docstring of _run_finalizers:

def _run_finalizers(minpriority=None): 
    ''' 
    Run all finalizers whose exit priority is not None and at least minpriority 

    Finalizers with highest priority are called first; finalizers with 
    the same priority will be called in reverse order of creation. 
    ''' 

The function then walks through the list of finalizer callbacks and executes them:

items = [x for x in _finalizer_registry.items() if f(x)] 
items.sort(reverse=True) 

for key, finalizer in items:
    sub_debug('calling %s', finalizer)
    try:
        finalizer()
    except Exception:
        import traceback
        traceback.print_exc()

Perfect. So how do we get into the _finalizer_registry? There's an undocumented object called Finalize in multiprocessing.util that is responsible for adding a callback to the registry:

class Finalize(object):
    '''
    Class which supports object finalization using weakrefs
    '''
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        assert exitpriority is None or type(exitpriority) is int

        if obj is not None:
            self._weakref = weakref.ref(obj, self)
        else:
            assert exitpriority is not None

        self._callback = callback
        self._args = args
        self._kwargs = kwargs or {}
        self._key = (exitpriority, _finalizer_counter.next())
        self._pid = os.getpid()

        _finalizer_registry[self._key] = self  # That's what we're looking for!
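To make the two modes of Finalize more concrete, here is a small sketch that runs in a single process. It assumes Python 3 and calls the private helper _run_finalizers by hand, purely to mimic what happens at process exit; that helper is an internal detail of multiprocessing.util and may change between versions. The Handle class and the event labels are made up for illustration.

```python
import gc
from multiprocessing import util

events = []

# Mode 1: no object, only an exit priority. These run when the process's
# finalizers are run: highest priority first, newest first within a priority.
util.Finalize(None, events.append, args=("low",), exitpriority=1)
util.Finalize(None, events.append, args=("high, registered first",), exitpriority=10)
util.Finalize(None, events.append, args=("high, registered second",), exitpriority=10)


# Mode 2: tied to an object through a weak reference, with no exit priority.
# The callback fires as soon as the object is garbage collected.
class Handle:  # hypothetical stand-in for a real resource
    pass


handle = Handle()
util.Finalize(handle, events.append, args=("handle collected",))
del handle
gc.collect()  # CPython frees the object on `del`; this is only a safeguard
print(events)

# Private helper, called by hand here only to imitate process shutdown.
util._run_finalizers(0)
print(events)
```

The practical consequence for a pool initializer is that a finalizer tied to an object can fire early if that object is collected, so either keep the object referenced for the life of the process or pass None as the object and rely on exitpriority alone.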

OK, putting it all together into an example:

import multiprocessing 
from multiprocessing.util import Finalize 

resource_cm = None 
resource = None 

class Resource(object):
    def __init__(self, args):
        self.args = args

    def __enter__(self):
        print("in __enter__ of %s" % multiprocessing.current_process())
        return self

    def __exit__(self, *args, **kwargs):
        print("in __exit__ of %s" % multiprocessing.current_process())

def open_resource(args):
    return Resource(args)

def _worker_init(args):
    # Both names are module-level globals, so they live as long as the worker
    global resource_cm, resource
    print("calling init")
    resource_cm = open_resource(args)
    resource = resource_cm.__enter__()
    # Register a finalizer that runs when the worker process shuts down
    Finalize(resource, resource.__exit__, exitpriority=16)

def hi(*args): 
    print("we're in the worker") 

if __name__ == "__main__": 
    pool = multiprocessing.Pool(initializer=_worker_init, initargs=("abc",)) 
    pool.map(hi, range(pool._processes)) 
    pool.close() 
    pool.join() 

Output:

calling init 
in __enter__ of <Process(PoolWorker-1, started daemon)> 
calling init 
calling init 
in __enter__ of <Process(PoolWorker-2, started daemon)> 
in __enter__ of <Process(PoolWorker-3, started daemon)> 
calling init 
in __enter__ of <Process(PoolWorker-4, started daemon)> 
we're in the worker 
we're in the worker 
we're in the worker 
we're in the worker 
in __exit__ of <Process(PoolWorker-1, started daemon)> 
in __exit__ of <Process(PoolWorker-2, started daemon)> 
in __exit__ of <Process(PoolWorker-3, started daemon)> 
in __exit__ of <Process(PoolWorker-4, started daemon)> 

As you can see, __exit__ gets called in all of our workers when we join() the pool.
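If you need this for more than one kind of resource, the same idea can be wrapped in a small helper. The following is only a sketch under a few assumptions: enter_for_process_lifetime, open_resource and use_resource are hypothetical names, the dictionary stands in for a real resource, and contextlib.ExitStack is used so that __exit__ is called with the usual three arguments. Finalize itself is still the undocumented class from multiprocessing.util, so treat this as something that may break in a future Python release.

```python
import contextlib
import multiprocessing
from multiprocessing.util import Finalize

resource = None  # per-process global, set by the pool initializer


@contextlib.contextmanager
def open_resource(name):
    # Hypothetical resource: stands in for a connection, file handle, etc.
    handle = {"name": name, "open": True}
    try:
        yield handle
    finally:
        handle["open"] = False


def enter_for_process_lifetime(cm, exitpriority=16):
    """Enter cm now and schedule its __exit__ for process shutdown."""
    stack = contextlib.ExitStack()
    value = stack.enter_context(cm)
    # obj=None ties the finalizer to process exit only; the registry keeps
    # the Finalize object (and through it the stack) alive until then.
    Finalize(None, stack.close, exitpriority=exitpriority)
    return value


def _worker_init(name):
    global resource
    resource = enter_for_process_lifetime(open_resource(name))


def use_resource(_):
    return resource["name"], resource["open"]


if __name__ == "__main__":
    pool = multiprocessing.Pool(2, initializer=_worker_init, initargs=("abc",))
    print(pool.map(use_resource, range(4)))
    pool.close()
    pool.join()  # workers leave their loop and run their finalizers
```

Note that this relies on the workers shutting down normally through close() and join(). As far as I can tell, pool.terminate() kills the workers with a signal, so the finalizers would not get a chance to run in that case.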