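Python: how to time out a process so it gives control back to the concurrent.futures pool?

I have parallelized a large CPU-bound data-processing task using the concurrent.futures ProcessPoolExecutor, as shown below.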

with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
    futures_ocr = [
        executor.submit(MyProcessor, folder)
        for folder in sub_folders
    ]

    is_cancel = wait_for(futures_ocr)
    if is_cancel:
        print 'shutting down executor'
        executor.shutdown()

def wait_for(futures):
    """Handles the future tasks after completion"""

    cancelled = False

    try:
        for future in concurrent.futures.as_completed(futures, timeout=200):
            try:
                result = future.result()
                print 'successfully finished processing folder: ', result.source_folder_path

            except concurrent.futures.TimeoutError:
                print 'TimeoutError occurred'

            except TypeError:
                print 'TypeError occurred'

    except KeyboardInterrupt:
        print '****** cancelling... *******'
        cancelled = True
        for future in futures:
            future.cancel()

    return cancelled

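For some folders the process seems to get stuck, not because of a bug in the code but because of the nature of the files being processed, so it runs for a very long time. I want to time out these processes so that they return once a time limit is exceeded. The pool could then use that process for the next available task.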

Adding a timeout to the as_completed() call raises this error when the timeout expires:

Traceback (most recent call last):
  File "call_ocr.py", line 96, in <module>
    main()
  File "call_ocr.py", line 42, in main
    is_cancel = wait_for(futures_ocr)
  File "call_ocr.py", line 59, in wait_for
    for future in concurrent.futures.as_completed(futures, timeout=200):
  File "/Users/saurav/.pyenv/versions/ocr/lib/python2.7/site-packages/concurrent/futures/_base.py", line 216, in as_completed
    len(pending), len(fs)))
concurrent.futures._base.TimeoutError: 3 (of 3) futures unfinished

What am I doing wrong here, and what is the best way to make a timed-out process stop and hand its slot back to the process pool?

Answer

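The concurrent.futures implementation does not support this use case: it has no way to stop a task that is already running in a worker process (Future.cancel() only works on tasks that have not started yet).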

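The timeout you can pass to its functions and methods, such as as_completed() and Future.result(), only sets how long you are willing to wait for a result. It has no effect on the computation itself, which keeps running in the worker process.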
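
The following is a minimal sketch of that behaviour, written for Python 3. It uses a ThreadPoolExecutor only so it runs anywhere without a __main__ guard; the timeout semantics of as_completed() are the same with a ProcessPoolExecutor. slow_task and collect_with_deadline are hypothetical names. It shows that the TimeoutError is raised by the as_completed() iterator itself, which is why the except clause around future.result() in the question never catches it, and that the unfinished task keeps running after the timeout.

```python
import concurrent.futures
import time


def slow_task(seconds):
    # Hypothetical stand-in for a long-running job such as MyProcessor.
    time.sleep(seconds)
    return seconds


def collect_with_deadline(durations, deadline):
    """Collect the results that finish within `deadline` seconds of waiting."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(durations)) as executor:
        futures = [executor.submit(slow_task, d) for d in durations]
        finished = []
        try:
            # The TimeoutError comes from the as_completed() iterator itself,
            # not from future.result(), so it must be caught around the loop.
            for future in concurrent.futures.as_completed(futures, timeout=deadline):
                finished.append(future.result())
        except concurrent.futures.TimeoutError:
            pass
        # The timeout only ended the waiting: unfinished tasks keep running.
        unfinished = sum(1 for f in futures if not f.done())
        # Leaving the with-block still blocks until every task has finished.
    return finished, unfinished, [f.result() for f in futures]
```

For example, collect_with_deadline([0.05, 1.0], 0.5) returns ([0.05], 1, [0.05, 1.0]): the slow task was still running when the wait gave up, and it completed anyway.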

The pebble library supports this use case.

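from concurrent.futures import TimeoutError
from pebble import ProcessPool

def function(n):
    return n

# The guard is needed on platforms that start workers with "spawn" (e.g. Windows).
if __name__ == '__main__':
    with ProcessPool() as pool:
        # Unlike concurrent.futures, this timeout applies to the task itself:
        # a task that runs longer than 10 seconds is stopped.
        future = pool.schedule(function, args=[1], timeout=10)

        try:
            result = future.result()
        except TimeoutError as error:
            print("function took longer than %d seconds" % error.args[1])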
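
If installing pebble is not an option, the underlying idea (give each task a process of its own that can be killed) can be sketched with the standard library alone. This is a swapped-in technique, not pebble's API. It assumes Python 3 on a POSIX system where the "fork" start method exists; slow_task and run_with_timeout are hypothetical names.

```python
import multiprocessing
import time

# Assumption: POSIX system with the "fork" start method, so the child
# process does not need to re-import the calling module.
CTX = multiprocessing.get_context("fork")


def slow_task(seconds, queue):
    # Hypothetical stand-in for processing one folder.
    time.sleep(seconds)
    queue.put(seconds)


def run_with_timeout(seconds, timeout):
    """Run slow_task in a dedicated process and kill it after `timeout` seconds.

    Returns the task's result, or None if the time limit was exceeded.
    """
    queue = CTX.Queue()
    proc = CTX.Process(target=slow_task, args=(seconds, queue))
    proc.start()
    proc.join(timeout)       # wait at most `timeout` seconds
    if proc.is_alive():
        proc.terminate()     # actually stops the computation
        proc.join()
        return None
    return queue.get(timeout=1)
```

Each call costs one fresh process, so a stuck folder only ever ties up its own process. Limiting how many of these run at once (the max_workers part of the question) is left out of this sketch; that scheduling is roughly what pebble's ProcessPool handles for you.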