2016-02-03 51 views
1

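Start a Python process with output and a timeout

I want to find a way to start a new process and get its output if it takes less than X seconds. If the process takes longer, I want to ignore its result, kill the process, and carry on.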

Basically, I need to add a timer to the code below. I'm not sure whether there is a better way to do this, and I'm open to a different, better solution.

from multiprocessing import Process, Queue 

def f(q): 
    # Ugly work 
    q.put(['hello', 'world']) 

if __name__ == '__main__': 
    q = Queue() 
    p = Process(target=f, args=(q,)) 
    p.start() 
    print q.get() 
    p.join() 

Thanks!

+0

See [Timeout on a Python function call](http://stackoverflow.com/q/492519/4279), specifically [this answer](http://stackoverflow.com/a/14924210/4279) – jfs

Answers

1

You may find the following module quite useful in your case:

Module

#! /usr/bin/env python3 
"""Allow functions to be wrapped in a timeout API. 

Since code can take a long time to run and may need to terminate before 
finishing, this module provides a set_timeout decorator to wrap functions.""" 

__author__ = 'Stephen "Zero" Chappell ' \ 
      '<[email protected]>' 
__date__ = '18 December 2017' 
__version__ = 1, 0, 1 
__all__ = [ 
    'set_timeout', 
    'run_with_timeout' 
] 

import multiprocessing 
import sys 
import time 

DEFAULT_TIMEOUT = 60 


def set_timeout(limit=None):
    """Return a wrapper that provides a timeout API for callers."""
    if limit is None:
        limit = DEFAULT_TIMEOUT
    _Timeout.validate_limit(limit)

    def wrapper(entry_point):
        return _Timeout(entry_point, limit)

    return wrapper


def run_with_timeout(limit, polling_interval, entry_point, *args, **kwargs):
    """Execute a callable object and automatically poll for results."""
    engine = set_timeout(limit)(entry_point)
    engine(*args, **kwargs)
    while engine.ready is False:
        time.sleep(polling_interval)
    return engine.value


def _target(queue, entry_point, *args, **kwargs):
    """Help with multiprocessing calls by being a top-level module function."""
    # noinspection PyPep8,PyBroadException
    try:
        queue.put((True, entry_point(*args, **kwargs)))
    except:
        queue.put((False, sys.exc_info()[1]))


class _Timeout:
    """_Timeout(entry_point, limit) -> _Timeout instance"""

    def __init__(self, entry_point, limit):
        """Initialize the _Timeout instance with all needed attributes."""
        self.__entry_point = entry_point
        self.__limit = limit
        self.__queue = multiprocessing.Queue()
        self.__process = multiprocessing.Process()
        self.__timeout = time.monotonic()

    def __call__(self, *args, **kwargs):
        """Begin execution of the entry point in a separate process."""
        self.cancel()
        self.__queue = multiprocessing.Queue(1)
        self.__process = multiprocessing.Process(
            target=_target,
            args=(self.__queue, self.__entry_point) + args,
            kwargs=kwargs
        )
        self.__process.daemon = True
        self.__process.start()
        self.__timeout = time.monotonic() + self.__limit

    def cancel(self):
        """Terminate execution if possible."""
        if self.__process.is_alive():
            self.__process.terminate()

    @property
    def ready(self):
        """Property letting callers know if a returned value is available."""
        if self.__queue.full():
            return True
        elif not self.__queue.empty():
            return True
        elif self.__timeout < time.monotonic():
            self.cancel()
            return None  # timed out: callers test for `ready is None`
        else:
            return False

    @property
    def value(self):
        """Property that retrieves a returned value if available."""
        if self.ready is True:
            valid, value = self.__queue.get()
            if valid:
                return value
            raise value
        raise TimeoutError('execution timed out before terminating')

    @property
    def limit(self):
        """Property controlling what the timeout period is in seconds."""
        return self.__limit

    @limit.setter
    def limit(self, value):
        self.validate_limit(value)
        self.__limit = value

    @staticmethod
    def validate_limit(value):
        """Verify that the limit's value is not too low."""
        if value <= 0:
            raise ValueError('limit must be greater than zero')

To use it, see the following example, which demonstrates its usage:

from time import sleep

# Assumption: the module above is saved as timeout.py (the file name is a guess)
from timeout import set_timeout


def main():
    timeout_after_four_seconds = set_timeout(4)
    # create copies of a function that have a timeout
    a = timeout_after_four_seconds(do_something)
    b = timeout_after_four_seconds(do_something)
    c = timeout_after_four_seconds(do_something)
    # execute the functions in separate processes
    a('Hello', 1)
    b('World', 5)
    c('Jacob', 3)
    # poll the functions to find out what they returned
    results = [a, b, c]
    polling = set(results)
    while polling:
        for process, name in zip(results, 'abc'):
            if process in polling:
                ready = process.ready
                if ready is True:  # if the function returned
                    print(name, 'returned', process.value)
                    polling.remove(process)
                elif ready is None:  # if the function took too long
                    print(name, 'reached timeout')
                    polling.remove(process)
                else:  # if the function is running
                    assert ready is False, 'ready must be True, False, or None'
        sleep(0.1)
    print('Done.')


def do_something(data, work):
    sleep(work)
    print(data)
    return work


if __name__ == '__main__':
    main()
+1

I had to change clock() to datetime because this runs on a Raspberry Pi and the original code did not work properly. Thanks! – MrCatacroquer

0

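Does the process you are running contain a loop? If so, you can record a timestamp before the loop starts and put an if statement inside the loop that calls sys.exit() to terminate the script once the current timestamp is more than x seconds past the recorded start timestamp.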
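A minimal sketch of that idea, assuming the work can be split into short iterations. The function name `ugly_work`, the `limit_seconds` parameter, and the `time.sleep(0.01)` stand-in for real work are all hypothetical:

```python
import sys
import time


def ugly_work(limit_seconds):
    """Hypothetical worker loop that gives up once its time budget is spent."""
    start = time.monotonic()  # timestamp recorded before the loop starts
    steps = 0
    while True:
        if time.monotonic() - start > limit_seconds:
            # Budget exceeded: sys.exit() raises SystemExit and ends the script.
            sys.exit('timed out after %d steps' % steps)
        steps += 1
        time.sleep(0.01)  # stand-in for one unit of real work
```

Note that this check is cooperative: it only fires between iterations, so a single iteration that blocks for a long time will not be interrupted.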

0

All you need to do to adapt the queue example from the docs to your case is pass a timeout to the q.get() call and terminate the process when the timeout expires:

from Queue import Empty  # Python 2; in Python 3 use: from queue import Empty
...

try:
    print q.get(timeout=timeout)
except Empty:  # no value, timeout occurred
    p.terminate()
    q = None  # the queue might be corrupted after the `terminate()` call
p.join()
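For reference, here is how the fragment above might look as a complete Python 3 script. This is only a sketch: the `delay` argument and the `run_with_deadline` helper are made up for the demonstration and are not part of the question's code.

```python
import time
from multiprocessing import Process, Queue
from queue import Empty


def f(q, delay):
    # Stand-in for the "ugly work"; `delay` is a hypothetical parameter
    # used only to simulate a fast or a slow job.
    time.sleep(delay)
    q.put(['hello', 'world'])


def run_with_deadline(delay, timeout):
    """Return the child's result, or None if it misses the deadline."""
    q = Queue()
    p = Process(target=f, args=(q, delay))
    p.start()
    try:
        result = q.get(timeout=timeout)  # wait at most `timeout` seconds
    except Empty:  # no value arrived in time
        p.terminate()
        result = None  # the queue is discarded; it may be corrupted
    p.join()
    return result


if __name__ == '__main__':
    print(run_with_deadline(delay=0, timeout=5))    # fast job: result arrives
    print(run_with_deadline(delay=5, timeout=0.5))  # slow job: child is killed
```

The first call should print the list put on the queue and the second should print None, since the child is terminated before it produces anything.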

Using a Pipe might be more lightweight; otherwise the code is the same (you can use .poll(timeout) to determine whether there is data to receive).
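A rough Pipe-based equivalent in Python 3 could look like the sketch below. As before, the `delay` parameter and the `run_with_pipe` helper are hypothetical names chosen for the demonstration.

```python
import time
from multiprocessing import Pipe, Process


def f(conn, delay):
    # Stand-in for the "ugly work"; `delay` only simulates how long it takes.
    time.sleep(delay)
    conn.send(['hello', 'world'])
    conn.close()


def run_with_pipe(delay, timeout):
    """Return the child's result, or None if nothing arrives in time."""
    # duplex=False gives a one-way pipe: first end receives, second end sends.
    parent_conn, child_conn = Pipe(duplex=False)
    p = Process(target=f, args=(child_conn, delay))
    p.start()
    if parent_conn.poll(timeout):  # True once data is ready to be received
        result = parent_conn.recv()
    else:  # nothing arrived before the timeout
        p.terminate()
        result = None
    p.join()
    return result


if __name__ == '__main__':
    print(run_with_pipe(delay=0, timeout=5))
    print(run_with_pipe(delay=5, timeout=0.5))
```

This sketch does not handle a child that crashes before sending anything; in that case poll() simply times out and the function returns None.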
