2011-07-18 25 views
52

似乎是从multiprocessing.Pool进程引发异常时,没有堆栈跟踪或任何其他指示失败。例如:多处理引发的异常未检测到池

from multiprocessing import Pool 

def go(): 
    print(1) 
    raise Exception() 
    print(2) 

p = Pool() 
p.apply_async(go) 
p.close() 
p.join() 

打印1并停止静音。有趣的是,引发一个BaseException反而起作用。有什么办法让所有异常的行为与BaseException相同吗?

+1

我有同样的问题。原因如下:工作进程捕获异常并在结果队列中放置失败代码和异常。回到主进程中,Pool的结果处理线程获取失败代码并忽略它。某种猴子补丁调试模式可能是可能的。另一种方法是确保你的工作者函数捕获任何异常,返回它并为你的处理器打印一个错误代码。 –

+0

这已经在这里回答:http://stackoverflow.com/a/26096355/512111 – j08lue

回答

23

我对这个问题的合理解决,至少在调试的目的。目前我还没有一个解决方案可以在主流程中引发异常。我的第一个想法是使用装饰,但你只能腌functions defined at the top level of a module,所以这是正确的。

取而代之的是一个简单的包装类和一个Pool子类,它使用apply_async(因此为apply)。我将离开map_async作为读者的练习。

import traceback 
from multiprocessing.pool import Pool 
import multiprocessing 

# Shortcut to multiprocessing's logger 
def error(msg, *args): 
    return multiprocessing.get_logger().error(msg, *args) 

class LogExceptions(object): 
    def __init__(self, callable): 
     self.__callable = callable 

    def __call__(self, *args, **kwargs): 
     try: 
      result = self.__callable(*args, **kwargs) 

     except Exception as e: 
      # Here we add some debugging help. If multiprocessing's 
      # debugging is on, it will arrange to log the traceback 
      error(traceback.format_exc()) 
      # Re-raise the original exception so the Pool worker can 
      # clean up 
      raise 

     # It was fine, give a normal answer 
     return result 

class LoggingPool(Pool): 
    def apply_async(self, func, args=(), kwds={}, callback=None): 
     return Pool.apply_async(self, LogExceptions(func), args, kwds, callback) 

def go(): 
    print(1) 
    raise Exception() 
    print(2) 

multiprocessing.log_to_stderr() 
p = LoggingPool(processes=1) 

p.apply_async(go) 
p.close() 
p.join() 

这给了我:

1 
[ERROR/PoolWorker-1] Traceback (most recent call last): 
    File "mpdebug.py", line 24, in __call__ 
    result = self.__callable(*args, **kwargs) 
    File "mpdebug.py", line 44, in go 
    raise Exception() 
Exception 
+0

这太糟糕了没有一个简单的解决方案(或我的一个错误),但这将完成工作 - 谢谢! –

+4

我已经意识到可以使用装饰器,如果你使用@ functools.wrap(func)来装饰你的包装。这使您的装饰功能看起来像在模块顶层定义的功能。 –

+1

[本答案](http://stackoverflow.com/a/26096355/512111)中的解决方案更简单**和**支持重新提升主进程中的错误! – j08lue

0

我会尝试使用PDB:

import pdb 
import sys 
def handler(type, value, tb): 
    pdb.pm() 
sys.excepthook = handler 
+0

它从来没有达到处理程序在这种情况下,奇怪 –

42

也许我失去了一些东西,但是,这不是什么结果对象返回的get方法?见Process Pools

类multiprocessing.pool.AsyncResult

类由Pool.apply_async()和Pool.map_async()返回的结果。获得([超时])
返回结果当它到达。如果超时时间不为“无”,并且结果未超过 超时秒数,则会引发multiprocessing.TimeoutError。如果远程调用异常,则该异常将由get()重新调整。

所以,稍微修改你的榜样,一个可以做

from multiprocessing import Pool 

def go(): 
    print(1) 
    raise Exception("foobar") 
    print(2) 

p = Pool() 
x = p.apply_async(go) 
x.get() 
p.close() 
p.join() 

其中给出的结果

1 
Traceback (most recent call last): 
    File "rob.py", line 10, in <module> 
    x.get() 
    File "/usr/lib/python2.6/multiprocessing/pool.py", line 422, in get 
    raise self._value 
Exception: foobar 

这并不完全令人满意,因为它不打印回溯,但有总比没有好。

UPDATE:此bug已被固定在Python 3.4,理查德奥德柯克的礼貌。请参阅问题get method of multiprocessing.pool.Async should return full traceback

+0

这很有用,谢谢。我没有看到。 –

+0

让我知道如果你找出它为什么不返回回溯。由于它能够返回错误值,因此它也应该能够返回回溯。我可能会问一些合适的论坛 - 也许是一些Python开发列表。顺便说一句,正如你可能已经猜到的那样,我在试图找出同样的事情时遇到了你的问题。 :-) –

+4

注意:为了执行一系列同时运行的任务,您应该将所有结果保存在列表中,然后使用get()遍历每个结果,如果您不想要第一个错误。 – dfrankow

1

我创建了一个模块RemoteException.py,显示在一个过程中异常的全面回溯。 Python2。Download it并添加到您的代码:

import RemoteException 

@RemoteException.showError 
def go(): 
    raise Exception('Error!') 

if __name__ == '__main__': 
    import multiprocessing 
    p = multiprocessing.Pool(processes = 1) 
    r = p.apply(go) # full traceback is shown here 
7

我已经受够了这个装饰的成功记录例外:

import traceback, functools, multiprocessing 

def trace_unhandled_exceptions(func): 
    @functools.wraps(func) 
    def wrapped_func(*args, **kwargs): 
     try: 
      func(*args, **kwargs) 
     except: 
      print 'Exception in '+func.__name__ 
      traceback.print_exc() 
    return wrapped_func 

与问题的代码,这是

@trace_unhandled_exceptions 
def go(): 
    print(1) 
    raise Exception() 
    print(2) 

p = multiprocessing.Pool(1) 

p.apply_async(go) 
p.close() 
p.join() 

简单装饰你传递给你的进程池的函数。这项工作的关键是@functools.wraps(func),否则多处理将抛出PicklingError。上述

代码给出

1 
Exception in go 
Traceback (most recent call last): 
    File "<stdin>", line 5, in wrapped_func 
    File "<stdin>", line 4, in go 
Exception 
+0

如果函数并行运行 - 在这种情况下为go() - 返回一个值,则这不起作用。装饰器不通过返回值。除此之外,我喜欢这个解决方案。 – MD004

+0

对于传递返回值只是修改wrapper_func这样的: '高清wrapped_func(* ARGS,** kwargs): 结果=无 尝试: 结果= FUNC(* ARGS,** kwargs)除了 : 打印( '例外'+ func .__ name__) traceback.print_exc() 返回结果' 像魅力一样工作;) – MoTSCHIGGE

16

与写作的时候票数最多的解决方案的一个问题:

from multiprocessing import Pool 

def go(): 
    print(1) 
    raise Exception("foobar") 
    print(2) 

p = Pool() 
x = p.apply_async(go) 
x.get() ## waiting here for go() to complete... 
p.close() 
p.join() 

由于@dfrankow指出,它会等待x.get(),这废墟异步运行任务的点。因此,为了提高效率(特别是如果你的工人功能go需要很长的时间)我将其更改为:

from multiprocessing import Pool 

def go(x): 
    print(1) 
    # task_that_takes_a_long_time() 
    raise Exception("Can't go anywhere.") 
    print(2) 
    return x**2 

p = Pool() 
results = [] 
for x in range(1000): 
    results.append(p.apply_async(go, [x])) 

p.close() 

for r in results: 
    r.get() 

优势:工人功能异步运行,所以例如,如果你正在运行多在几个核心上的任务,它将比原来的解决方案高效得多。

缺点:如果在工人功能的异常,它只会提高池后已经完成了所有的任务。这可能是也可能不是理想的行为。编辑根据@ colinfang的评论,这解决了这个问题。

+0

好的努力。但是,由于您的示例是基于假设存在多个结果而预测的,因此可能会扩展一点,以致实际上有多个结果?此外,你写道:“特别是如果你工作的功能”。那应该是“你的”。 –

+0

你是对的,谢谢。我已经扩展了这个例子。 – gozzilli

+1

很酷。此外,您可能想尝试/取决于您希望如何容忍提取中的错误。 – dfrankow

0

既然你已经使用apply_sync,我猜这个用例是想做一些同步任务。使用回调进行处理是另一种选择。请注意,此选项仅适用于python3.2及更高版本,python2.7不适用。

from multiprocessing import Pool 

def callback(result): 
    print('success', result) 

def callback_error(result): 
    print('error', result) 

def go(): 
    print(1) 
    raise Exception() 
    print(2) 

p = Pool() 
p.apply_async(go, callback=callback, error_callback=callback_error) 

# You can do another things 

p.close() 
p.join() 
+0

'apply_async'方法没有'error_callbak',请参考https://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.pool.multiprocessing.Pool .apply_async – Sanju

+1

适用于更高版本:https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.apply_async – Asoul

1
import logging 
from multiprocessing import Pool 

def proc_wrapper(func, *args, **kwargs): 
    """Print exception because multiprocessing lib doesn't return them right.""" 
    try: 
     return func(*args, **kwargs) 
    except Exception as e: 
     logging.exception(e) 
     raise 

def go(x): 
    print x 
    raise Exception("foobar") 

p = Pool() 
p.apply_async(proc_wrapper, (go, 5)) 
p.join() 
p.close() 
0

由于已经为multiprocessing.Pool提供像样的答案,我将提供使用的完整性不同的方法解决。

对于python >= 3.2以下解决方案似乎是最简单的:

from concurrent.futures import ProcessPoolExecutor, wait 

def go(): 
    print(1) 
    raise Exception() 
    print(2) 


futures = [] 
with ProcessPoolExecutor() as p: 
    for i in range(10): 
     futures.append(p.submit(go)) 

results = [f.result() for f in futures] 

优点:

  • 很少的代码
  • 提出的主要工序异常
  • 提供一个堆栈跟踪
  • 无外部依存关系

有关API的更多信息请查看:https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor

此外,如果您要提交大量的任务,你想你的主要过程,尽快失败,因为你的任务之一失败了,你可以请使用以下代码片段:

from concurrent.futures import ProcessPoolExecutor, wait, FIRST_EXCEPTION, as_completed 
import time 


def go(): 
    print(1) 
    time.sleep(0.3) 
    raise Exception() 
    print(2) 


futures = [] 
with ProcessPoolExecutor(1) as p: 
    for i in range(10): 
     futures.append(p.submit(go)) 

    for f in as_completed(futures): 
     if f.exception() is not None: 
      for f in futures: 
       f.cancel() 
      break 

[f.result() for f in futures] 

所有其他答案只在所有任务执行完成后才会失败。