2011-04-14 30 views
45

我有一个脚本,真实成功做了多处理池组任务与imap_unordered()电话:显示Python多处理池地图调用的进度?

p = multiprocessing.Pool() 
rs = p.imap_unordered(do_work, xrange(num_tasks)) 
p.close() # No more work 
p.join() # Wait for completion 

但是,我num_tasks约为250,000,所以join()锁定10秒左右的主线程,我希望能够递增地回显命令行以显示主进程未被锁定。例如:

p = multiprocessing.Pool() 
rs = p.imap_unordered(do_work, xrange(num_tasks)) 
p.close() # No more work 
while (True): 
    remaining = rs.tasks_remaining() # How many of the map call haven't been done yet? 
    if (remaining == 0): break # Jump out of while loop 
    print "Waiting for", remaining, "tasks to complete..." 
    time.sleep(2) 

是否有结果对象或池本身的方法,指示剩余任务的数量?我尝试使用multiprocessing.Value对象作为计数器(do_work在完成其任务后调用counter.value += 1操作),但计数器只有在停止递增之前达到总值的85%。

回答

54

无需访问结果集的私有属性:

from __future__ import division 
import sys 

for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1): 
    sys.stderr.write('\rdone {0:%}'.format(i/num_tasks)) 
+4

我只在代码退出后才看到打印输出(不是每次迭代)。你有什么建议吗? – 2014-11-06 10:47:36

+0

@HananShteingart:在我的系统(Ubuntu)上,Python 2和Python 3都可以正常工作。我以'def do_word(* a):time.sleep(.1)'为例。如果它不适合你,然后创建一个[完整的最小代码示例](http://stackoverflow.com/help/mcve),它演示了你的问题:使用文字描述你期望发生的事情和发生的事情,提及你如何运行你的Python脚本,你的操作系统,Python版本以及[作为新问题发布](http://tinyurl.com/stack-hints)。 – jfs 2014-12-01 17:13:26

+8

我遇到了与@HananShteingart相同的问题:这是因为我试图使用'Pool.map()'。我没有意识到_only_'imap()'和'imap_unordered()'以这种方式工作 - 文档只是说“一个lazier版本的map()”,但实际上意味着“底层迭代器返回结果” 。 – simonmacmullen 2015-03-24 16:01:12

17

通过一些更深入的挖掘发现了一个答案:看看imap_unordered结果对象的__dict__,我发现它有一个_index属性,随着每个任务的完成而增加。所以这个工程的记录,裹在while循环:

p = multiprocessing.Pool() 
rs = p.imap_unordered(do_work, xrange(num_tasks)) 
p.close() # No more work 
while (True): 
    completed = rs._index 
    if (completed == num_tasks): break 
    print "Waiting for", num_tasks-completed, "tasks to complete..." 
    time.sleep(2) 

不过,我也发现调换imap_unorderedmap_async导致更快的执行,但结果对象是一个有点不同。相反,从map_async结果对象有_number_left属性和ready()方法:

p = multiprocessing.Pool() 
rs = p.map_async(do_work, xrange(num_tasks)) 
p.close() # No more work 
while (True): 
    if (rs.ready()): break 
    remaining = rs._number_left 
    print "Waiting for", remaining, "tasks to complete..." 
    time.sleep(0.5) 
+3

我测试了这个for Python 2.7.6和rs._number_left似乎是剩余的块数。所以如果rs._chunksize不是1,那么rs._number_left不会是剩余列表项的数量。 – Allen 2014-08-19 21:14:30

+0

我应该在哪里放这个代码?我的意思是,直到“rs”的内容已知并且有点迟了或没有执行,才会执行此操作。 – 2015-08-23 22:24:39

+0

@WakanTanka:它将主脚本放入主线程中,然后将多余的线程分离出来。在我最初的例子中,它出现在“while”循环中,其中'rs'已经启动了其他线程。 – MidnightLightning 2015-08-24 11:58:35

5

我知道,这是一个这是一个古老的问题,但是当我想跟踪python中任务池的进展时,我正在做这件事。

from progressbar import ProgressBar, SimpleProgress 
import multiprocessing as mp 
from time import sleep 

def my_function(letter): 
    sleep(2) 
    return letter+letter 

dummy_args = ["A", "B", "C", "D"] 
pool = mp.Pool(processes=2) 

results = [] 

pbar = ProgressBar(widgets=[SimpleProgress()], maxval=len(dummy_args)).start() 

r = [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args] 

while len(results) != len(dummy_args): 
    pbar.update(len(results)) 
    sleep(0.5) 
pbar.finish() 

print results 

基本上,您使用apply_async用callbak(在这种情况下,追加返回的值列表),所以你不必等待其他人做一些事情。然后,在一个while循环中,检查工作的进度。在这种情况下,我添加了一个小部件,使其看起来更好。

输出:

4 of 4                   
['AA', 'BB', 'CC', 'DD'] 

希望它能帮助。

+0

gotta change:'[pool.apply_async(my_function,(x,),callback = results.append)for dummy_args]'for'(pool.apply_async(my_function,(x,),callback = results.append)for x在dummy_args中)' – 2015-08-28 14:10:56

+0

这是不正确的。发电机对象在这里不起作用。经过。 – swagatam 2016-07-13 18:04:33

2

我创建了一个自定义类来创建进度打印输出。莫比这有助于:

from multiprocessing import Pool, cpu_count 


class ParallelSim(object): 
    def __init__(self, processes=cpu_count()): 
     self.pool = Pool(processes=processes) 
     self.total_processes = 0 
     self.completed_processes = 0 
     self.results = [] 

    def add(self, func, args): 
     self.pool.apply_async(func=func, args=args, callback=self.complete) 
     self.total_processes += 1 

    def complete(self, result): 
     self.results.extend(result) 
     self.completed_processes += 1 
     print('Progress: {:.2f}%'.format((self.completed_processes/self.total_processes)*100)) 

    def run(self): 
     self.pool.close() 
     self.pool.join() 

    def get_results(self): 
     return self.results 
35

我个人最喜欢的 - 为您提供了一个可爱的小进度条和完成ETA同时运行的东西和并行提交。

from multiprocessing import Pool 
import tqdm 

pool = Pool(processes=8) 
for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)): 
    pass 
+13

如果池返回一个值,该怎么办? – Nickpick 2017-02-06 10:57:19

+1

我在循环内创建了一个名为result的空列表,然后在循环内部执行result.append(x)。我尝试了2个进程,并使用imap而不是map,所有工作都按照我希望的方式进行@nickpick – bs7280 2017-07-12 22:08:04

9

我发现当我试图检查它的进展时,工作已经完成。这对我来说很有用。

from multiprocessing import Pool 
import tqdm 

tasks = range(5) 
pool = Pool() 
pbar = tqdm(total=len(tasks)) 

def do_work(x): 
    # do something with x 
    pbar.update(1) 

pool.imap_unordered(do_work, tasks) 
pool.close() 
pool.join() 
pbar.close() 

这应该适用于所有类型的多处理,无论它们是否阻塞。