2014-01-27 26 views
2

我有一组关键字(超过600),我想使用流API来跟踪推文与他们。 Twitter api将您可以跟踪的关键字数量限制为200个。因此,我决定使用多个线程来完成此操作,并为此使用多个OAuth令牌。这是我该怎么做的:使用多线程从twitter使用twitter4j获取数据

String[] dbKeywords = KeywordImpl.listKeywords(); 
    List<String[]> keywords = ditributeKeywords(dbKeywords); 
    for (String[] subList : keywords) { 
     StreamCrawler streamCrawler = new StreamCrawler(); 
     streamCrawler.setKeywords(subList); 
     Thread crawlerThread = new Thread(streamCrawler); 
     crawlerThread.start(); 
    } 

这是如何在线程间分配字。每个线程收到不超过200个单词。 这是StreamCrawler执行:

public class StreamCrawler extends Crawler implements Runnable { 

... 

    private String[] keywords; 
    public void setKeywords(String[] keywords) { 
    this.keywords = keywords; 
} 

@Override 
public void run() { 
    TwitterStream twitterStream = getTwitterInstance(); 
    StatusListener listener = new StatusListener() { 
     ArrayDeque<Tweet> tweetbuffer = new ArrayDeque<Tweet>(); 
     ArrayDeque<TwitterUser> userbuffer = new ArrayDeque<TwitterUser>(); 


     @Override 
     public void onException(Exception arg0) { 
      System.out.println(arg0); 
     } 

     @Override 
     public void onDeletionNotice(StatusDeletionNotice arg0) { 
      System.out.println(arg0); 
     } 

     @Override 
     public void onScrubGeo(long arg0, long arg1) { 
      System.out.println(arg1); 
     } 

     @Override 
     public void onStatus(Status status) { 
       ...Doing something with message 
     } 

     @Override 
     public void onTrackLimitationNotice(int arg0) { 
      System.out.println(arg0); 
      try { 
       Thread.sleep(5 * 60 * 1000); 
       System.out.println("Will sleep for 5 minutes!"); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 

     @Override 
     public void onStallWarning(StallWarning arg0) { 
      System.out.println(arg0); 
     } 

    }; 

    FilterQuery fq = new FilterQuery(); 
    String keywords[] = getKeywords(); 
    System.out.println(keywords.length); 
    System.out.println("Listening for " + Arrays.toString(keywords)); 
    fq.track(keywords); 
    twitterStream.addListener(listener); 
    twitterStream.filter(fq); 
} 

private long getCurrentThreadId() { 
    return Thread.currentThread().getId(); 
} 

private TwitterStream getTwitterInstance() { 
    TwitterConfiguration configuration = null; 
    TwitterStream twitterStream = null; 
    while (configuration == null) { 
     configuration = TokenFactory.getAvailableToken(); 
     if (configuration != null) { 
      System.out 
        .println("Token was obtained " + getCurrentThreadId()); 
      System.out.println(configuration.getTwitterAccount()); 
      setToken(configuration); 
      ConfigurationBuilder cb = new ConfigurationBuilder(); 
      cb.setDebugEnabled(true); 
      cb.setOAuthConsumerKey(configuration.getConsumerKey()); 
      cb.setOAuthConsumerSecret(configuration.getConsumerSecret()); 
      cb.setOAuthAccessToken(configuration.getAccessToken()); 
      cb.setOAuthAccessTokenSecret(configuration.getAccessSecret()); 
      twitterStream = new TwitterStreamFactory(cb.build()) 
        .getInstance(); 
     } else { 
      // If there is no available configuration, wait for 2 minutes 
      // and try again 
      try { 
       System.out 
         .println("There were no available tokens, sleeping for 2 minutes."); 
       Thread.sleep(2 * 60 * 1000); 
      } catch (InterruptedException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
     } 
    } 
    return twitterStream; 
    } 
} 

所以我的问题是,当我开始为例如2个线程我得到他们两人都开流,得到它的通知。但实际上只有第一个真正获得流并分别调用OnStatus方法。在第二个线程中使用的数组不是空的; Twitter配置也是有效和唯一的。所以我不明白什么可能是这种行为的原因。为什么唯一的第一个线程返回tweets?

+0

这很奇怪,因为一切似乎都是独一无二的。为什么不运行多个应用程序而不是在同一应用程序中运行线程流? – mgokhanbakal

+0

我没有想到这种方法,因为关键字的数量会增加,所以我将不得不运行超过5个实例。此外,我会说这不是明智的,因为我看不出有任何障碍可以通过不同的线索来完成。 – Tonven

+0

我同意它应该以某种方式工作。我只是提出了一个想法,如果你的键盘很小。 :D – mgokhanbakal

回答

9

据我所见,你正尝试从同一个IP同时连接两个公共流媒体端点(又名a.a. general streams或stream.twitter.com)。
更具体地说,我想你想要两个来自同一IP的stream.twitter.com/1.1/statuses/filter.json活动连接。

虽然Twitter的数据流的API文档中并没有明确说的只有一个站立连接到公共端点,Twitter的员工澄清这一点在开发站点https://dev.twitter.com/discussions/7542

对于一般流,你只应来自同一个IP的一个连接。

这意味着使用两个不同的Twitter应用程序/帐户连接到公共流并不重要;只要你从同一个IP地址连接,你就只能有一个常规连接到公共流。你说你有两个流连接,并且回答了这个行为是由一个Twitter员工给出:https://dev.twitter.com/discussions/14935

您可能会发现,有时stream.twitter.com让你与更多的开放连接脱身这里或那里,但不应该指望这种行为。

如果你尝试,例如,在第2个线程,连接到用户的流而不是(twitter4j TwitterStream用户()方法),那么你就会真正开始得到两个过滤&用户流。

关于200 track关键字限制,twitter4j.org javadoc可能有点过时。以下是twitter api文档所说的内容

默认访问级别允许多达400个轨道关键字,5,000个关注用户标识和25个0.1-360度位置框。如果您需要提升访问流API,你应该那么探索我们的Twitter数据的合作伙伴供应商...

,如果你需要超越400,你可能要问Twitter的上升轨道您的Twitter帐户应用程序的访问级别,或者与认证的Twitter数据合作伙伴提供商合作。因为twitter4j过滤器(或用户)“方法内部创建了一个操作TwitterStream并且连续调用适当的监听器方法的线程”(引用自Yusuke Yamamoto的示例代码)。

我希望这个帮助。 (我不能发布更多的链接,因为我得到这个“你需要至少10个声望发布超过2个链接”)

+0

谢谢!似乎是这种行为的原因。此外,有几个流线程运行并不是很糟糕,所以一般不应该被忽视,但有时候可以是:) – Tonven

相关问题