Python multiprocessing pool retry

2012-07-18 97 views

6

Is there a way to resubmit a piece of data for processing if the original computation fails, using a simple pool?

import random 
from multiprocessing import Pool 

def f(x): 
    if random.getrandbits(1):
        raise ValueError("Retry this computation")
    return x*x 

p = Pool(5) 
# If one of these f(x) calls fails, retry it with another (or same) process 
p.map(f, [1,2,3]) 
+1

Maybe you want to 'return f(x)' instead of raising a 'ValueError'? Just guessing... – 2012-07-24 02:03:16

+0

How likely is a failure in the actual application? That is, how important is it that a retry happen immediately, compared with waiting for the other processes to finish first? – Isaac 2012-07-24 02:05:12

+0

There is a moderate chance of failure, and it does not need to be retried immediately (but it should eventually be retried in parallel). – ash 2012-07-24 05:29:48

Answers

9

If you can (or don't mind) retrying immediately, wrap the function with a decorator:

import random 
from multiprocessing import Pool 
from functools import wraps 

def retry(f):
    @wraps(f)
    def wrapped(*args, **kwargs):
        # Keep retrying until the call succeeds: an unconditional immediate retry.
        while True:
            try:
                return f(*args, **kwargs)
            except ValueError:
                pass
    return wrapped

@retry 
def f(x): 
    if random.getrandbits(1):
        raise ValueError("Retry this computation")
    return x*x 

p = Pool(5) 
# If one of these f(x) calls fails, retry it with another (or same) process 
p.map(f, [1,2,3]) 
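
One caveat with the unconditional while True: an input that always fails will retry forever and hang the worker. If that matters, bounding the attempts is a small extension; a minimal sketch (retry_n and max_retries are my own names, not part of the answer):

from functools import wraps

def retry_n(max_retries=5):
    # Like retry above, but re-raise after max_retries failed attempts.
    def decorator(f):
        @wraps(f)
        def wrapped(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return f(*args, **kwargs)
                except ValueError:
                    if attempt == max_retries - 1:
                        raise  # out of attempts: surface the failure
        return wrapped
    return decorator

Applied as @retry_n(max_retries=5) in place of @retry; with a Pool, the re-raised exception then propagates to the p.map call in the parent.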
5

You can use a Queue to feed failures back into the Pool, looping in the launching process:

import multiprocessing as mp 
import random 

def f(x): 
    if random.getrandbits(1): 
        # on failure/exception, hand the failed item back via the queue
        f.q.put(x)
        return None
    return x*x 

def f_init(q): 
    f.q = q 

def main(pending): 
    total_items = len(pending) 
    successful = [] 
    failure_tracker = [] 

    q = mp.Queue() 
    p = mp.Pool(None, f_init, [q]) 
    results = p.imap(f, pending) 
    retry_results = [] 
    while len(successful) < total_items:
        # Drain finished results; failed calls return None and are skipped here.
        successful.extend([r for r in results if r is not None])
        successful.extend([r for r in retry_results if r is not None])
        failed_items = []
        while not q.empty():
            failed_items.append(q.get())
        if failed_items:
            failure_tracker.append(failed_items)
            retry_results = p.imap(f, failed_items)
    p.close() 
    p.join() 

    print "Results: %s" % successful 
    print "Failures: %s" % failure_tracker 

if __name__ == '__main__': 
    main(range(1, 10)) 

The output looks like this:

Results: [1, 4, 36, 49, 25, 81, 16, 64, 9] 
Failures: [[3, 4, 5, 8, 9], [3, 8, 4], [8, 3], []] 

A Pool cannot be shared between multiple processes, hence this Queue-based approach. If you try to pass the pool itself as an argument to the pool's processes, you will get this error:

NotImplementedError: pool objects cannot be passed between processes or pickled 
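
That error comes from the Pool's own pickling hook, so it shows up as soon as anything attempts to pickle the pool; a quick way to see it directly (an illustrative sketch, not part of the original answer):

import pickle
import multiprocessing as mp

if __name__ == '__main__':
    p = mp.Pool(2)
    pickle.dumps(p)  # raises the NotImplementedError shown above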

Alternatively, you could try a few immediate retries inside your function f to avoid the synchronization overhead. It really is a question of how long your function should wait before retrying, and of how likely an immediate retry is to succeed.
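
For example, f could make a few attempts in-process before handing the item back through the queue; a minimal sketch of that idea (the three local attempts are an arbitrary choice, not from the answer):

def f(x):
    # Try a few times locally before reporting the failure via the queue.
    for attempt in range(3):
        if not random.getrandbits(1):  # simulated success
            return x*x
    # Still failing after the local retries: queue it for a later parallel round.
    f.q.put(x)
    return None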


Old answer: For completeness, here is my old answer. It is not optimal, since it does not resubmit directly into the pool, but it may still be relevant depending on the use case, because it provides a natural way to handle/limit n-level retries:

You can use a Queue to aggregate failures and resubmit them at the end of each run, across multiple runs:

import multiprocessing as mp 
import random 


def f(x): 
    if random.getrandbits(1): 
        # on failure/exception, hand the failed item back via the queue
        f.q.put(x)
        return None
    return x*x 

def f_init(q): 
    f.q = q 

def main(pending): 
    run_number = 1 
    while pending:
        jobs = pending
        pending = []

        q = mp.Queue()
        p = mp.Pool(None, f_init, [q])
        results = p.imap(f, jobs)
        p.close()

        p.join()
        failed_items = []
        while not q.empty():
            failed_items.append(q.get())
        successful = [r for r in results if r is not None]
        print "(%d) Succeeded: %s" % (run_number, successful)
        print "(%d) Failed: %s" % (run_number, failed_items)
        print
        pending = failed_items
        run_number += 1

if __name__ == '__main__': 
    main(range(1, 10)) 

with output like this:

(1) Succeeded: [9, 16, 36, 81] 
(1) Failed: [2, 1, 5, 7, 8] 

(2) Succeeded: [64] 
(2) Failed: [2, 1, 5, 7] 

(3) Succeeded: [1, 25] 
(3) Failed: [2, 7] 

(4) Succeeded: [49] 
(4) Failed: [2] 

(5) Succeeded: [4] 
(5) Failed: [] 
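
Since retries here are driven by the while pending loop, capping the number of retry rounds (the n-level limit mentioned above) only requires bounding that loop; a sketch reusing f and f_init from above (main_capped and max_runs are hypothetical names, not in the original answer):

def main_capped(pending, max_runs=3):
    # Same loop as main() above, but give up after max_runs rounds.
    run_number = 1
    while pending and run_number <= max_runs:
        jobs = pending
        pending = []

        q = mp.Queue()
        p = mp.Pool(None, f_init, [q])
        results = p.imap(f, jobs)
        p.close()
        p.join()

        while not q.empty():
            pending.append(q.get())
        successful = [r for r in results if r is not None]
        print "(%d) Succeeded: %s" % (run_number, successful)
        run_number += 1
    if pending:
        print "Giving up on: %s" % pending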
+0

Updated my answer to one that does not require multiple runs; it now works within the same original pool. – 2012-07-24 04:44:12

+0

Thanks for the detailed reply. I like the idea of putting failed computations into a queue for retrying. I have to award Andrew, though, since his solution is just the simple retry. – ash 2012-07-24 20:10:38

+0

@ash I mentioned immediate retry in my reply, thinking it would be a trivial/simple addition rather than what you wanted. Note also that immediate retry is not optimal for all cases, especially those where the odds of an immediate retry succeeding are low (in which case it is severely suboptimal, since it starves work that could succeed of resources). Congrats to Andrew in any case. – 2012-07-26 04:26:32