2014-01-10 34 views
1

我正在制作一个简单的应用程序来流式传输twitter的公共时间表,并且我希望流式传输在一小时后自动停止,并且我不知道如何执行此操作。我阅读日期时间和timeit文档,但无法理解它们。这里是我的代码,它正在流畅地完成我想要的时间线,但无限期地。如何在Python中使用1小时后退出函数

from twython import TwythonStreamer 
import json 
import os 
import datetime 
from datetime import * 
APP_KEY = 'XX' 
APP_SECRET = 'XX' 
OAUTH_TOKEN = 'XX' 
OAUTH_TOKEN_SECRET = 'XX' 
class MyStreamer(TwythonStreamer): 

    def on_success(self, data): 
     print data['text'] 
     with open('scratch1.json', 'ab') as outfile: 
      json.dump(data, outfile, indent = 4) 
     with open('scratch2.json', 'ab') as xoutfile: 
      json.dump(data, xoutfile, indent = 4) 
     return 


    def on_error(self, status_code, data): 
     print status_code 
     return True # Don't kill the stream 

    def on_timeout(self): 
     print >> sys.stderr, 'Timeout...' 
     return True # Don't kill the stream 

stream = MyStreamer(APP_KEY, APP_SECRET, 
        OAUTH_TOKEN, OAUTH_TOKEN_SECRET) 
stream.statuses.filter(follow = [95995660, 8820362]) 

任何人都可以帮我吗?

回答

2

使用datetime.datetime.now()方法获取当前日期时间对象,然后使用timedelta类为其添加一小时。

import datetime 
stop_time = datetime.datetime.now() + datetime.timedelta(hours=1) 

# ... 

# in relevant function ... 
if datetime.datetime.now() > stop_time: 
    stop_streaming() 

我不熟悉你TwythonStreamer类,但可能是这样的:

class MyStreamer(TwythonStreamer): 

    # the init function is called when you create instance of class 
    def __init__(self): 
     self.stop_time = datetime.datetime.now() + datetime.timedelta(hours=1) 

    # ... 

    def on_success(self, data): 
     if datetime.datetime.now() > self.stop_time: 
      raise Exception("Time expired") 

     # ... 
+1

谢谢,完美的作品。 –

+0

我无法获取此代码进行复制。我收到错误消息“TypeError:__init __()需要1个位置参数,但给出了5个”。不再有流。状态选项,但有一个用于stream.on_success。如果我添加app_key,app_secret,oauth_token和oauth_token_secret,我就失去了使用stream.statuses的能力。 – ZacharyST

+0

看起来自上游'twython'图书馆自从2014年1月初以来已经发生变化:https://github.com/ryanmcgrath/twython/blob/master/twython/streaming/api.py#L22 – pztrick

1

因为我想提出一个大多是完整的解决方案。这里是我的版本:

#!/usr/bin/env python 


import sys 
import json 
import datetime 


from twython import TwythonStreamer 


from circuits import Component, Event, Debugger, Timer 


APP_KEY = 'XX' 
APP_SECRET = 'XX' 
OAUTH_TOKEN = 'XX' 
OAUTH_TOKEN_SECRET = 'XX' 


class MyStreamer(TwythonStreamer): 

    def on_success(self, data): 
     print data['text'] 
     with open('scratch1.json', 'ab') as outfile: 
      json.dump(data, outfile, indent=4) 
     with open('scratch2.json', 'ab') as xoutfile: 
      json.dump(data, xoutfile, indent=4) 
     return 

    def on_error(self, status_code, data): 
     print status_code 
     return True # Don't kill the stream 

    def on_timeout(self): 
     print >> sys.stderr, 'Timeout...' 
     return True # Don't kill the stream 


class Check(Event): 
    """Check Event""" 


class Terminate(Event): 
    """Terminate Event""" 


class App(Component): 

    def init(self, *args, **kwargs): 
     self.stream = MyStreamer(
      APP_KEY, APP_SECRET, 
      OAUTH_TOKEN, OAUTH_TOKEN_SECRET 
     ) 

     interval = datetime.datetime.now() + datetime.timedelta(hours=1) 
     Timer(interval, Terminate(), self.channel).register(self) 

     Timer(10, Check(), persist=True, channel=self.channel).register(self) 

    def terminate(self): 
     raise SystemExit(0) 

    def check(self): 
     self.stream.statuses.filter(follow=[95995660, 8820362]) 


app = App() 
Debugger().register(app) 
app.run() 

这添加一个额外的库/框架,以您的解决方案称为circuits和它的强大三项赛和内置Timer组件。

注:我还没有测试过这个。我将留给你,因为我没有特别使用0123tTwtiter客户端库,也没有使用它的任何API。祝你好运!

另外请注意,我假设你不想连续检查流,因为Twitter API可能有某种限制。因此,第二个计时器将每10秒触发一次Check事件并坚持运行应用程序的整个生命周期。

+0

我试过了,它确实有效,但看起来相当复杂,尤其是与下面的@ pztrick解决方案相比。 –

+0

确定我的解决方案有点更通用,但可以与任何事情一起工作!虽然他依靠回调叽叽喳喳图书馆恰好有! –

0

我建议使用datetime.datetime.now()模块
datetime.datetime.now()+ datetime.deltatime(秒= 3600)作为您的一小时停机时间。

1

我不能用Apoorv的代码来复制pztrick的修改。写作:

class MyStreamer(TwythonStreamer): 
    def __init__(self): 
     self.stop_time = dt.datetime.now() + dt.timedelta(minutes=1) 

会产生此错误消息:

​​

下不工作:

类MyStreamer(twy.TwythonStreamer):

def __init__(self): 
    self.stop_time = dt.datetime.now() + dt.timedelta(minutes=1) 
    self.app_key = APP_KEY 
    self.app_secret = APP_SECRET 
    self.oauth_token = OAUTH_TOKEN 
    self.oauth_token_secret = OAUTH_TOKEN_SECRET 

做什么工作但是,只需定义stop_time而不需要init。我最终的解决方案是这样的:

class MyStreamer(twy.TwythonStreamer): 

    stop_time = dt.datetime.now() + dt.timedelta(minutes=1) 

    def on_success(self, data): 
     if dt.datetime.now() > self.stop_time: 
      raise Exception('Time expired') 

     fileName = self.fileDirectory + 'Tweets_' + dt.datetime.now().strftime("%Y_%m_%d_%H") + '.txt' # File name includes date out to hour. 
     open(fileName, 'a').write(json.dumps(data) + '\n') 

我新的类,所以不明白为什么这样的作品,但我很高兴,它的作用。

相关问题