2012-12-03 24 views
7

到目前为止,无论何时我需要使用multiprocessing,我都通过手动创建“进程池”并共享包含所有子进程的工作队列来完成此操作。如何获得由Python多处理池完成的“工作”数量?

例如:

from multiprocessing import Process, Queue 


class MyClass: 

    def __init__(self, num_processes): 
     self._log   = logging.getLogger() 
     self.process_list = [] 
     self.work_queue = Queue() 
     for i in range(num_processes): 
      p_name = 'CPU_%02d' % (i+1) 
      self._log.info('Initializing process %s', p_name) 
      p = Process(target = do_stuff, 
         args = (self.work_queue, 'arg1'), 
         name = p_name) 

这样我可以添加的东西到队列中,这将由子过程消耗。然后,我可以监测处理多远是通过检查Queue.qsize()

while True: 
     qsize = self.work_queue.qsize() 
     if qsize == 0: 
      self._log.info('Processing finished') 
      break 
     else: 
      self._log.info('%d simulations still need to be calculated', qsize) 

现在我明白这multiprocessing.Pool可以简化很多这样的代码。

我无法找到的是我该如何监控还有待完成的“工作”的数量。

看看下面的例子:

from multiprocessing import Pool 


class MyClass: 

    def __init__(self, num_processes): 
     self.process_pool = Pool(num_processes) 
     # ... 
     result_list = [] 
     for i in range(1000):    
      result = self.process_pool.apply_async(do_stuff, ('arg1',)) 
      result_list.append(result) 
     # ---> here: how do I monitor the Pool's processing progress? 
     # ...? 

任何想法?

回答

11

使用Manager队列。这是工作进程之间共享的队列。如果使用正常队列,它将被每个工作人员腌渍和取消剔除,并因此被复制,以便每个工作人员无法更新队列。

然后,您的工作人员将东西添加到队列中,并在工作人员正在工作时监视队列的状态。您需要使用map_async这样做,因为这可以让您看到整个结果何时准备就绪,允许您打破监视循环。

实施例:

import time 
from multiprocessing import Pool, Manager 


def play_function(args): 
    """Mock function, that takes a single argument consisting 
    of (input, queue). Alternately, you could use another function 
    as a wrapper. 
    """ 
    i, q = args 
    time.sleep(0.1) # mock work 
    q.put(i) 
    return i 

p = Pool() 
m = Manager() 
q = m.Queue() 

inputs = range(20) 
args = [(i, q) for i in inputs] 
result = p.map_async(play_function, args) 

# monitor loop 
while True: 
    if result.ready(): 
     break 
    else: 
     size = q.qsize() 
     print(size) 
     time.sleep(0.1) 

outputs = result.get() 
1

想出了下面的解决方案为async_call。

琐碎的玩具脚本的例子,但应该广泛应用我认为。

基本上在一个无限循环中轮询列表生成器中结果对象的就绪值,然后求和以计算剩余多少个调度的池任务。

一旦没有剩余中断并加入()& close()。

根据需要添加睡眠环。

与上述解决方案相同,但没有队列。如果您还记录了最初发送池的数量,您可以计算完成百分比等。

import multiprocessing 
import os 
import time 
from random import randrange 


def worker(): 
    print os.getpid() 

    #simulate work 
    time.sleep(randrange(5)) 

if __name__ == '__main__': 

    pool = multiprocessing.Pool(processes=8) 
    result_objs = [] 

    print "Begin dispatching work" 

    task_count = 10 
    for x in range(task_count): 
     result_objs.append(pool.apply_async(func=worker)) 

    print "Done dispatching work" 

    while True: 
     incomplete_count = sum(1 for x in result_objs if not x.ready()) 

     if incomplete_count == 0: 
      print "All done" 
      break 

     print str(incomplete_count) + " Tasks Remaining" 
     print str(float(task_count - incomplete_count)/task_count * 100) + "% Complete" 
     time.sleep(.25) 

    pool.close() 
    pool.join() 
1

我有同样的问题,并想出了MapResult对象有些简单的解决方案(虽然使用内部MapResult数据)

pool = Pool(POOL_SIZE) 

result = pool.map_async(get_stuff, todo) 
while not result.ready(): 
    remaining = result._number_left * result._chunksize 
    sys.stderr.write('\r\033[2KRemaining: %d' % remaining) 
    sys.stderr.flush() 
    sleep(.1) 

print >> sys.stderr, '\r\033[2KRemaining: 0' 

注意,剩余价值并不总是准确的,因为块大小通常取决于要处理的项目数量。

您可以通过使用0123ns