2009-01-23 781 views
145

我想每隔60秒就会重复执行一次Python函数(就像Objective C中的NSTimer一样)。这段代码将作为一个守护进程运行,就像使用cron每分钟调用一次python脚本一样,但不需要由用户设置。在Python中每隔x秒重复执行一次函数的最佳方式是什么?

this question about a cron implemented in Python中,该解决方案似乎实际上只有sleep() x秒。我不需要这样的高级功能所以也许这样的事情会工作

while True: 
    # Code executed here 
    time.sleep(60) 

是否有与此代码的任何可预见的问题?

+41

迂腐点,但可能是重要的,你的代码上面的代码不执行每60秒却让处决之间有60秒的差距。如果你的执行代码完全没有时间,它只会每60秒发生一次。 – Simon 2009-01-23 21:12:24

+0

Dupe:http://stackoverflow.com/questions/373335/suggestions-for-a-cron-like-scheduler-in-python – 2009-01-23 22:14:24

+2

也``time.sleep(60)``可能会返回更早和更晚 – jfs 2014-03-19 07:25:21

回答

137

使用sched模块,该模块实现通用事件调度程序。

import sched, time 
s = sched.scheduler(time.time, time.sleep) 
def do_something(sc): 
    print "Doing stuff..." 
    # do your stuff 
    s.enter(60, 1, do_something, (sc,)) 

s.enter(60, 1, do_something, (s,)) 
s.run() 
+9

调度模块用于调度函数在一段时间后运行,如何在不使用time.sleep()的情况下每隔x秒重复一次函数调用? – 2009-01-23 21:13:24

4

该和cron之间的主要区别是,一个异常会杀死守护进程的好处。您可能想包装一个异常捕获器和记录器。

48

你可能想要考虑Twisted这是一个python网络库,实现了Reactor Pattern

from twisted.internet import task 
from twisted.internet import reactor 

timeout = 60.0 # Sixty seconds 

def doWork(): 
    #do work here 
    pass 

l = task.LoopingCall(doWork) 
l.start(timeout) # call every sixty seconds 

reactor.run() 

虽然“而真:睡眠(60)”也许可以工作的扭曲可能已经实现了许多,你最终将需要的功能,将可能被(系统守护进程,如bobince指出的记录或异常处理)一个更强大的解决方案

30

更简单的方法我认为是:

import time 

def executeSomething(): 
    #code here 
    time.sleep(60) 

while True: 
    executeSomething() 

执行代码这样的话,等待60秒钟,然后再次执行,等待,执行,等等 无需要使事情复杂化:D

5

我在某段时间后面临类似的问题。可能是http://cronus.readthedocs.org可能有帮助吗?

对于V0.2,下面的代码片段工作

import cronus.beat as beat 

beat.set_rate(2) # 2 Hz 
while beat.true(): 
    # do some time consuming work here 
    beat.sleep() # total loop duration would be 0.5 sec 
81

就锁定时间循环的系统时钟。简单。

import time 
starttime=time.time() 
while True: 
    print "tick" 
    time.sleep(60.0 - ((time.time() - starttime) % 60.0)) 
28

如果要定期执行的功能,而不是阻止无限循环我会使用一个线程定时器非阻塞方式。这样,你的代码就可以继续运行并执行其他任务,并且每n秒都会调用一次你的函数。我使用这种技术在很长的CPU /磁盘/网络密集型任务上打印进度信息。

这里是我已经张贴类似的问题的代码,以启动()和stop()控制:

from threading import Timer 

class RepeatedTimer(object): 
    def __init__(self, interval, function, *args, **kwargs): 
     self._timer  = None 
     self.interval = interval 
     self.function = function 
     self.args  = args 
     self.kwargs  = kwargs 
     self.is_running = False 
     self.start() 

    def _run(self): 
     self.is_running = False 
     self.start() 
     self.function(*self.args, **self.kwargs) 

    def start(self): 
     if not self.is_running: 
      self._timer = Timer(self.interval, self._run) 
      self._timer.start() 
      self.is_running = True 

    def stop(self): 
     self._timer.cancel() 
     self.is_running = False 

用法:

from time import sleep 

def hello(name): 
    print "Hello %s!" % name 

print "starting..." 
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start() 
try: 
    sleep(5) # your long-running job goes here... 
finally: 
    rt.stop() # better in a try/finally block to make sure the program ends! 

特点:

  • 仅标准库,无外部依赖性
  • start() a第二stop()是安全的,多次调用,即使定时器已经开始/停止
  • 要调用的函数可以有位置和命名参数
  • 您可以随时更改interval,这将是下一次运行后生效。 argskwargs甚至function也一样!
10

这里有一个更新,从MestreLion的代码,避免了随着时间的推移drifiting:

import threading 
import time 

class RepeatedTimer(object): 
    def __init__(self, interval, function, *args, **kwargs): 
    self._timer = None 
    self.interval = interval 
    self.function = function 
    self.args = args 
    self.kwargs = kwargs 
    self.is_running = False 
    self.next_call = time.time() 
    self.start() 

    def _run(self): 
    self.is_running = False 
    self.start() 
    self.function(*self.args, **self.kwargs) 

    def start(self): 
    if not self.is_running: 
     self.next_call += self.interval 
     self._timer = threading.Timer(self.next_call - time.time(), self._run) 
     self._timer.start() 
     self.is_running = True 

    def stop(self): 
    self._timer.cancel() 
    self.is_running = False 
0

我用这导致每小时60个事件与整个分钟后,在同样的时间发生的大多数事件:

import math 
import time 
import random 

TICK = 60 # one minute tick size 
TICK_TIMING = 59 # execute on 59th second of the tick 
TICK_MINIMUM = 30 # minimum catch up tick size when lagging 

def set_timing(): 

    now = time.time() 
    elapsed = now - info['begin'] 
    minutes = math.floor(elapsed/TICK) 
    tick_elapsed = now - info['completion_time'] 
    if (info['tick']+1) > minutes: 
     wait = max(0,(TICK_TIMING-(time.time() % TICK))) 
     print ('standard wait: %.2f' % wait) 
     time.sleep(wait) 
    elif tick_elapsed < TICK_MINIMUM: 
     wait = TICK_MINIMUM-tick_elapsed 
     print ('minimum wait: %.2f' % wait) 
     time.sleep(wait) 
    else: 
     print ('skip set_timing(); no wait') 
    drift = ((time.time() - info['begin']) - info['tick']*TICK - 
     TICK_TIMING + info['begin']%TICK) 
    print ('drift: %.6f' % drift) 

info['tick'] = 0 
info['begin'] = time.time() 
info['completion_time'] = info['begin'] - TICK 

while 1: 

    set_timing() 

    print('hello world') 

    #random real world event 
    time.sleep(random.random()*TICK_MINIMUM) 

    info['tick'] += 1 
    info['completion_time'] = time.time() 

取决于实际情况,你可能会得到长的蜱:

60,60,62,58,60,60,120,30,30,60,60,60,60,60...etc. 

但在60分钟结束时,您将有60个滴答声;并且他们中的大多数会以您喜欢的分钟的正确偏移量出现。

在我的系统上,我得到了典型的<漂移1/20秒,直到需要更正为止。

该方法的优点是时钟漂移的分辨率;这可能会导致问题,如果你正在做一些事情,例如每次打勾附加一个项目,并且你希望每小时追加60个项目。不考虑漂移会导致二次显示,如移动平均值将数据视为太深,导致输出错误。

1

一个可能的答案:

import time 
t=time.time() 

while True: 
    if time.time()-t>10: 
     #run your task here 
     t=time.time() 
0

例如,显示当前本地时间

import datetime 
import glib 
import logger 

def get_local_time(): 
    current_time = datetime.datetime.now().strftime("%H:%M") 
    logger.info("get_local_time(): %s",current_time) 
    return str(current_time) 

def display_local_time(): 
    logger.info("Current time is: %s", get_local_time()) 
    return True 

# call every minute 
glib.timeout_add(60*1000, display_local_time) 
相关问题