2017-09-28 27 views
0

我试图找出一种很好的方法来使用Python 3.6中的multiprocessing包运行一组大约100个任务,其中最多同时运行4个任务。我也想要:Python多处理:检索下一个结果

  1. 反复收集池中下一个完成的任务并处理其返回值,直到所有任务都成功或失败;
  2. 使任何给定任务中抛出的异常非致命,所以我仍然可以从其他任务中访问结果。

我不需要维护提交给池的任务的顺序(即我不需要队列)。任务总数(“100”以上)不是非常大,例如,我不介意一次全部提交,让他们排队,直到有工人。

我认为multiprocessing.Pool会很适合这个,但我似乎无法找到可以迭代调用的“get next result”方法。

这是我将不得不从流程管理原语中滚动吗?或者可以Pool(或者我缺少的其他东西)支持这个工作流程?

对于上下文,我使用每个worker调用可能需要几分钟的远程进程,并且有能力同时处理N个作业(上面具体化示例中的“4”)。

+0

http://pyvideo.org/search.html?q=multiprocessing – wwii

+0

@wwii是否有一些视频特别推荐您解决这个问题? –

+0

只有一般 - 我觉得Pycon的视频讲述的内容非常丰富。另外,[multiprocessing module documentation](https://docs.python.org/3/library/multiprocessing.html)中给出的示例似乎足以让我在开始玩游戏时进行实验。 – wwii

回答

0

我想出了以下模式(显示使用2名工人& 6个就业机会,而不是4 & 100):

import random 
import time 
from multiprocessing import Pool, TimeoutError 
from queue import Queue 


def worker(x): 
    print("Start: {}".format(x)) 
    time.sleep(5 * random.random()) # Sleep a random amount of time 
    if x == 2: 
     raise Exception("Two is bad") 
    return x 


if __name__ == '__main__': 

    with Pool(processes=2) as pool: 
     jobs = Queue() 
     for i in range(6): 
      jobs.put(pool.apply_async(worker, [i])) 

     while not jobs.empty(): 
      j = jobs.get(timeout=1) 
      try: 
       r = j.get(timeout=0.1) 
       print("Done: {}".format(r)) 
      except TimeoutError as e: 
       jobs.put(j) # Not ready, try again later 
      except Exception as e: 
       print("Exception: {}".format(e)) 

似乎工作得很好:

Start: 0 
Start: 1 
Start: 2 
Done: 1 
Start: 3 
Exception: Two is bad 
Start: 4 
Start: 5 
Done: 3 
Done: 4 
Done: 5 
Done: 0 

我会看看我是否可以制作一个通用的工具来管理我的排队。

我认为它的主要缺点是完成的工作可能会被忽略一段时间,而未完成的工作被轮询并可能超时。避免这可能需要使用回调 - 如果它成为一个足够大的问题,我可能会添加到我的应用程序。

+0

你不应该排队工作,但结果。对于'apply_async'的'callback'参数,您可以设置一个将结果放入队列的函数。然后一个单独的线程可以顺序地“获得”结果。 –