2014-09-22 29 views
3

我正在使用Python 2.7。Python中ThreadPool中每个线程的超时

我目前使用ThreadPoolExecuter这样的:

params = [1,2,3,4,5,6,7,8,9,10] 
with concurrent.futures.ThreadPoolExecutor(5) as executor: 
    result = list(executor.map(f, params)) 

问题是f有时运行时间过长。每当我运行f,我想限制它的运行到100秒,然后杀死它。

最终,对于xparam中的每个元素,我想指出是否必须杀死f,如果不是 - 返回值是多少。 即使f超时一个参数,我仍然想运行它与下一个参数。

executer.map方法确实有一个timeout参数,但它为整个运行设置了一个超时时间,从调用时间到executer.map,而不是分别为每个线程。

什么是最简单的方法来获得我想要的行为?

+1

没有直接的方法来杀死Python中的线程。如果传递给map的timeout超时,它不会实际终止执行程序线程,它只会使'future.result(timeout)'调用它在内部引发'TimeoutError'异常。虽然,工作线程将继续在后台运行。如果您需要线程实际被终止,您需要让您的工作人员函数检查父代在超时过期后可以设置的某种标志。然而,这可能并不容易实现,这取决于工作人员功能在做什么。 – dano 2014-09-22 14:35:21

+0

@dano:我明白了。仍然在后台运行的过程是我可能能够忍受的事情。但让我们说线程处理参数[4]卡住了,我仍然可以得到处理params [5]的参数params [9]的结果吗? – user302099 2014-09-22 15:32:41

+0

@ user302099:如果在'params [4]'之前准备好了,你可以使用'as_completed()'而不是'map()'来得到'params [5]'结果。如果你使用线程,那么函数应该配合(尊重退出条件)。如果你不能依赖这个函数来行为,那么就使用进程。 – jfs 2014-09-22 15:52:41

回答

3

这个答案是关于python的多处理库,它通常比线程库更可取,除非你的函数正在等待网络调用。请注意,多处理和线程库具有相同的接口。

鉴于您每个进程运行潜在的100秒,相比之下,创建每个进程的开销相当小。您可能必须制定自己的流程才能获得必要的控制权。

一种选择是包装在另一个函数f将为是100秒exectue:

from multiprocessing import Pool 

def timeout_f(arg): 
    pool = Pool(processes=1) 
    return pool.apply_async(f, [arg]).get(timeout=100) 

然后你的代码更改为:

result = list(executor.map(timeout_f, params)) 

或者,你可以写您自己的线程/过程控制:

from multiprocessing import Process 
from time import time 

def chunks(l, n): 
    """ Yield successive n-sized chunks from l. """ 
    for i in xrange(0, len(l), n): 
     yield l[i:i+n] 

processes = [Process(target=f, args=(i,)) for i in params] 
exit_codes = [] 
for five_processes = chunks(processes, 5): 
    for p in five_processes: 
     p.start() 
    time_waited = 0 
    start = time() 
    for p in five_processes: 
     if time_waited >= 100: 
      p.join(0) 
      p.terminate() 
     p.join(100 - time_waited) 
     p.terminate() 
     time_waited = time() - start 
    for p in five_processes: 
     exit_codes.append(p.exit_code) 

您需要通过类似Can I get a return value from multiprocessing.Process?

得到返回值如果进程完成,进程的退出代码为0,如果它们已终止,进程的退出代码为非零。

技术来自: Join a group of python processes with a timeoutHow do you split a list into evenly sized chunks?


作为另一种选择,你可以只尝试使用apply_async上multiprocessing.Pool

from multiprocessing import Pool, TimeoutError 
from time import sleep  

if __name__ == "__main__": 
    pool = Pool(processes=5) 
    processes = [pool.apply_async(f, [i]) for i in params] 
    results = [] 
    for process in processes: 
     try: 
      result.append(process.get(timeout=100)) 
     except TimeoutError as e: 
      results.append(e) 

注意上面可能等待超过100秒,每过程,就好像第一个过程需要50秒完成,第二个过程在其运行时间内将有50秒多余的时间。更复杂的逻辑(如前面的例子)需要执行更严格的超时。

+0

第一种解决方案强制您等待100秒,即使所有进程最终在5秒内完成。您可能需要一个睡眠几秒钟的循环,然后检查是否有任何进程仍在运行,何时返回睡眠状态。 – dano 2014-09-22 15:59:50

+0

@dano是的,正在写一个快速的答案。更新为使用更好的逻辑 – Zags 2014-09-22 16:06:40

+0

它看起来像你做了某种复制/粘贴错误与您的编辑。缩进是关闭的,你调用'join'和'terminate'两次。 – dano 2014-09-22 16:21:23